Commit 1ca655a3 authored by Julien Muchembled

Optimize WorkflowHistoryList

This is done by inheriting most of the code of ConflictFreeLog,
i.e. using a doubly-linked list:
- for fast iteration over the first elements, and in particular
  immediate access to the first element (used for the creation date;
  see the sketch below);
- that keeps track of the history length;
- that implements fast reverse iteration (although it could
  have been done without changing the data structure).
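
A rough sketch of why the extra _next pointers matter (illustrative only,
not part of the commit; bucket attributes simplified): the legacy structure
only linked buckets backwards, so reaching the oldest entry meant walking
the whole chain first, while the new layout can start from head._next:

    # Legacy layout: buckets only know their predecessor (_prev), so the
    # first (oldest) item is only reached after stacking every bucket.
    def iter_legacy(head):
      stack = []
      bucket = head
      while bucket is not None:
        stack.append(bucket)
        bucket = bucket._prev
      for bucket in reversed(stack):
        for item in bucket._slots:
          yield item

    # New layout: the head also keeps _next pointing at the oldest bucket
    # of the circular chain, so iteration yields the first item immediately.
    def iter_doubly_linked(head):
      bucket = head._next or head   # oldest bucket, or head if still alone
      while 1:
        for item in bucket._log:
          yield item
        if bucket is head:
          break
        bucket = bucket._next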

The size of buckets is no longer fixed to 16 items:
like ConflictFreeLog, WorkflowHistoryList is a good candidate
for looking at the estimated serialized size of the bucket in order to
decide whether elements should be added to a new one or not.
Then developers won't have to care about using Pdata or not.
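
For reference, the rotation decision this refers to (a paraphrase of the
_maybe_rotate change in the diff below, not new behaviour): ZODB maintains
_p_estimated_size, an estimate of the object's pickled record size, so the
current bucket keeps growing until that estimate reaches _bucket_size:

    # Paraphrased from the diff below: rotate on size, not on item count.
    def _maybe_rotate(self):
      # The estimate reflects the last loaded/stored state, hence the
      # _p_changed guard: once dirty, skip the check until the next commit.
      if not self._p_changed:
        if self._p_estimated_size < self._bucket_size:
          self._p_changed = 1   # keep appending to the current bucket
        else:
          self._rotate()        # move current items into a new tail bucket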

The bucket size is bigger than the ConflictFreeLog default,
because workflow items look a lot alike, so adding
a few more is cheap once ZODB compresses the record.

No more optimized __getstate__ (except for workflow histories that
have not been migrated), so BT export will be a bit more verbose.
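
Concretely (an illustration, not from the commit; the exact attributes
depend on whether the history has already rotated): the legacy class
pickled itself as a compact tuple, while the new classes fall back to the
default persistent state, i.e. the instance __dict__, which is what shows
up in the XML of an exported Business Template:

    # legacy WorkflowHistoryList.__getstate__()
    (self._prev, self._slots)
    # new DoublyLinkList / WorkflowHistoryList (default Persistent state)
    {'_log': [...], '_prev': ..., '_next': ..., '_tail_count': ...}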

The BBB code is because of
  nexedi/erp5!934

/reviewed-on nexedi/erp5!941
parent 694c9fee
@@ -68,7 +68,7 @@ class TestType(unittest.TestCase):
     expected = range(-1, 403)
     self.assertEqual(expected, list(x1))
     self.assertEqual(expected, list(x2))
-    self.assertEqual(expected[::-1], list(x1.reversed()))
+    self.assertEqual(expected[::-1], list(reversed(x1)))
     self.assertEqual(len(expected), len(x1))
     self.assertEqual(len(expected), len(x2))
     x1 += x2
...
@@ -79,7 +79,7 @@ from Products.ERP5Type.patches.ppml import importXML
 customImporters={
     XMLExportImport.magic: importXML,
     }
-from Products.ERP5Type.patches.WorkflowTool import WorkflowHistoryList
+from Products.ERP5Type.Workflow import WorkflowHistoryList
 from zLOG import LOG, WARNING, INFO
 from warnings import warn
 from lxml.etree import parse
...
+import operator
+from itertools import imap as map, islice
 from persistent import Persistent
 
-class ConflictFreeLog(Persistent):
-  """Scalable conflict-free append-only double-linked list
-
-  Wasted ZODB space due to conflicts is roughly proportional to the number of
-  clients that continuously add items at the same time.
-  """
+class DoublyLinkList(Persistent):
+
   _prev = _next = None
   _tail_count = 0
   _bucket_size = 1000
@@ -20,32 +19,34 @@ class ConflictFreeLog(Persistent):
     return self._tail_count + len(self._log)
 
   def _maybe_rotate(self):
-    if self._p_estimated_size < self._bucket_size:
-      self._p_changed = 1
-    else:
-      tail = self.__class__()
-      tail._log = self._log
-      prev = self._prev
-      if prev is None:
-        prev = self
-      else:
-        assert not self._next._tail_count
-        tail._tail_count = self._tail_count
-      tail._prev = prev
-      prev._next = tail
-      self._prev = tail
-      tail._next = self
-      self._tail_count += len(self._log)
-      self._log = []
+    if not self._p_changed:
+      if self._p_estimated_size < self._bucket_size:
+        self._p_changed = 1
+      else:
+        self._rotate()
+
+  def _rotate(self):
+    tail = self.__class__()
+    tail._log = self._log
+    prev = self._prev
+    if prev is None:
+      prev = self
+    else:
+      assert not self._next._tail_count
+      tail._tail_count = self._tail_count
+    tail._prev = prev
+    prev._next = tail
+    self._prev = tail
+    tail._next = self
+    self._tail_count += len(self._log)
+    self._log = []
 
   def append(self, item):
-    if not self._p_changed:
-      self._maybe_rotate()
+    self._maybe_rotate()
     self._log.append(item)
 
   def extend(self, items):
-    if not self._p_changed:
-      self._maybe_rotate()
+    self._maybe_rotate()
     self._log.extend(items)
 
   def __iadd__(self, other):
@@ -63,15 +64,54 @@ class ConflictFreeLog(Persistent):
         break
       bucket = bucket._next
 
-  def reversed(self):
+  def __reversed__(self):
     bucket = self
     while 1:
       for item in bucket._log[::-1]:
         yield item
       bucket = bucket._prev
-      if bucket in (None, self):
+      if bucket is None or bucket is self:
         break
 
+  def __add__(self, iterable):
+    new = self.__class__(self)
+    new.extend(iterable)
+    return new
+
+  def __eq__(self, other):
+    return (type(self) is type(other)
+        and len(self) == len(other)
+        and all(map(operator.eq, self, other)))
+
+  def __getitem__(self, index):
+    if index == -1: # shortcut for common case
+      return self._log[-1]
+    # TODO: optimize by caching location of previously accessed item
+    if isinstance(index, slice):
+      count = len(self)
+      start, stop, step = index.indices(count)
+      if step < 0:
+        count -= 1
+        return list(islice(reversed(self), count - start, count - stop, -step))
+      return list(islice(self, start, stop, step))
+    if index < 0:
+      start = index + len(self)
+      if start < 0:
+        raise IndexError(index)
+    else:
+      start = index
+    try:
+      return next(islice(self, start, None))
+    except StopIteration:
+      raise IndexError(index)
+
+
+class ConflictFreeLog(DoublyLinkList):
+  """Scalable conflict-free append-only doubly-linked list
+
+  Wasted ZODB space due to conflicts is roughly proportional to the number of
+  clients that continuously add items at the same time.
+  """
   def _p_resolveConflict(self, old_state, saved_state, new_state):
     # May be called for the head and its predecessor.
     old_tail_count = old_state.get('_tail_count', 0)
...
@@ -27,11 +27,12 @@
 #
 ##############################################################################
 
-from Products.ERP5Type import Permissions
 from App.special_dtml import HTMLFile
 from Acquisition import aq_inner
 from AccessControl.requestmethod import postonly
 from Products.DCWorkflow.DCWorkflow import DCWorkflowDefinition
+from . import Permissions
+from .ConflictFree import DoublyLinkList
 
 # ERP5 workflow factory definitions
 _workflow_factories = {}
@@ -206,3 +207,31 @@ addWorkflowFactory(createERP5Workflow,
                    id='erp5_workflow',
                    title='ERP5-style pre-configured DCWorkflow')
+
+class WorkflowHistoryList(DoublyLinkList):
+
+  _bucket_size = 4000
+
+  def __repr__(self):
+    return '<%s object at 0x%x %r>' % (
+      self.__class__.__name__, id(self), tuple(self))
+
+  def __setstate__(self, state):
+    # We implement a polyvalent __setstate__ on the base class because the
+    # class of a ghost object may differ from the actual class and ZODB does
+    # not fix it when it loads the object. 2 possible cases to fix:
+    # - after a migration (self._migrate) that is aborted (transaction.abort)
+    #   or invalidated (self._p_invalidate)
+    # - the persistent reference in the parent (container)
+    #   refers to a different class
+    # If ZODB fixed the class as we would expect, __setstate__ would be simpler
+    # and only implemented on the legacy class.
+    if type(state) is tuple:
+      # legacy class that will migrate automatically
+      from .patches import WorkflowTool
+      self.__class__ = WorkflowTool.WorkflowHistoryList
+      # BBB: Only the first 2 because of a production instance that
+      #      used a temporary patch to speed up workflow history lists.
+      self._prev, self._log = state[:2]
+    else:
+      self.__class__ = WorkflowHistoryList
+      super(WorkflowHistoryList, self).__setstate__(state)
@@ -33,9 +33,9 @@ from Products.ZSQLCatalog.SQLCatalog import SimpleQuery, AutoQuery, ComplexQuery
 from Products.CMFCore.utils import _getAuthenticatedUser
 from Products.ERP5Type import Permissions
 from Products.ERP5Type.Cache import CachingMethod
+from Products.ERP5Type.Workflow import WorkflowHistoryList as NewWorkflowHistoryList
 from sets import ImmutableSet
 from Acquisition import aq_base
-from Persistence import Persistent
 from Products.ERP5Type.Globals import PersistentMapping
 from MySQLdb import ProgrammingError, OperationalError
 from DateTime import DateTime
@@ -721,89 +721,61 @@ def WorkflowTool_refreshWorklistCache(self):
 security.declareProtected(Permissions.ManagePortal, 'refreshWorklistCache')
 WorkflowTool.refreshWorklistCache = WorkflowTool_refreshWorklistCache
 
-class WorkflowHistoryList(Persistent):
-
-  _bucket_size = 16
-
-  def __init__(self, iterable=None, prev=None):
-    self._prev = prev
-    self._slots = []
-    if iterable is not None:
-      for x in iterable:
-        self.append(x)
-
-  def __add__(self, iterable):
-    return self.__class__(tuple(self) + tuple(iterable))
-
-  def __contains__(self, item):
-    return item in tuple(self)
-
-  def __eq__(self, other):
-    return tuple(self) == tuple(other)
-
-  def __getitem__(self, index):
-    if index == -1:
-      return self._slots[-1]
-    elif isinstance(index, (int, long)):
-      if index < 0:
-        # XXX this implementation is not so good, but rarely used.
-        index += len(self)
-      iterator = self.__iter__()
-      for i in xrange(index):
-        iterator.next()
-      return iterator.next()
-    elif isinstance(index, slice):
-      return self.__class__((self[x] for x in
-                             xrange(*index.indices(len(self)))))
-    else:
-      raise TypeError, 'tuple indices must be integers'
-
-  def __getslice__(self, start, end):
-    return self.__getitem__(slice(start, end))
-
-  def __getstate__(self):
-    return (self._prev, self._slots)
-
-  def __iter__(self):
-    bucket = self
-    stack = []
-    while bucket is not None:
-      stack.append(bucket)
-      bucket = bucket._prev
-    for i in reversed(stack):
-      for j in i._slots:
-        yield j
-
-  def __len__(self):
-    length = len(self._slots)
-    bucket = self._prev
-    while bucket is not None:
-      length += len(bucket._slots)
-      bucket = bucket._prev
-    return length
-
-  def __mul__(self, x):
-    return self.__class__(tuple(self) * x)
-
-  def __nonzero__(self):
-    return len(self._slots) != 0 or self._prev is not None
-
-  def __repr__(self):
-    #return '%s' % repr(tuple(self.__iter__()))
-    return '<%s object at 0x%x %r>' % (self.__class__.__name__, id(self), tuple(self))
-
-  def __rmul__(self, x):
-    return self.__class__(x * tuple(self))
-
-  def __setstate__(self, state):
-    self._prev, self._slots = state
-
-  def append(self, value):
-    if len(self._slots) < self._bucket_size:
-      self._slots.append(value)
-      self._p_changed = 1
-    else:
-      self._prev = self.__class__(self._slots, prev=self._prev)
-      self._slots = [value]
+class WorkflowHistoryList(NewWorkflowHistoryList):
+
+  __init__ = None
+
+  def __getstate__(self):
+    return self._prev, self._log
+
+  def __nonzero__(self):
+    # not faster than __len__ but avoids migration
+    if self._log:
+      return True
+    assert self._prev is None
+    return False
+
+  @property
+  def _rotate(self):
+    self._migrate()
+    return self._rotate
+
+  @property
+  def _next(self):
+    self._migrate()
+    return self._next
+
+  @property
+  def _tail_count(self):
+    self._migrate()
+    return self._tail_count
+
+  def _migrate(self):
+    self.__class__ = NewWorkflowHistoryList
+    bucket = self._prev
+    if bucket is None:
+      del self._prev
+      return
+    stack = [self]
+    while True:
+      stack.append(bucket)
+      assert bucket.__class__ is WorkflowHistoryList, bucket.__class__
+      bucket.__class__ = NewWorkflowHistoryList
+      bucket = bucket._prev
+      if bucket is None:
+        break
+    self._next = bucket = stack.pop()
+    count = len(bucket._log)
+    while True:
+      bucket._next = bucket = stack.pop()
+      bucket._tail_count = count
+      if bucket is self:
+        break
+      count += len(bucket._log)
+
+# BBB: A production instance used a temporary patch to speed up.
+WorkflowHistoryBucketList = WorkflowHistoryList
 
 def WorkflowTool_setStatusOf(self, wf_id, ob, status):
   """ Append an entry to the workflow history.
@@ -817,11 +789,11 @@ def WorkflowTool_setStatusOf(self, wf_id, ob, status):
   if history is not None:
     has_history = 1
     wfh = history.get(wf_id, None)
-    if wfh is not None and not isinstance(wfh, WorkflowHistoryList):
-      wfh = WorkflowHistoryList(list(wfh))
+    if wfh is not None and not isinstance(wfh, NewWorkflowHistoryList):
+      wfh = NewWorkflowHistoryList(wfh)
       ob.workflow_history[wf_id] = wfh
   if wfh is None:
-    wfh = WorkflowHistoryList()
+    wfh = NewWorkflowHistoryList()
     if not has_history:
       ob.workflow_history = PersistentMapping()
     ob.workflow_history[wf_id] = wfh
...
##############################################################################
#
# Copyright (c) 2019 Nexedi SARL and Contributors. All Rights Reserved.
# Julien Muchembled <jm@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
##############################################################################
from __future__ import division
from functools import wraps
from Testing.ZopeTestCase import TestCase
from Products.ERP5Type.ConflictFree import DoublyLinkList
from Products.ERP5Type.Workflow import WorkflowHistoryList
from Products.ERP5Type.patches.WorkflowTool import \
  WorkflowHistoryList as LegacyWorkflowHistoryList

orig_maybe_rotate = DoublyLinkList._maybe_rotate.__func__

def _maybe_rotate(self):
  if len(self._log) < 16:
    self._p_changed = 1
  else:
    self._rotate()

def fixed_count_bucket(wrapped):
  def wrapper(*args, **kw):
    try:
      DoublyLinkList._maybe_rotate = _maybe_rotate
      return wrapped(*args, **kw)
    finally:
      DoublyLinkList._maybe_rotate = orig_maybe_rotate
  return wraps(wrapped)(wrapper)

def new(cls, items):
  dll = cls()
  for item in items:
    dll.append(item)
  return dll

def old(items):
  whl = WorkflowHistoryList()
  whl.__class__ = LegacyWorkflowHistoryList
  for item in items:
    if len(whl._log) < 16:
      whl._log.append(item)
    else:
      prev = whl.__new__(whl.__class__)
      prev._prev = whl._prev
      prev._log = whl._log
      whl._prev = prev
      whl._log = [item]
  return whl
COUNT = 45
EXPECTED = range(COUNT)
class TestWorkflowHistoryList(TestCase):

  from transaction import abort, commit

  def checkList(self, ddl):
    self.assertEqual(len(ddl), COUNT)
    self.assertEqual(len(ddl._log), 13)
    self.assertEqual(len(ddl._next._log), 16)
    self.assertEqual(EXPECTED, list(ddl))
    self.assertEqual(EXPECTED[::-1], list(reversed(ddl)))
    self.assertEqual(EXPECTED[::-1], list(reversed(ddl)))
    self.assertEqual(ddl, new(type(ddl), EXPECTED))
    class check(object):
      def __getitem__(_, item):
        try:
          a = EXPECTED[item]
        except IndexError:
          with self.assertRaises(IndexError):
            ddl[item]
        else:
          assert a != [], a
          self.assertEqual(a, ddl[item])
    check = check()
    i = COUNT + 1
    for i in xrange(-i, i):
      check[i]
    check[-50:10]
    check[:20:3]
    check[5:40]
    check[32::4]
    check[::-1]
    check[-5::-7]
    check[50:40:-1]
    check[30:-50:-4]
    check[:30:-3]
    self.assertFalse(ddl[-5:30])
    self.assertFalse(ddl[30:-5:-1])

  def checkClass(self, ddl, cls):
    ddl._p_activate()
    self.assertIs(type(ddl), cls)
    bucket = ddl._prev
    while bucket is not None:
      bucket._p_activate()
      self.assertIs(type(bucket), cls)
      bucket = bucket._prev
      if bucket is ddl:
        break
  @fixed_count_bucket
  def test_01_DoublyLinkList(self):
    EXPECTED = range(COUNT)
    self.checkList(new(DoublyLinkList, EXPECTED))

  @fixed_count_bucket
  def test_02_LegacyWorkflowHistoryList(self):
    whl = old(EXPECTED[:40])
    # Check common operations that don't require migration.
    self.assertTrue(whl)
    whl.append(40)
    whl += EXPECTED[41:]
    self.assertEqual(EXPECTED[::-1], list(reversed(whl)))
    self.assertEqual(EXPECTED[-1], whl[-1])
    self.checkClass(whl, LegacyWorkflowHistoryList)
    # Automatic migration on another operation.
    self.assertEqual(whl[0], 0)
    self.checkClass(whl, WorkflowHistoryList)
    self.checkList(whl)

  @fixed_count_bucket
  def test_03_MigrationPersistence(self):
    self.app.whl = whl = old(EXPECTED)
    self.commit()
    whl._p_jar.cacheMinimize()
    prev, slots = whl.__getstate__()
    self.assertIs(prev, whl._prev)
    self.assertEqual(slots, EXPECTED[32:])
    whl += xrange(3)
    self.checkClass(whl, LegacyWorkflowHistoryList)
    whl += xrange(2)
    self.checkClass(whl, WorkflowHistoryList)
    self.abort()
    whl += xrange(3)
    self.checkClass(whl, LegacyWorkflowHistoryList)
    whl.append('foo')
    whl.append('bar')
    self.checkClass(whl, WorkflowHistoryList)
    self.abort()
    self.checkClass(whl, LegacyWorkflowHistoryList)
    self.assertEqual(len(whl), COUNT)
    self.checkClass(whl, WorkflowHistoryList)
    self.commit()
    whl._p_jar.cacheMinimize()
    self.checkClass(whl, WorkflowHistoryList)
    self.checkList(whl)

  def test_04_rotation(self):
    self.app.ddl = ddl = DoublyLinkList()
    item_size = ddl._bucket_size // 3
    for _ in xrange(3):
      for _ in xrange(3):
        ddl.append('.' * item_size)
      self.commit()
    self.assertEqual(6, ddl._tail_count)
    self.assertEqual(3, ddl._prev._tail_count)
    self.assertEqual(0, ddl._prev._prev._tail_count)