Add new API of id tool:

- add the id generators generic
- add the zodb continuous increasing id generator
- add the sql non continuous increasing id generator
- change id tool for the id generators and the compatiblity with the old api
- change the business template to not modify the generator dictionaries
  during the export and the install of bt
- change the test of id tool



git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@34543 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 874be360
...@@ -55,7 +55,7 @@ from Products.ERP5Type.Utils import readLocalTest, \ ...@@ -55,7 +55,7 @@ from Products.ERP5Type.Utils import readLocalTest, \
writeLocalTest, \ writeLocalTest, \
removeLocalTest removeLocalTest
from Products.ERP5Type.Utils import convertToUpperCase from Products.ERP5Type.Utils import convertToUpperCase
from Products.ERP5Type import Permissions, PropertySheet from Products.ERP5Type import Permissions, PropertySheet, interfaces
from Products.ERP5Type.XMLObject import XMLObject from Products.ERP5Type.XMLObject import XMLObject
from OFS.Traversable import NotFound from OFS.Traversable import NotFound
from OFS import SimpleItem, XMLExportImport from OFS import SimpleItem, XMLExportImport
...@@ -591,6 +591,10 @@ class BaseTemplateItem(Implicit, Persistent): ...@@ -591,6 +591,10 @@ class BaseTemplateItem(Implicit, Persistent):
obj.deletePdfContent() obj.deletePdfContent()
elif meta_type == 'Script (Python)': elif meta_type == 'Script (Python)':
obj._code = None obj._code = None
elif interfaces.IIdGenerator.providedBy(obj):
for dict_name in ('last_max_id_dict', 'last_id_dict'):
if getattr(obj, dict_name, None) is not None:
setattr(obj, dict_name, None)
return obj return obj
def getTemplateTypeName(self): def getTemplateTypeName(self):
...@@ -1064,6 +1068,14 @@ class ObjectTemplateItem(BaseTemplateItem): ...@@ -1064,6 +1068,14 @@ class ObjectTemplateItem(BaseTemplateItem):
obj._setProperty( obj._setProperty(
'business_template_registered_skin_selections', 'business_template_registered_skin_selections',
skin_selection_list, type='tokens') skin_selection_list, type='tokens')
# in case the portal ids, we want keep the property dict
elif interfaces.IIdGenerator.providedBy(obj) and \
old_obj is not None:
for dict_name in ('last_max_id_dict', 'last_id_dict'):
# Keep previous last id dict
if getattr(old_obj, dict_name, None) is not None:
old_dict = getattr(old_obj, dict_name, None)
setattr(obj, dict_name, old_dict)
recurse(restoreHook, obj) recurse(restoreHook, obj)
# now put original order group # now put original order group
......
##############################################################################
#
# Copyright (c) 2010 Nexedi SARL and Contributors. All Rights Reserved.
# Daniele Vanbaelinghem <daniele@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import zope.interface
from AccessControl import ClassSecurityInfo
from Products.ERP5Type import Permissions, PropertySheet, Constraint, interfaces
from Products.ERP5Type.Cache import caching_instance_method
from Products.ERP5Type.Base import Base
from Products.CMFCore.utils import getToolByName
from zLOG import LOG, INFO
class IdGenerator(Base):
"""
Generator of Ids
"""
zope.interface.implements(interfaces.IIdGenerator)
# CMF Type Definition
meta_type = 'ERP5 Id Generator'
portal_type = 'Id Generator'
add_permission = Permissions.AddPortalContent
# Declarative security
security = ClassSecurityInfo()
security.declareObjectProtected(Permissions.AccessContentsInformation)
# Declarative property
property_sheets = ( PropertySheet.Base,
PropertySheet.Version,
PropertySheet.Reference)
security.declareProtected(Permissions.AccessContentsInformation,
'getLatestVersionValue')
def getLatestVersionValue(self, **kw):
"""
Return the last generator with the reference
"""
id_tool = getToolByName(self, 'portal_ids', None)
last_id = id_tool._getLatestIdGenerator(self.getReference())
last_version = id_tool._getOb(last_id)
return last_version
security.declareProtected(Permissions.AccessContentsInformation,
'generateNewId')
def generateNewId(self, id_group=None, default=None,):
"""
Generate the next id in the sequence of ids of a particular group
Use int to store the last_id, use also a persistant mapping for to be
persistent.
"""
try:
specialise = self.getSpecialiseValue()
except AttributeError:
raise AttributeError, 'specialise is not defined'
if specialise is None:
raise ValueError, "the id generator %s doesn't have specialise value" %\
self.getReference()
return specialise.getLatestVersionValue().generateNewId(id_group=id_group,
default=default)
security.declareProtected(Permissions.AccessContentsInformation,
'generateNewIdList')
def generateNewIdList(self, id_group=None, id_count=1, default=None):
"""
Generate a list of next ids in the sequence of ids of a particular group
Store the last id on a database in the portal_ids table
If stored in zodb is enable, to store the last id use Length inspired
by BTrees.Length to manage conflict in the zodb, use also a persistant
mapping to be persistent
"""
# For compatibilty with sql data, must not use id_group as a list
if not isinstance(id_group, str):
raise AttributeError, 'id_group is not a string'
try:
specialise = self.getSpecialiseValue()
except AttributeError:
raise AttributeError, 'specialise is not defined'
if specialise is None:
raise ValueError, "the id generator %s doesn't have specialise value" %\
self.getReference()
return specialise.getLatestVersionValue().generateNewIdList(id_group=id_group, \
id_count=id_count, default=default)
security.declareProtected(Permissions.AccessContentsInformation,
'initializeGenerator')
def initializeGenerator(self):
"""
Initialize generator. This is mostly used when a new ERP5 site
is created. Some generators will need to do some initialization like
creating SQL Database, prepare some data in ZODB, etc
"""
specialise = self.getSpecialiseValue()
if specialise is None:
raise ValueError, "the id generator %s doesn't have specialise value" %\
self.getReference()
specialise.getLatestVersionValue().initializeGenerator()
security.declareProtected(Permissions.AccessContentsInformation,
'clearGenerator')
def clearGenerator(self):
"""
Clear generators data. This can be usefull when working on a
development instance or in some other rare cases. This will
loose data and must be use with caution
This can be incompatible with some particular generator implementation,
in this case a particular error will be raised (to be determined and
added here)
"""
specialise = self.getSpecialiseValue()
if specialise is None:
raise ValueError, "the id generator %s doesn't have specialise value" %\
self.getReference()
specialise.getLatestVersionValue().clearGenerator()
##############################################################################
#
# Copyright (c) 2010 Nexedi SARL and Contributors. All Rights Reserved.
# Daniele Vanbaelinghem <daniele@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import zope.interface
from Acquisition import aq_base
from AccessControl import ClassSecurityInfo
from Products.ERP5Type.Globals import PersistentMapping
from Products.ERP5Type import Permissions, PropertySheet, interfaces
from Products.ERP5.Document.IdGenerator import IdGenerator
from _mysql_exceptions import ProgrammingError
from zLOG import LOG, INFO
import persistent
class LastMaxGeneratedId(persistent.Persistent):
"""
Store the last id generated
The object support application-level conflict resolution
"""
def __init__(self, value=0):
self.value = value
def __getstate__(self):
return self.value
def __setstate__(self, value):
self.value = value
def set(self, value):
self.value = value
def _p_resolveConflict(self, first_id, second_id):
return max(first_id, second_id)
class SQLNonContinuousIncreasingIdGenerator(IdGenerator):
"""
Generate some ids with mysql storage and also zodb is enabled
by the checkbox : StoredInZodb
"""
zope.interface.implements(interfaces.IIdGenerator)
# CMF Type Definition
meta_type = 'ERP5 SQL Non Continous Increasing Id Generator'
portal_type = 'SQL Non Continous Increasing Id Generator'
add_permission = Permissions.AddPortalContent
# Declarative security
security = ClassSecurityInfo()
security.declareObjectProtected(Permissions.AccessContentsInformation)
# Declarative property
property_sheets = (PropertySheet.SQLIdGenerator,
) + IdGenerator.property_sheets
def _generateNewId(self, id_group, id_count=1, default=None):
"""
Return the next_id with the last_id with the sql method
Store the last id on a database in the portal_ids table
If stored in zodb is enable, to store the last id use LastMaxGeneratedId inspired
by BTrees.Length to manage conflict in the zodb, use also a persistant
mapping to be persistent
"""
# Check the arguments
if id_group in (None, 'None'):
raise ValueError, '%s is not a valid group Id.' % (repr(id_group), )
if default is None:
default = 0
# Retrieve the zsql method
portal = self.getPortalObject()
generate_id_method = getattr(portal, 'IdTool_zGenerateId', None)
commit_method = getattr(portal, 'IdTool_zCommit', None)
get_last_id_method = getattr(portal, 'IdTool_zGetLastId', None)
if None in (generate_id_method, commit_method, get_last_id_method):
raise AttributeError, 'Error while generating Id: ' \
'idTool_zGenerateId and/or IdTool_zCommit and/or idTool_zGetLastId' \
'could not be found.'
result_query = generate_id_method(id_group=id_group, id_count=id_count, \
default=default)
try:
# Tries of generate the new_id
new_id = result_query[0]['LAST_INSERT_ID()']
# Commit the changement of new_id
commit_method()
except ProgrammingError:
# If the database not exist, initialise the generator
self.initializeGenerator()
if self.getStoredInZodb():
# Store the new_id on ZODB if the checkbox storedInZodb is enabled
self.last_max_id_dict = getattr(aq_base(self), \
'last_max_id_dict', None)
if self.last_max_id_dict is None:
# If the dictionary not exist, initialize the generator
self.initializeGenerator()
# Store the new value id
if self.last_max_id_dict.get(id_group, None) is None:
self.last_max_id_dict[id_group] = LastMaxGeneratedId(new_id)
self.last_max_id_dict[id_group].set(new_id)
return new_id
security.declareProtected(Permissions.AccessContentsInformation,
'generateNewId')
def generateNewId(self, id_group=None, default=None):
"""
Generate the next id in the sequence of ids of a particular group
"""
new_id = self._generateNewId(id_group=id_group, default=default)
return new_id
security.declareProtected(Permissions.AccessContentsInformation,
'generateNewIdList')
def generateNewIdList(self, id_group=None, id_count=1, default=None):
"""
Generate a list of next ids in the sequence of ids of a particular group
"""
new_id = self._generateNewId(id_group=id_group, id_count=id_count, \
default=default)
return range(new_id - id_count + 1, new_id + 1)
security.declareProtected(Permissions.AccessContentsInformation,
'initializeGenerator')
def initializeGenerator(self):
"""
Initialize generator. This is mostly used when a new ERP5 site
is created. Some generators will need to do some initialization like
prepare some data in ZODB
"""
LOG('initialize SQL Generator', INFO, 'Id Generator: %s' % (self,))
# Check the dictionnary
if getattr(self, 'last_max_id_dict', None) is None:
self.last_max_id_dict = PersistentMapping()
# Create table portal_ids if not exists
portal = self.getPortalObject()
get_value_list = getattr(portal, 'IdTool_zGetValueList', None)
if get_value_list is None:
raise AttributeError, 'Error while initialize generator:' \
'idTool_zGetValueList could not be found.'
try:
get_value_list()
except ProgrammingError:
drop_method = getattr(portal, 'IdTool_zDropTable', None)
create_method = getattr(portal, 'IdTool_zCreateEmptyTable', None)
if None in (drop_method, create_method):
raise AttributeError, 'Error while initialize generator: ' \
'idTool_zDropTable and/or idTool_zCreateTable could not be found.'
drop_method()
create_method()
# XXX compatiblity code below, dump the old dictionnaries
# Retrieve the zsql_method
portal_ids = getattr(self, 'portal_ids', None)
get_last_id_method = getattr(portal, 'IdTool_zGetLastId', None)
set_last_id_method = getattr(portal, 'IdTool_zSetLastId', None)
if None in (get_last_id_method, set_last_id_method):
raise AttributeError, 'Error while generating Id: ' \
'idTool_zGetLastId and/or idTool_zSetLastId could not be found.'
storage = self.getStoredInZodb()
# Recovery last_max_id_dict datas in zodb if enabled and is in mysql
if len(self.last_max_id_dict) != 0:
dump_dict = self.last_max_id_dict
elif getattr(portal_ids, 'dict_length_ids', None) is not None:
dump_dict = portal_ids.dict_length_ids
for id_group, last_id in dump_dict.items():
last_insert_id = get_last_id_method(id_group=id_group)
if len(last_insert_id) != 0:
last_insert_id = last_insert_id[0]['LAST_INSERT_ID()']
if last_insert_id > last_id.value:
# Check value in dict
if storage and (not self.last_max_id_dict.has_key(id_group) or \
self.last_max_id_dict.has_key[id_group] != last_insert_id):
self.last_max_id_dict[id_group] = LastMaxGeneratedId(last_insert_id)
self.last_max_id_dict[id_group].set(last_insert_id)
continue
last_id = int(last_id.value)
set_last_id_method(id_group=id_group, last_id=last_id)
if storage:
self.last_max_id_dict[id_group] = LastMaxGeneratedId(last_id)
self.last_max_id_dict[id_group].set(last_id)
security.declareProtected(Permissions.AccessContentsInformation,
'clearGenerator')
def clearGenerator(self):
"""
Clear generators data. This can be usefull when working on a
development instance or in some other rare cases. This will
loose data and must be use with caution
This can be incompatible with some particular generator implementation,
in this case a particular error will be raised (to be determined and
added here)
"""
# Remove dictionary
self.last_max_id_dict = PersistentMapping()
# Remove and recreate portal_ids table
portal = self.getPortalObject()
drop_method = getattr(portal, 'IdTool_zDropTable', None)
create_method = getattr(portal, 'IdTool_zCreateEmptyTable', None)
if None in (drop_method, create_method):
raise AttributeError, 'Error while clear generator: ' \
'idTool_zDropTable and/or idTool_zCreateTable could not be found.'
drop_method()
create_method()
##############################################################################
#
# Copyright (c) 2010 Nexedi SARL and Contributors. All Rights Reserved.
# Daniele Vanbaelinghem <daniele@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import zope.interface
from AccessControl import ClassSecurityInfo
from Products.ERP5Type.Globals import PersistentMapping
from Products.ERP5Type import Permissions, interfaces
from Products.ERP5.Document.IdGenerator import IdGenerator
from zLOG import LOG, INFO
class ZODBContinuousIncreasingIdGenerator(IdGenerator):
"""
Create some Ids with the zodb storage
"""
zope.interface.implements(interfaces.IIdGenerator)
# CMF Type Definition
meta_type = 'ERP5 ZODB Continous Increasing Id Generator'
portal_type = 'ZODB Continous Increasing Id Generator'
add_permission = Permissions.AddPortalContent
# Declarative security
security = ClassSecurityInfo()
security.declareObjectProtected(Permissions.AccessContentsInformation)
def _generateNewId(self, id_group, id_count=1, default=None):
"""
Return the new_id from the last_id of the zodb
Use int to store the last_id, use also a persistant mapping for to be
persistent.
"""
if id_group in (None, 'None'):
raise ValueError, '%s is not a valid group Id.' % (repr(id_group), )
if default is None:
default = 0
self.last_id_dict = getattr(self, 'last_id_dict', None)
if self.last_id_dict is None:
# If the dictionary not exist initialize generator
self.initializeGenerator()
marker = []
# Retrieve the last id
last_id = self.last_id_dict.get(id_group, marker)
if last_id is marker:
new_id = default
if id_count > 1:
# If create a list use the default and increment
new_id = new_id + id_count - 1
else:
# Increment the last_id
new_id = last_id + id_count
# Store the new_id in the dictionary
self.last_id_dict[id_group] = new_id
return new_id
security.declareProtected(Permissions.AccessContentsInformation,
'generateNewId')
def generateNewId(self, id_group=None, default=None):
"""
Generate the next id in the sequence of ids of a particular group
"""
new_id = self._generateNewId(id_group=id_group, default=default)
return new_id
security.declareProtected(Permissions.AccessContentsInformation,
'generateNewIdList')
def generateNewIdList(self, id_group=None, id_count=1, default=None):
"""
Generate a list of next ids in the sequence of ids of a particular group
"""
new_id = self._generateNewId(id_group=id_group, id_count=id_count, \
default=default)
return range(new_id - id_count + 1, new_id + 1)
security.declareProtected(Permissions.AccessContentsInformation,
'initializeGenerator')
def initializeGenerator(self):
"""
Initialize generator. This is mostly used when a new ERP5 site
is created. Some generators will need to do some initialization like
prepare some data in ZODB
"""
LOG('initialize ZODB Generator', INFO, 'Id Generator: %s' % (self,))
if getattr(self, 'last_id_dict', None) is None:
self.last_id_dict = PersistentMapping()
# XXX compatiblity code below, dump the old dictionnaries
portal_ids = getattr(self, 'portal_ids', None)
# Dump the dict_ids dictionary
if getattr(portal_ids, 'dict_ids', None) is not None:
for id_group, last_id in portal_ids.dict_ids.items():
if self.last_id_dict.has_key(id_group) and \
self.last_id_dict[id_group] > last_id:
continue
self.last_id_dict[id_group] = last_id
security.declareProtected(Permissions.AccessContentsInformation,
'clearGenerator')
def clearGenerator(self):
"""
Clear generators data. This can be usefull when working on a
development instance or in some other rare cases. This will
loose data and must be use with caution
This can be incompatible with some particular generator implementation,
in this case a particular error will be raised (to be determined and
added here)
"""
# Remove dictionary
self.last_id_dict = PersistentMapping()
...@@ -1848,15 +1848,11 @@ class ERP5Generator(PortalGenerator): ...@@ -1848,15 +1848,11 @@ class ERP5Generator(PortalGenerator):
# we don't want to make it crash # we don't want to make it crash
if p.erp5_sql_connection_type is not None: if p.erp5_sql_connection_type is not None:
setattr(p, 'isIndexable', ConstantGetter('isIndexable', value=True)) setattr(p, 'isIndexable', ConstantGetter('isIndexable', value=True))
portal_catalog = p.portal_catalog
# Clear portal ids sql table, like this we do not take # Clear portal ids sql table, like this we do not take
# ids for a previously created web site # ids for a previously created web site
# XXX It's temporary, a New API will be implemented soon p.portal_ids.initializeGenerator(all=True)
# the code will be change
p.IdTool_zDropTable()
p.IdTool_zCreateTable()
# Then clear the catalog and reindex it # Then clear the catalog and reindex it
portal_catalog.manage_catalogClear() p.portal_catalog.manage_catalogClear()
# Calling ERP5Site_reindexAll is useless. # Calling ERP5Site_reindexAll is useless.
def setupUserFolder(self, p): def setupUserFolder(self, p):
......
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2010 Nexedi SA and Contributors. All Rights Reserved.
# Daniele Vanbaelinghem <daniele@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from Products.CMFCore.Expression import Expression
class SQLIdGenerator:
"""
Id Generator provides properties to check the storage on ZODB
"""
_properties = (
{'id' :'stored_in_zodb',
'label' : 'A flag indicating if the last_id is stored in the ZODB',
'type' :'boolean',
'mode' :'w'
},
)
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2010 Nexedi SA and Contributors. All Rights Reserved.
# Daniele Vanbaelinghem <daniele@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from Products.CMFCore.Expression import Expression
class SQLIdGenerator:
"""
Id Generator provides properties to check the storage on ZODB
"""
_properties = (
{'id' :'stored_in_zodb',
'label' : 'A flag indicating if the last_id is stored in the ZODB',
'type' :'boolean',
'mode' :'w'
},
)
...@@ -26,21 +26,27 @@ ...@@ -26,21 +26,27 @@
# #
############################################################################## ##############################################################################
import warnings
import zope.interface
from Acquisition import aq_base from Acquisition import aq_base
from AccessControl import ClassSecurityInfo from AccessControl import ClassSecurityInfo
from Products.ERP5Type.Globals import InitializeClass, DTMLFile, PersistentMapping from Products.ERP5Type.Globals import InitializeClass, DTMLFile, PersistentMapping
from Products.ERP5Type.Tool.BaseTool import BaseTool from Products.ERP5Type.Tool.BaseTool import BaseTool
from Products.ERP5Type import Permissions from Products.ERP5Type.Cache import caching_instance_method
from Products.CMFCore.utils import getToolByName from Products.ERP5Type import Permissions, interfaces
from zLOG import LOG, WARNING from zLOG import LOG, WARNING, INFO, ERROR
from Products.ERP5 import _dtmldir from Products.ERP5 import _dtmldir
from BTrees.Length import Length from BTrees.Length import Length
_marker = object()
class IdTool(BaseTool): class IdTool(BaseTool):
""" """
This tools handles the generation of IDs. This tools handles the generation of IDs.
""" """
zope.interface.implements(interfaces.IIdTool)
id = 'portal_ids' id = 'portal_ids'
meta_type = 'ERP5 Id Tool' meta_type = 'ERP5 Id Tool'
portal_type = 'Id Tool' portal_type = 'Id Tool'
...@@ -51,50 +57,109 @@ class IdTool(BaseTool): ...@@ -51,50 +57,109 @@ class IdTool(BaseTool):
security.declareProtected( Permissions.ManagePortal, 'manage_overview' ) security.declareProtected( Permissions.ManagePortal, 'manage_overview' )
manage_overview = DTMLFile( 'explainIdTool', _dtmldir ) manage_overview = DTMLFile( 'explainIdTool', _dtmldir )
security.declareProtected(Permissions.AccessContentsInformation, def newContent(self, *args, **kw):
'getLastGeneratedId')
def getLastGeneratedId(self, id_group=None, default=None):
""" """
Get the last id generated the newContent is overriden to not use generateNewId
""" """
if getattr(aq_base(self), 'dict_ids', None) is None: if not kw.has_key(id):
self.dict_ids = PersistentMapping() new_id = self._generateNextId()
last_id = None if new_id is not None:
if id_group is not None and id_group != 'None': kw['id'] = new_id
last_id = self.dict_ids.get(id_group, default) else:
return last_id raise ValueError('Failed to gererate id')
return BaseTool.newContent(self, *args, **kw)
security.declareProtected(Permissions.ModifyPortalContent, def _get_id(self, id):
'setLastGeneratedId')
def setLastGeneratedId(self, new_id, id_group=None):
""" """
Set a new last id. This is usefull in order to reset _get_id is overrided to not use generateNewId
a sequence of ids. It is used for example when an object is cloned
""" """
if getattr(aq_base(self), 'dict_ids', None) is None: if self._getOb(id, None) is None :
self.dict_ids = PersistentMapping() return id
if id_group is not None and id_group != 'None': return self._generateNextId()
self.dict_ids[id_group] = new_id
@caching_instance_method(id='IdTool._getLatestIdGenerator',
cache_factory='erp5_content_long')
def _getLatestIdGenerator(self, reference):
"""
Tries to find the id_generator with the latest version
from the current object.
Use the low-level to create a site without catalog
"""
assert reference
id_tool = self.getPortalObject().portal_ids
id_last_generator = None
version_last_generator = 0
for generator in id_tool.objectValues():
if generator.getReference() == reference:
version = generator.getVersion()
if version > version_last_generator:
id_last_generator = generator.getId()
version_last_generator = generator.getVersion()
if id_last_generator is None:
raise KeyError, 'The generator %s is not present' % (reference,)
return id_last_generator
security.declareProtected(Permissions.AccessContentsInformation, security.declareProtected(Permissions.AccessContentsInformation,
'generateNewId') 'getLatestVersionValue')
def generateNewId(self, id_group=None, default=None, method=None): def _getLatestGeneratorValue(self, id_generator):
""" """
Generate a new Id Return the last generator with the reference
""" """
id_tool = self.getPortalObject().portal_ids
last_id_generator = self._getLatestIdGenerator(id_generator)
last_generator = id_tool._getOb(last_id_generator)
return last_generator
security.declareProtected(Permissions.AccessContentsInformation,
'generateNewId')
def generateNewId(self, id_group=None, default=None, method=_marker,
id_generator=None):
"""
Generate the next id in the sequence of ids of a particular group
"""
if id_group in (None, 'None'):
raise ValueError, '%s is not a valid group Id.' % (repr(id_group), )
# for compatibilty with sql data, must not use id_group as a list
if not isinstance(id_group, str):
id_group = repr(id_group)
warnings.warn('The id_group must be a string the other types '
'is deprecated.', DeprecationWarning)
if id_generator is None:
id_generator = 'document'
if method is not _marker:
warnings.warn("The usage of 'method' argument is deprecated.\n"
"use this method with a id generator without this"
"argument", DeprecationWarning)
try:
#use _getLatestGeneratorValue here for that the technical level
#must not call the method
last_generator = self._getLatestGeneratorValue(id_generator)
new_id = last_generator.generateNewId(id_group=id_group, \
default=default)
except KeyError:
template_tool = getattr(self, 'portal_templates', None)
revision = template_tool.getInstalledBusinessTemplateRevision('erp5_core')
# XXX backward compatiblity
if revision > '1561':
LOG('generateNewId', ERROR, 'while generating id')
raise
else:
# Compatibility code below, in case the last version of erp5_core
# is not installed yet
warnings.warn("You use the old version of API which is deprecated.\n"
"please, update the business template erp5_core "
"to use the new API", DeprecationWarning)
dict_ids = getattr(aq_base(self), 'dict_ids', None) dict_ids = getattr(aq_base(self), 'dict_ids', None)
if dict_ids is None: if dict_ids is None:
dict_ids = self.dict_ids = PersistentMapping() dict_ids = self.dict_ids = PersistentMapping()
new_id = None new_id = None
if id_group is not None and id_group != 'None':
# Getting the last id # Getting the last id
if default is None: if default is None:
default = 0 default = 0
marker = [] marker = []
new_id = dict_ids.get(id_group, marker) new_id = dict_ids.get(id_group, marker)
if method is None: if method is _marker:
if new_id is marker: if new_id is marker:
new_id = default new_id = default
else: else:
...@@ -108,54 +173,136 @@ class IdTool(BaseTool): ...@@ -108,54 +173,136 @@ class IdTool(BaseTool):
return new_id return new_id
security.declareProtected(Permissions.AccessContentsInformation, security.declareProtected(Permissions.AccessContentsInformation,
'getDictLengthIdsItems') 'generateNewIdList')
def getDictLengthIdsItems(self): def generateNewIdList(self, id_group=None, id_count=1, default=None,
store=_marker, id_generator=None):
""" """
Return a copy of dict_length_ids. Generate a list of next ids in the sequence of ids of a particular group
This is a workaround to access the persistent mapping content from ZSQL
method to be able to insert initial tuples in the database at creation.
""" """
if getattr(self, 'dict_length_ids', None) is None: if id_group in (None, 'None'):
raise ValueError, '%s is not a valid group Id.' % (repr(id_group), )
# for compatibilty with sql data, must not use id_group as a list
if not isinstance(id_group, str):
id_group = repr(id_group)
warnings.warn('The id_group must be a string the other types '
'is deprecated.', DeprecationWarning)
if id_generator is None:
id_generator = 'uid'
if store is not _marker:
warnings.warn("The usage of 'store' argument is deprecated.\n"
"use this method with a id generator and without this."
"argument", DeprecationWarning)
try:
#use _getLatestGeneratorValue here for that the technical level
#must not call the method
last_generator = self._getLatestGeneratorValue(id_generator)
new_id_list = last_generator.generateNewIdList(id_group=id_group,
id_count=id_count, default=default)
except KeyError:
template_tool = getattr(self, 'portal_templates', None)
revision = template_tool.getInstalledBusinessTemplateRevision('erp5_core')
# XXX backward compatiblity
if revision > '1561':
LOG('generateNewIdList', ERROR, 'while generating id')
raise
else:
# Compatibility code below, in case the last version of erp5_core
# is not installed yet
warnings.warn("You use the old version of erp5_core to generate the ids.\n"
"please, update the business template erp5_core "
"to have the new id generators", DeprecationWarning)
new_id = None
if default is None:
default = 1
# XXX It's temporary, a New API will be implemented soon
# the code will be change
portal = self.getPortalObject()
query = getattr(portal, 'IdTool_zGenerateId', None)
commit = getattr(portal, 'IdTool_zCommit', None)
if query is None or commit is None:
portal_catalog = getattr(self, 'portal_catalog').getSQLCatalog()
query = getattr(portal_catalog, 'z_portal_ids_generate_id')
commit = getattr(portal_catalog, 'z_portal_ids_commit')
if None in (query, commit):
raise AttributeError, 'Error while generating Id: ' \
'idTool_zGenerateId and/or idTool_zCommit could not ' \
'be found.'
try:
result = query(id_group=id_group, id_count=id_count, default=default)
finally:
commit()
new_id = result[0]['LAST_INSERT_ID()']
if store:
if getattr(aq_base(self), 'dict_length_ids', None) is None:
# Length objects are stored in a persistent mapping: there is one
# Length object per id_group.
self.dict_length_ids = PersistentMapping() self.dict_length_ids = PersistentMapping()
return self.dict_length_ids.items() if self.dict_length_ids.get(id_group) is None:
self.dict_length_ids[id_group] = Length(new_id)
self.dict_length_ids[id_group].set(new_id)
new_id_list = range(new_id - id_count, new_id)
return new_id_list
security.declarePrivate('dumpDictLengthIdsItems') security.declareProtected(Permissions.AccessContentsInformation,
def dumpDictLengthIdsItems(self): 'initializeGenerator')
def initializeGenerator(self, id_generator=None, all=False):
""" """
Store persistently data from SQL table portal_ids. Initialize generators. This is mostly used when a new ERP5 site
is created. Some generators will need to do some initialization like
creating SQL Database, prepare some data in ZODB, etc
""" """
# XXX It's temporary, a New API will be implemented soon if not all:
# the code will be change #Use _getLatestGeneratorValue here for that the technical level
portal = self.getPortalObject() #must not call the method
query = getattr(portal, 'IdTool_zDump', None) last_generator = self._getLatestGeneratorValue(id_generator)
if query is None: last_generator.initializeGenerator()
portal_catalog = getToolByName(self, 'portal_catalog').getSQLCatalog()
query = getattr(portal_catalog, 'z_portal_ids_dump')
dict_length_ids = getattr(aq_base(self), 'dict_length_ids', None)
if dict_length_ids is None:
dict_length_ids = self.dict_length_ids = PersistentMapping()
for line in query().dictionaries():
id_group = line['id_group']
last_id = line['last_id']
stored_last_id = self.dict_length_ids.get(id_group)
if stored_last_id is None:
self.dict_length_ids[id_group] = Length(last_id)
else: else:
stored_last_id_value = stored_last_id() # recovery all the generators and initialize them
if stored_last_id_value < last_id: for generator in self.objectValues(\
stored_last_id.set(last_id) portal_type='Application Id Generator'):
generator.initializeGenerator()
security.declareProtected(Permissions.AccessContentsInformation,
'clearGenerator')
def clearGenerator(self, id_generator=None, all=False):
"""
Clear generators data. This can be usefull when working on a
development instance or in some other rare cases. This will
loose data and must be use with caution
This can be incompatible with some particular generator implementation,
in this case a particular error will be raised (to be determined and
added here)
"""
if not all:
#Use _getLatestGeneratorValue here for that the technical level
#must not call the method
last_generator = self._getLatestGeneratorValue(id_generator)
last_generator.clearGenerator()
else: else:
if stored_last_id_value > last_id: if len(self.objectValues()) == 0:
LOG('IdTool', WARNING, 'ZODB value (%r) for group %r is higher ' \ # compatibility with old API
'than SQL value (%r). Keeping ZODB value untouched.' % \ self.getPortalObject().IdTool_zDropTable()
(stored_last_id, id_group, last_id)) self.getPortalObject().IdTool_zCreateTable()
for generator in self.objectValues(\
portal_type='Application Id Generator'):
generator.clearGenerator()
## XXX Old API deprecated
#backward compatibility
generateNewLengthIdList = generateNewIdList
#use by alarm
security.declareProtected(Permissions.AccessContentsInformation, security.declareProtected(Permissions.AccessContentsInformation,
'getLastLengthGeneratedId') 'getLastLengthGeneratedId')
def getLastLengthGeneratedId(self, id_group, default=None): def getLastLengthGeneratedId(self, id_group, default=None):
""" """
Get the last length id generated Get the last length id generated
""" """
warnings.warn('The usage of this method is deprecated.\n'
, DeprecationWarning)
# check in persistent mapping if exists # check in persistent mapping if exists
if getattr(aq_base(self), 'dict_length_ids', None) is not None: if getattr(aq_base(self), 'dict_length_ids', None) is not None:
last_id = self.dict_length_ids.get(id_group) last_id = self.dict_length_ids.get(id_group)
...@@ -167,7 +314,7 @@ class IdTool(BaseTool): ...@@ -167,7 +314,7 @@ class IdTool(BaseTool):
portal = self.getPortalObject() portal = self.getPortalObject()
query = getattr(portal, 'IdTool_zGetLastId', None) query = getattr(portal, 'IdTool_zGetLastId', None)
if query is None: if query is None:
portal_catalog = getToolByName(self, 'portal_catalog').getSQLCatalog() portal_catalog = getattr(self, 'portal_catalog').getSQLCatalog()
query = getattr(portal_catalog, 'z_portal_ids_get_last_id') query = getattr(portal_catalog, 'z_portal_ids_get_last_id')
if query is None: if query is None:
raise AttributeError, 'Error while getting last Id: ' \ raise AttributeError, 'Error while getting last Id: ' \
...@@ -178,61 +325,36 @@ class IdTool(BaseTool): ...@@ -178,61 +325,36 @@ class IdTool(BaseTool):
return result[0]['last_id'] - 1 return result[0]['last_id'] - 1
return default return default
#use in erp5_accounting
security.declareProtected(Permissions.AccessContentsInformation, security.declareProtected(Permissions.AccessContentsInformation,
'generateNewLengthIdList') 'getLastGeneratedId')
def generateNewLengthIdList(self, id_group=None, id_count=1, default=None, def getLastGeneratedId(self, id_group=None, default=None):
store=True):
"""
Generates a list of Ids.
The ids are generated using mysql and then stored in a Length object in a
persistant mapping to be persistent.
We use MySQL to generate IDs, because it is atomic and we don't want
to generate any conflict at zope level. The possible downfall is that
some IDs might be skipped because of failed transactions.
"Length" is because the id is stored in a python object inspired by
BTrees.Length. It doesn't have to be a length.
store : if we want to store the new id into the zodb, we want it
by default
""" """
new_id = None Get the last id generated
if id_group in (None, 'None'): """
raise ValueError, '%s is not a valid group Id.' % (repr(id_group), ) warnings.warn('The usage of this method is deprecated.\n'
if not isinstance(id_group, str): , DeprecationWarning)
id_group = repr(id_group) if getattr(aq_base(self), 'dict_ids', None) is None:
if default is None: self.dict_ids = PersistentMapping()
default = 1 last_id = None
# FIXME: A skin folder should be used to contain ZSQLMethods instead of if id_group is not None and id_group != 'None':
# default catalog, like activity tool (anyway, it uses activity tool last_id = self.dict_ids.get(id_group, default)
# ZSQLConnection, so hot reindexing is not helping here). return last_id
# XXX It's temporary, a New API will be implemented soon
# the code will be change
portal = self.getPortalObject()
query = getattr(portal, 'IdTool_zGenerateId', None)
commit = getattr(portal, 'IdTool_zCommit', None)
if query is None or commit is None:
portal_catalog = getToolByName(self, 'portal_catalog').getSQLCatalog()
query = getattr(portal_catalog, 'z_portal_ids_generate_id')
commit = getattr(portal_catalog, 'z_portal_ids_commit')
if None in (query, commit):
raise AttributeError, 'Error while generating Id: ' \
'idTool_zGenerateId and/or idTool_zCommit could not ' \
'be found.'
try:
result = query(id_group=id_group, id_count=id_count, default=default)
finally:
commit()
new_id = result[0]['LAST_INSERT_ID()']
if store:
if getattr(aq_base(self), 'dict_length_ids', None) is None:
# Length objects are stored in a persistent mapping: there is one
# Length object per id_group.
self.dict_length_ids = PersistentMapping()
if self.dict_length_ids.get(id_group) is None:
self.dict_length_ids[id_group] = Length(new_id)
self.dict_length_ids[id_group].set(new_id)
return range(new_id - id_count, new_id)
#use in the unit tests
security.declareProtected(Permissions.ModifyPortalContent,
'setLastGeneratedId')
def setLastGeneratedId(self, new_id, id_group=None):
"""
Set a new last id. This is usefull in order to reset
a sequence of ids.
"""
if getattr(aq_base(self), 'dict_ids', None) is None:
self.dict_ids = PersistentMapping()
if id_group is not None and id_group != 'None':
self.dict_ids[id_group] = new_id
# use several files
security.declareProtected(Permissions.AccessContentsInformation, security.declareProtected(Permissions.AccessContentsInformation,
'generateNewLengthId') 'generateNewLengthId')
def generateNewLengthId(self, id_group=None, default=None, store=1): def generateNewLengthId(self, id_group=None, default=None, store=1):
...@@ -240,7 +362,49 @@ class IdTool(BaseTool): ...@@ -240,7 +362,49 @@ class IdTool(BaseTool):
Generates an Id. Generates an Id.
See generateNewLengthIdList documentation for details. See generateNewLengthIdList documentation for details.
""" """
return self.generateNewLengthIdList(id_group=id_group, id_count=1, warnings.warn('The usage of this method is deprecated.\n'
default=default, store=store)[0] 'use directly generateNewIdList'
' with a id_generator sql', DeprecationWarning)
new_id = self.generateNewIdList(id_group=id_group,
id_count=1, default=default, store=store)[0]
return new_id
security.declareProtected(Permissions.AccessContentsInformation,
'getDictLengthIdsItems')
def getDictLengthIdsItems(self):
"""
Return a copy of dict_length_ids.
This is a workaround to access the persistent mapping content from ZSQL
method to be able to insert initial tuples in the database at creation.
"""
if getattr(self, 'dict_length_ids', None) is None:
self.dict_length_ids = PersistentMapping()
return self.dict_length_ids.items()
security.declarePrivate('dumpDictLengthIdsItems')
def dumpDictLengthIdsItems(self):
"""
Store persistently data from SQL table portal_ids.
"""
portal_catalog = getattr(self, 'portal_catalog').getSQLCatalog()
query = getattr(portal_catalog, 'z_portal_ids_dump')
dict_length_ids = getattr(aq_base(self), 'dict_length_ids', None)
if dict_length_ids is None:
dict_length_ids = self.dict_length_ids = PersistentMapping()
for line in query().dictionaries():
id_group = line['id_group']
last_id = line['last_id']
stored_last_id = self.dict_length_ids.get(id_group)
if stored_last_id is None:
self.dict_length_ids[id_group] = Length(last_id)
else:
stored_last_id_value = stored_last_id()
if stored_last_id_value < last_id:
stored_last_id.set(last_id)
else:
if stored_last_id_value > last_id:
LOG('IdTool', WARNING, 'ZODB value (%r) for group %r is higher ' \
'than SQL value (%r). Keeping ZODB value untouched.' % \
(stored_last_id, id_group, last_id))
InitializeClass(IdTool) InitializeClass(IdTool)
...@@ -129,6 +129,14 @@ class TemplateTool (BaseTool): ...@@ -129,6 +129,14 @@ class TemplateTool (BaseTool):
installed_bts.append(bt) installed_bts.append(bt)
return installed_bts return installed_bts
def getInstalledBusinessTemplateRevision(self, title, **kw):
"""
Return the revision of business template installed with the title
given
"""
bt = self.getInstalledBusinessTemplate(title)
return bt.getRevision()
def getBuiltBusinessTemplatesList(self): def getBuiltBusinessTemplatesList(self):
"""Deprecated. """Deprecated.
""" """
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
# #
# Copyright (c) 2008 Nexedi SARL and Contributors. All Rights Reserved. # Copyright (c) 2008 Nexedi SARL and Contributors. All Rights Reserved.
# Aurélien Calonne <aurel@nexedi.com> # Aurélien Calonne <aurel@nexedi.com>
# Danièle Vanbaelinghem <daniele@nexedi.com>
# #
# WARNING: This program as such is intended to be used by professional # WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential # programmers who take the whole responsability of assessing all potential
...@@ -35,99 +36,228 @@ from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase ...@@ -35,99 +36,228 @@ from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
class TestIdTool(ERP5TypeTestCase): class TestIdTool(ERP5TypeTestCase):
# Different variables used for this test # Different variables used for this test
run_all_test = 1 def afterSetUp(self):
resource_type='Apparel Component' self.login()
resource_variation_type='Apparel Component Variation' self.portal = self.getPortal()
resource_module = 'apparel_component_module' self.id_tool = self.portal.portal_ids
self.createGenerators()
transaction.commit()
self.tic()
def beforeTearDown(self):
self.id_tool.clearGenerator(all=True)
self.id_tool.initializeGenerator(all=True)
def getTitle(self): def getTitle(self):
""" """
Return the title of test
""" """
return "Id Tool" return "Test Id Tool"
def getBusinessTemplateList(self): def createGenerators(self):
""" """
Return the list of business templates. Initialize some generators for the tests
"""
self.application_sql_generator = self.id_tool.newContent(\
portal_type='Application Id Generator',
reference='test_application_sql',
version='001')
self.conceptual_sql_generator = self.id_tool.newContent(\
portal_type='Conceptual Id Generator',
reference='test_non_continuous_increasing',
version='001')
self.sql_generator = self.id_tool.newContent(\
portal_type='SQL Non Continuous Increasing Id Generator',
reference='test_sql_non_continuous_increasing',
version='001')
self.application_sql_generator.setSpecialiseValue(\
self.conceptual_sql_generator)
self.conceptual_sql_generator.setSpecialiseValue(self.sql_generator)
self.application_zodb_generator = self.id_tool.newContent(\
portal_type='Application Id Generator',
reference='test_application_zodb',
version='001')
self.conceptual_zodb_generator = self.id_tool.newContent(\
portal_type='Conceptual Id Generator',
reference='test_continuous_increasing',
version='001')
self.zodb_generator = self.id_tool.newContent(\
portal_type='ZODB Continuous Increasing Id Generator',
reference='test_zodb_continuous_increasing',
version='001')
self.application_zodb_generator.setSpecialiseValue(\
self.conceptual_zodb_generator)
self.conceptual_zodb_generator.setSpecialiseValue(self.zodb_generator)
def getLastGenerator(self, id_generator):
"""
Return Last Id Generator
""" """
return () document_generator = self.id_tool.searchFolder(reference=id_generator)[0]
application_generator = document_generator.getLatestVersionValue()
conceptual_generator = application_generator.getSpecialiseValue()\
.getLatestVersionValue()
last_generator = conceptual_generator.getSpecialiseValue()\
.getLatestVersionValue()
return last_generator
def test_getLastLengthGeneratedId(self, quiet=0, run=run_all_test): def test_01a_checkVersionGenerator(self):
if not run: return """
if not quiet: Add technical generator with a higher version
self.logMessage('Check getLastLengthGeneratedId method') test if the id_tool use the last version of a generator with the same
idtool = self.portal.portal_ids reference
# test with value stored into zodb """
new_id = idtool.generateNewLengthId(id_group=4, store=1) conceptual_sql_generator = self.application_sql_generator.getSpecialiseValue().\
transaction.commit() getLatestVersionValue()
self.tic() last_sql = self.conceptual_sql_generator.getSpecialiseValue().\
last_id = idtool.getLastLengthGeneratedId(id_group=4) getLatestVersionValue()
self.assertEqual(new_id, last_id) self.assertEquals(last_sql.getVersion(), '001')
# same test without storing value into zodb # Create new id generator with a more higher version
new_id = idtool.generateNewLengthId(id_group=5, store=0) sql_generator_2 = self.id_tool.newContent(\
transaction.commit() portal_type='SQL Non Continuous Increasing Id Generator',
self.tic() reference='test_sql_non_continuous_increasing',
last_id = idtool.getLastLengthGeneratedId(id_group=5) version='002')
self.assertEqual(new_id, last_id) conceptual_sql_generator.setSpecialiseValue(sql_generator_2)
# test with id_group as tuple
new_id = idtool.generateNewLengthId(id_group=(6,), store=0)
transaction.commit()
self.tic()
last_id = idtool.getLastLengthGeneratedId(id_group=(6,),)
self.assertEqual(new_id, last_id)
# test default value
last_id = idtool.getLastLengthGeneratedId(id_group=(7,),)
self.assertEqual(None, last_id)
last_id = idtool.getLastLengthGeneratedId(id_group=8,default=99)
self.assertEqual(99, last_id)
# test the store parameter with an existing value stored in the ZODB
new_id = idtool.generateNewLengthId(id_group=(9,), store=1)
transaction.commit()
self.tic()
last_id = idtool.getLastLengthGeneratedId(id_group=(9,),)
self.assertEqual(new_id, last_id)
new_id = idtool.generateNewLengthId(id_group=(9,), store=0)
transaction.commit() transaction.commit()
self.tic() self.tic()
last_id = idtool.getLastLengthGeneratedId(id_group=(9,),) # The last version is cached - reset cache
self.assertEqual(new_id, last_id) self.portal.portal_caches.clearAllCache()
last_sql = self.conceptual_sql_generator.getSpecialiseValue().\
getLatestVersionValue()
def test_generateNewId(self, quiet=0, run=run_all_test): self.assertEquals(last_sql.getVersion(), '002')
if not run: return
if not quiet: def checkGenerateNewId(self, id_generator):
self.logMessage('Check generateNewId method') """
idtool = self.portal.portal_ids Check the method generateNewId
# id tool generate ids based on a group """
self.assertEquals(0, idtool.generateNewId(id_group=('a', 'b'))) self.assertEquals(0, self.id_tool.generateNewId(id_generator=id_generator,
self.assertEquals(1, idtool.generateNewId(id_group=('a', 'b'))) id_group='a02'))
# different groups generate different ids # Different groups generate different ids
self.assertEquals(0, idtool.generateNewId(id_group=('a', 'b', 'c'))) self.assertEquals(0, self.id_tool.generateNewId(id_generator=id_generator,
id_group='b02'))
self.assertEquals(2, idtool.generateNewId(id_group=('a', 'b'))) self.assertEquals(1, self.id_tool.generateNewId(id_generator=id_generator,
self.assertEquals(1, idtool.generateNewId(id_group=('a', 'b', 'c'))) id_group='a02'))
# With default value
# you can pass an initial value self.assertEquals(0, self.id_tool.generateNewId(id_generator=id_generator,
self.assertEquals(4, idtool.generateNewId(id_group=('a', 'b', 'c', 'd'), id_group='c02', default=0))
default=4)) self.assertEquals(20, self.id_tool.generateNewId(id_generator=id_generator,
self.assertEquals(5, idtool.generateNewId(id_group=('a', 'b', 'c', 'd'), id_group='d02', default=20))
default=4)) self.assertEquals(21, self.id_tool.generateNewId(id_generator=id_generator,
#method to generate a special number id_group='d02', default=3))
def generateTestNumber(last_id):
return ('A%s'%(last_id)) def test_02a_generateNewIdWithZODBGenerator(self):
# you can pass a method """
self.assertEquals('A0', idtool.generateNewId(id_group=('c', 'd'), Check the generateNewId with a zodb id generator
method=generateTestNumber)) Test that the dictionary of the zodb is filled
self.assertEquals('AA0', idtool.generateNewId(id_group=('c', 'd'), """
method=generateTestNumber)) # check zodb dict is empty
zodb_generator = self.getLastGenerator('test_application_zodb')
self.assertEquals('AA', idtool.generateNewId(id_group=('c', 'd', 'e'), zodb_portal_type = 'ZODB Continuous Increasing Id Generator'
default='A', self.assertEquals(zodb_generator.getPortalType(), zodb_portal_type)
method=generateTestNumber)) self.assertEqual(getattr(zodb_generator, 'last_id_dict', None), None)
self.assertEquals('AAA', idtool.generateNewId(id_group=('c', 'd', 'e'), # generate ids
default='A', self.checkGenerateNewId('test_application_zodb')
method=generateTestNumber)) # check zodb dict
self.assertEqual(zodb_generator.last_id_dict['c02'], 1)
self.assertEqual(zodb_generator.last_id_dict['d02'], 20)
def checkGenerateNewIdWithSQL(self, store):
"""
Check the generateNewId with a sql id generator
Test that the database is update and also the zodb dictionary
(if the store is True)
"""
# check zodb dict is empty
sql_generator = self.getLastGenerator('test_application_sql')
sql_portal_type = 'SQL Non Continuous Increasing Id Generator'
self.assertEquals(sql_generator.getPortalType(), sql_portal_type)
self.assertEqual(getattr(sql_generator, 'last_max_id_dict', None), None)
# retrieve method to recovery the last id in the database
last_id_method = getattr(self.portal, 'IdTool_zGetLastId', None)
self.assertNotEquals(last_id_method, None)
# store the ids in zodb
if store:
sql_generator.setStoredInZodb(True)
# generate ids
self.checkGenerateNewId('test_application_sql')
# check last_id in sql
self.assertEquals(last_id_method(id_group='c02')[0]['LAST_INSERT_ID()'], 1)
self.assertEquals(last_id_method(id_group='d02')[0]['LAST_INSERT_ID()'], 20)
# check zodb dict
if store:
self.assertEqual(sql_generator.last_max_id_dict['c02'].value, 1)
self.assertEqual(sql_generator.last_max_id_dict['d02'].value, 20)
else:
self.assertEqual(getattr(sql_generator, 'last_max_id_dict', None), None)
def test_02b_generateNewIdWithSQLGeneratorWithoutStorageZODB(self):
"""
Check the generateNewId, the update of the database and
that the zodb dictionary is empty
"""
self.checkGenerateNewIdWithSQL(store=False)
def test_02c_generateNewIdWithSQLGeneratorWithStorageZODB(self):
"""
Check the generateNewId,the update of the database and
that the the zodb dictionary is filled
"""
self.checkGenerateNewIdWithSQL(store=True)
def checkGenerateNewIdList(self, id_generator):
"""
Check the generateNewIdList
"""
self.assertEquals([0], self.id_tool.generateNewIdList(\
id_generator=id_generator, id_group='a03'))
# Different groups generate different ids
self.assertEquals([0, 1], self.id_tool.generateNewIdList(\
id_generator=id_generator,
id_group='b03', id_count=2))
self.assertEquals([1 ,2, 3], self.id_tool.generateNewIdList(\
id_generator=id_generator,
id_group='a03', id_count=3))
# With default value
self.assertEquals([0, 1, 2], self.id_tool.generateNewIdList(\
id_generator=id_generator,
id_group='c03', default=0, id_count=3))
self.assertEquals([20, 21, 22], self.id_tool.generateNewIdList(\
id_generator=id_generator,
id_group='d03', default=20, id_count=3))
self.assertEquals([23, 24], self.id_tool.generateNewIdList(\
id_generator=id_generator,
id_group='d03', default=3, id_count=2))
def test_03a_generateNewIdListWithZODBGenerator(self):
"""
Check the generateNewIdList with zodb generator
"""
self.checkGenerateNewIdList('test_application_zodb')
def test_03b_generateNewIdListWithSQLGenerator(self):
"""
Check the generateNewIdList with sql generator
"""
self.checkGenerateNewIdList('test_application_sql')
def test_04_generateNewIdAndgenerateNewIdListWithTwoGenerator(self):
"""
Check that the same id_group between the generators is not modified
Check the generateNewIdList and generateNewId in the same test
"""
self.assertEquals([1, 2, 3], self.id_tool.generateNewIdList(
id_generator='test_application_zodb',
id_group='a04', default=1, id_count=3))
self.assertEquals(4, self.id_tool.generateNewId(
id_generator='test_application_zodb',
id_group='a04'))
self.assertEquals(1, self.id_tool.generateNewId(
id_generator='test_application_sql',
id_group='a04', default=1))
self.assertEquals([2, 3, 4], self.id_tool.generateNewIdList(
id_generator='test_application_sql',
id_group='a04', id_count=3))
def test_suite(): def test_suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment