Commit 3b079f7f authored by Jean-Paul Smets's avatar Jean-Paul Smets

*** empty log message ***


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@1218 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 1de62a93
##############################################################################
#
# Copyright (c) 2003 Coramy SAS and Contributors. All Rights Reserved.
# Romain Courteaud <Romain_Courteaud@coramy.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
#
# This program as such is not intended to be used by end users. End
# users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the Zope Public License (ZPL) Version 2.0
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
##############################################################################
#http://www.zopera.org/Members/grival/mailavecpiecejointe/view
# http://www.pythonapocrypha.com/Chapter17/Chapter17.shtml
import StringIO
import MimeWriter
import base64
import multifile
import mimetools
import mimetypes
def sendMail(self, mMsg, mTo, mFrom, mSubj, attachmentList=None ):
# get the mailhost object
try:
mailhost=getattr(self, self.portal_url.superValues('Mail Host')[0].id)
except:
raise AttributeError, "Cannot find a Mail Host object"
else:
# XXX can t see the message with sylpheed ...
# no attachment means no mime message
#if attachmentList==None:
# mailhost.send(mMsg,mTo,mFrom,mSubj)
# construct the mime message
#else:
if 1==1:
# Create multi-part MIME message.
message = StringIO.StringIO()
writer = MimeWriter.MimeWriter(message)
writer.addheader('From', mFrom)
writer.addheader('To', mTo)
writer.addheader('Subject', mSubj)
writer.addheader('MimeVersion', '1.0')
# Don't forget to flush the headers for Communicator
writer.flushheaders()
# Generate a unique section boundary:
outer_boundary=mimetools.choose_boundary()
# Start the main message body. Write a brief message
# for non-MIME-capable readers:
dummy_file=writer.startmultipartbody("mixed",outer_boundary)
dummy_file.write("If you can read this, your mailreader\n")
dummy_file.write("can not handle multi-part messages!\n")
#dummy_file.write("This is a multi-part message in MIME format.\n")
submsg = writer.nextpart()
submsg.addheader("Content-Transfer-Encoding", "7bit")
FirstPartFile=submsg.startbody("text/plain", [("charset","US-ASCII")])
FirstPartFile.write(mMsg)
if attachmentList!=None:
# attachment: { 'name': , 'content': , 'mime_type': }
for attachment in attachmentList:
if attachment.has_key('name'):
attachment_name = attachment['name']
else:
attachment_name = ''
# try to guess the mime type
if not attachment.has_key('mime_type'):
type, encoding = mimetypes.guess_type( attachment_name )
if type != None:
attachment['mime_type'] = type
else:
attachment['mime_type'] = 'application/octet-stream'
# attach it
submsg = writer.nextpart()
if attachment['mime_type'] == 'text/plain':
attachment_file = StringIO.StringIO( attachment['content'] )
submsg.addheader("Content-Transfer-Encoding", "7bit")
submsg.addheader("Content-Disposition", "attachment;\nfilename="+attachment_name)
submsg.flushheaders()
f = submsg.startbody(attachment['mime_type'] , [("name", attachment_name)])
f.write(attachment_file.getvalue())
else:
# encode non-plaintext attachment in base64
attachment_file = StringIO.StringIO( attachment['content'] )
submsg.addheader("Content-Transfer-Encoding", "base64")
submsg.flushheaders()
f = submsg.startbody(attachment['mime_type'] , [("name", attachment_name)])
base64.encode(attachment_file, f)
# close the writer
writer.lastpart()
# send mail to user
mailhost.send(message.getvalue(),mTo,mFrom)
return None
def deleteAttribute(self, id=None):
if id is not None:
delattr(self, id)
##############################################################################
#
# Copyright (c) 2003 Coramy SAS and Contributors. All Rights Reserved.
# Romain Courteaud <Romain_Courteaud@coramy.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
#
# This program as such is not intended to be used by end users. End
# users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the Zope Public License (ZPL) Version 2.0
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
##############################################################################
import os,string
from os import access,W_OK
# We first should look to the export folder
base_directory_path = '/mnt/edi'
export_directory_path = os.path.join(base_directory_path, 'Depart')
def exportEdiAuchanFile(self, user_name=None, ending_mail=0):
import os, string
from os import access,W_OK
file_name = 'exportEdiAuchan_' + self.getId() + '.env'
file_path = os.path.join(export_directory_path, file_name)
try:
resultTmp = self.SalesPackingList_exportEdiAuchan( batch_mode=1 )
except:
self.Coramy_sendMailToUser(user_name=user_name,mSubj="Erreur d excution, export Edi annul",mMsg=file_path)
else:
try:
# open the file
file = open( file_path , 'w')
except:
self.Coramy_sendMailToUser(user_name=user_name,mSubj="Impossible d ouvrir le fichier Edi : contactez Romain",mMsg=file_path)
try:
# export the file
file.write( resultTmp )
except:
self.Coramy_sendMailToUser(user_name=user_name,mSubj="Impossible d crire le fichier Edi : contactez Romain",mMsg=file_path)
try:
# close
file.close()
except:
self.Coramy_sendMailToUser(user_name=user_name,mSubj="Impossible de fermer le fichier Edi : contactez Romain",mMsg=file_path)
if ending_mail:
mMsg = ''
self.Coramy_sendMailToUser(user_name=user_name,mSubj="Export de l edi Auchan termin",mMsg=mMsg)
"""
test the directory
"""
def exportEdiAuchanTestDirectory(self ):
import os, string
from os import access,W_OK
# test the directory
return access(export_directory_path, W_OK)
##############################################################################
#
# Copyright (c) 2003 Coramy SAS and Contributors. All Rights Reserved.
# Romain Courteaud <Romain_Courteaud@coramy.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
#
# This program as such is not intended to be used by end users. End
# users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the Zope Public License (ZPL) Version 2.0
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
##############################################################################
import os, string
from DateTime import DateTime
from ZPublisher.HTTPRequest import FileUpload
from cgi import FieldStorage
from os import access,W_OK
# We first should look to the import folder
base_directory_path = '/mnt/edi'
import_directory_path = os.path.join(base_directory_path, 'Arrivee')
#if not os.path.exists(import_directory_path ):
# os.mkdir( import_directory_path )
def importEdiFile(self, file_path=None, delivery_mode=None, incoterm=None, order_type=None, segmentation_strategique=None, travel_duration=None, user_name=None):
import os, string
from DateTime import DateTime
from ZPublisher.HTTPRequest import FileUpload
from cgi import FieldStorage
from os import access,W_OK
try:
# open the file
file = open( file_path , 'r')
except:
self.Coramy_sendMailToUser(user_name=user_name,mSubj="Impossible d ouvrir le fichier Edi",mMsg=file_path)
try:
# create the correct parameter
form=FieldStorage()
form.filename = file_path
form.file = file
import_file = FileUpload(form)
except:
self.Coramy_sendMailToUser(user_name=user_name,mSubj="Impossible de lire le fichier Edi",mMsg=file_path)
else:
if access(file_path, W_OK):
try:
# import the file
resultTmp = self.SalesOrder_importEdi(import_file=import_file, delivery_mode=delivery_mode, incoterm=incoterm, order_type=order_type, segmentation_strategique=segmentation_strategique, travel_duration=travel_duration, batch_mode=1,user_name=user_name)
except:
self.Coramy_sendMailToUser(user_name=user_name,mSubj="Erreur d execution, import Edi annul",mMsg=file_path)
else:
# test the result
file.close()
if resultTmp[1] == None:
self.Coramy_sendMailToUser(user_name=user_name,mSubj="Fichier non valide, import Edi annul",mMsg=file_path)
else:
#self.Coramy_sendMailToUser(user_name=user_name,mSubj="Import russi",mMsg=resultTmp[0])
#get_transaction().commit()
os.remove(file_path)
else:
file.close()
self.Coramy_sendMailToUser(user_name=user_name,mSubj="Impossible d effacer le fichier Edi, import annul",mMsg=file_path)
"""
test the directory and creation of all the messages
"""
def importEdiFileListTestAndStart(self, delivery_mode=None, incoterm=None, order_type=None, segmentation_strategique=None, travel_duration=None, user_name=None ):
import os, string
from DateTime import DateTime
from ZPublisher.HTTPRequest import FileUpload
from cgi import FieldStorage
from os import access,W_OK
# test the directory
if access(import_directory_path, W_OK):
#self.Coramy_sendMailToUser(user_name=user_name,mSubj="Lancement de l import en masse ",mMsg=import_directory_path)
files_list = os.listdir(import_directory_path)
for file_name in files_list:
file_path = os.path.join(import_directory_path, file_name)
self.activate(activity="SQLQueue", priority=4).SalesOrder_importEdiFile(file_path=file_path, delivery_mode=delivery_mode, incoterm=incoterm, order_type=order_type, segmentation_strategique=segmentation_strategique, travel_duration=travel_duration, user_name=user_name )
else:
self.Coramy_sendMailToUser(user_name=user_name,mSubj="Impossible d ecrire sur le repertoire d import, import annule",mMsg=import_directory_path)
"""
this allows to import many edi files at the same time
no more used
"""
"""
def importEdiFileList(self, REQUEST,file_path=None, delivery_mode=None, incoterm=None, order_type=None, segmentation_strategique=None, travel_duration=None, batch_mode=0):
user_name = self.portal_membership.getAuthenticatedMember().getUserName()
# test the directory
# can't be good, because this test is done on TinyLeon, and the message is done on SumicomA
if access(import_directory_path, W_OK):
files_list = os.listdir(import_directory_path)
for file_name in files_list:
file_path = os.path.join(import_directory_path, file_name)
self.activate(activity="SQLQueue").SalesOrder_importEdiFile(file_path=file_path, delivery_mode=delivery_mode, incoterm=incoterm, order_type=order_type, segmentation_strategique=segmentation_strategique, travel_duration=travel_duration, user_name=user_name )
redirect_url = '%s?%s' % ( self.absolute_url()+'/'+'view', 'portal_status_message=Import+des+fichiers+EDI+lanc.')
else:
redirect_url = '%s?%s%s' % ( self.absolute_url()+'/'+'view', "portal_status_message=Annulation:+impossible+d+crire+sur+le+rpertoire+d'import+",import_directory_path)
if batch_mode:
return None
else:
REQUEST[ 'RESPONSE' ].redirect( redirect_url )
"""
def getAvailableItemStat(self, **kw):
result = self.PieceTissu_zGetAvailableItemList(**kw)
if len(result) > 100:
return "Trop de pices"
remaining_quantity = 0.0
for m in result:
o = m.getObject()
if o is not None:
remaining_quantity += o.getRemainingQuantity()
class r:
pass
ri = r()
ri.getRemainingQuantity = remaining_quantity
return [ri]
import sys
def sendRawToCups(self, printer_name, raw_string, number_copies=1):
"""
Send ouput to printer as raw string
"""
if sys.platform == 'win32':
# No idea what to do at this point
pass
else:
from popen2 import popen2
import tempfile
tempdir = tempfile.tempdir
tempfile.tempdir = '/tmp'
newraw_path = tempfile.mktemp(suffix='.cups' )
f = open(newraw_path, 'w')
f.write(raw_string)
f.close()
tempfile.tempdir = tempdir
imgout, imgin = popen2('lp -h 192.1.2.5 -d %s -n %i %s'
% (printer_name, number_copies, newraw_path))
imgin.write('')
imgin.close()
imgout.read()
imgout.close()
##############################################################################
#
# Copyright (c) 2003 Nexedi SARL and Contributors. All Rights Reserved.
# Sebastien Robin <seb@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
#
# This program as such is not intended to be used by end users. End
# users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the Zope Public License (ZPL) Version 2.0
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
##############################################################################
source_price_dict = {}
def getSupplierPrice(self) :
"""
Add as extension Amount_getSupplierPrice
self -- an amount (movement, delivery line, etc.)
"""
try :
coloris = self.getColoris()
except :
coloris = None
try :
variante = self.getVariante()
except :
variante = None
try :
resource = self.getResource()
resource_value = self.getResourceValue()
except :
resource = None
resource_value = None
if resource_value is None:
return 0
else :
# source price is defined on resource or on variation
predicate_value = []
if resource_value is not None :
base_category_list = resource_value.getVariationBaseCategoryList()
if 'coloris' in base_category_list and coloris :
predicate_value.append('coloris/'+coloris)
if 'variante' in base_category_list and variante :
predicate_value.append('variante/'+variante)
predicate_value.sort()
key = tuple([resource] + predicate_value)
if source_price_dict.has_key(key):
return source_price_dict[key] # This is an infinite cache
# Build cache
if resource_value is not None :
supplier_price = resource_value.getSourceBasePrice()
variation_list = resource_value.contentValues(filter={'portal_type':['Variante Tissu','Variante Composant']})
if supplier_price is None :
supplier_price = 0
root_supplier_price = supplier_price
priced_quantity = resource_value.getPricedQuantity()
if priced_quantity not in (None, 0, 1) :
supplier_price = supplier_price / priced_quantity
new_key = tuple([resource])
source_price_dict[new_key] = supplier_price
# Fill the cache
for variation in variation_list:
if variation.getSourceBasePrice() not in (None, 0) :
supplier_price = variation.getSourceBasePrice()
else :
supplier_price = root_supplier_price
if variation.getPortalType() == 'Variante Tissu' :
predicate_value = ['coloris/' + variation.getRelativeUrl()]
elif variation.getPortalType() == 'Variante Composant' :
predicate_value = ['variante/' + variation.getRelativeUrl()]
else :
prediacte_value = []
if priced_quantity not in (None, 0, 1) :
supplier_price = supplier_price / priced_quantity
new_key = tuple([resource] + predicate_value)
source_price_dict[new_key] = supplier_price
else :
return 0
if source_price_dict.has_key(key):
return source_price_dict[key]
return 0
def getSupplierPriceKeyList():
return str(source_price_dict.keys())
##############################################################################
#
# Copyright (c) 2003 Nexedi SARL and Contributors. All Rights Reserved.
# Sebastien Robin <seb@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
#
# This program as such is not intended to be used by end users. End
# users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the Zope Public License (ZPL) Version 2.0
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
##############################################################################
pri_dict = {}
def getPri(self) :
"""
Add as extension Amount_getPri
self -- an amount (movement, delivery line, etc.)
"""
coloris = self.getColoris()
taille = self.getTaille()
morphologie = self.getMorphologie()
resource = self.getResource()
resource_value = self.getResourceValue()
if resource_value is None:
return 0
else :
# pri is defined on each resource
predicate_value = []
if resource_value is not None :
base_category_list = resource_value.getVariationBaseCategoryList()
if 'coloris' in base_category_list and coloris :
predicate_value.append('coloris/'+coloris)
if 'taille' in base_category_list and taille :
predicate_value.append('taille/'+taille)
predicate_value.sort()
key = tuple([resource] + predicate_value)
if pri_dict.has_key(key):
return pri_dict[key] # This is an infinite cache
# Build cache
if resource_value is not None :
mapped_value_list = resource_value.contentValues(filter={'portal_type':'Set Mapped Value'})
# Fill the cache
for cell in mapped_value_list:
predicate_value = []
for predicate_value_item in cell.getPredicateValueList():
if predicate_value_item <> 'value' :
predicate_value.append(predicate_value_item)
predicate_value.sort()
new_key = tuple([resource] + predicate_value)
pri_dict[new_key] = cell.getProperty(key='pri')
else :
return 0
if pri_dict.has_key(key):
return pri_dict[key]
return 0
def getPriKeyList():
return str(pri_dict.keys())
def getQuickCostingStat(self, **kw):
result = self.Transformation_quickCostingListBuilder(stat_mode=1,**kw)
total = 0
for m in result:
total += m.transformed_total_price
class r:
pass
ri = r()
ri.transformed_total_price = total
return [ri]
# this script creates an inventory for each supplier
# and places all the resource provided by the supplier in this inventory
from DateTime import DateTime
from zLOG import LOG
def Inventory_testBuildInventories(self, item=0,REQUEST=None):
"""
build inventories
"""
context=self
inventory_module = context.inventaire_mp
my_supplier_item_list = context.Resource_getSupplierItemList()
my_supplier_title_list = map(lambda x:x[0], my_supplier_item_list)
LOG('testBuildInventories',0,'supplier: %s' % str(my_supplier_title_list[item]))
for supplier in my_supplier_title_list[item] :
# create inventory
new_inventory_id = str(inventory_module.generateNewId())
my_categories = ['destination/site/Stock_MP/Gravelines','destination_section/group/Coramy']
context.portal_types.constructContent(type_name = 'Inventory MP',
container = inventory_module,
id = new_inventory_id,
description = supplier,
start_date = DateTime(),
categories = my_categories)
inventory = inventory_module[new_inventory_id]
# create all inventory lines
inventory.InventoryMP_fastAddLine(product_reference_list=[], supplier_list=[supplier])
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment