Commit b973cc15 authored by Vincent Pelletier's avatar Vincent Pelletier

Factorise code dropping a batch of tree entries.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2462 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent ec33162f
...@@ -36,6 +36,58 @@ TREE_POOL = [] ...@@ -36,6 +36,58 @@ TREE_POOL = []
# How many empty BTree istance to keep in ram # How many empty BTree istance to keep in ram
MAX_TREE_POOL_SIZE = 100 MAX_TREE_POOL_SIZE = 100
def batchDelete(tree, tester_callback, iter_kw=None, recycle_subtrees=False):
"""
Iter over given BTree and delete found entries.
tree BTree
Tree to delete entries from.
tester_callback function(key, value) -> boolean
Called with each key, value pair found in tree.
If return value is true, delete entry. Otherwise, skip to next key.
iter_kw dict
Keyword arguments for tree.items .
Warning: altered in this function.
recycle_subtrees boolean (False)
If true, deleted values will be put in TREE_POOL for future reuse.
They must be BTrees.
If False, values are not touched.
"""
if iter_kw is None:
iter_kw = {}
if recycle_subtrees:
deleter_callback = _btreeDeleterCallback
else:
deleter_callback = _deleterCallback
items = tree.items
while True:
to_delete = []
append = to_delete.append
for key, value in safeIter(items, **iter_kw):
if tester_callback(key, value):
append(key)
if len(to_delete) >= KEY_BATCH_SIZE:
iter_kw['min'] = key
iter_kw['excludemin'] = True
break
if to_delete:
deleter_callback(tree, to_delete)
else:
break
def _deleterCallback(tree, key_list):
for key in key_list:
del tree[key]
if hasattr(_OOBTree, 'pop'):
def _btreeDeleterCallback(tree, key_list):
for key in key_list:
prune(tree.pop(key))
else:
def _btreeDeleterCallback(tree, key_list):
for key in key_list:
prune(tree[key])
del tree[key]
def OOBTree(): def OOBTree():
try: try:
result = TREE_POOL.pop() result = TREE_POOL.pop()
...@@ -272,28 +324,12 @@ class BTreeDatabaseManager(DatabaseManager): ...@@ -272,28 +324,12 @@ class BTreeDatabaseManager(DatabaseManager):
def setPartitionTable(self, ptid, cell_list): def setPartitionTable(self, ptid, cell_list):
self.doSetPartitionTable(ptid, cell_list, True) self.doSetPartitionTable(ptid, cell_list, True)
def _dropPartitions(self, num_partitions, offset_list, tree):
offset_list = frozenset(offset_list)
last = 0
while True:
to_drop = []
append = to_drop.append
for key in tree.keys(min=last):
if key % num_partitions in offset_list:
append(key)
if len(to_drop) >= KEY_BATCH_SIZE:
last = key + 1
break
if to_drop:
for key in to_drop:
prune(tree[key])
del tree[key]
else:
break
def dropPartitions(self, num_partitions, offset_list): def dropPartitions(self, num_partitions, offset_list):
self._dropPartitions(num_partitions, offset_list, self._obj) offset_list = frozenset(offset_list)
self._dropPartitions(num_partitions, offset_list, self._trans) def same_partition(key, _):
return key % num_partitions in offset_list
batchDelete(self._obj, same_partition, recycle_subtrees=True)
batchDelete(self._trans, same_partition)
def dropUnfinishedData(self): def dropUnfinishedData(self):
self._tobj = OOBTree() self._tobj = OOBTree()
...@@ -350,16 +386,8 @@ class BTreeDatabaseManager(DatabaseManager): ...@@ -350,16 +386,8 @@ class BTreeDatabaseManager(DatabaseManager):
def finishTransaction(self, tid): def finishTransaction(self, tid):
tid = util.u64(tid) tid = util.u64(tid)
obj = self._obj self._popTransactionFromTObj(tid, True)
tobj = self._tobj
ttrans = self._ttrans ttrans = self._ttrans
def callback(oid, data):
try:
tserial = obj[oid]
except KeyError:
tserial = obj[oid] = OOBTree()
tserial[tid] = data
self._popTransactionFromObj(tobj, tid, callback=callback)
try: try:
data = ttrans[tid] data = ttrans[tid]
except KeyError: except KeyError:
...@@ -368,35 +396,34 @@ class BTreeDatabaseManager(DatabaseManager): ...@@ -368,35 +396,34 @@ class BTreeDatabaseManager(DatabaseManager):
del ttrans[tid] del ttrans[tid]
self._trans[tid] = data self._trans[tid] = data
def _popTransactionFromObj(self, tree, tid, callback=None): def _popTransactionFromTObj(self, tid, to_obj):
if callback is None: if to_obj:
callback = lambda oid, data: None recycle_subtrees = False
last = 0 obj = self._obj
while True: def callback(oid, data):
to_remove = []
append = to_remove.append
for oid, tserial in tree.items(min=last):
try: try:
data = tserial[tid] tserial = obj[oid]
except KeyError: except KeyError:
continue tserial = obj[oid] = OOBTree()
tserial[tid] = data
else:
recycle_subtrees = True
callback = lambda oid, data: None
def tester_callback(oid, tserial):
try:
data = tserial[tid]
except KeyError:
pass
else:
del tserial[tid] del tserial[tid]
if not tserial:
append(oid)
callback(oid, data) callback(oid, data)
if len(to_remove) >= KEY_BATCH_SIZE: return not tserial
last = oid + 1 batchDelete(self._tobj, tester_callback,
break recycle_subtrees=recycle_subtrees)
if to_remove:
for oid in to_remove:
prune(tree[oid])
del tree[oid]
else:
break
def deleteTransaction(self, tid, oid_list=()): def deleteTransaction(self, tid, oid_list=()):
tid = util.u64(tid) tid = util.u64(tid)
self._popTransactionFromObj(self._tobj, tid) self._popTransactionFromTObj(tid, False)
try: try:
del self._ttrans[tid] del self._ttrans[tid]
except KeyError: except KeyError:
...@@ -606,51 +633,25 @@ class BTreeDatabaseManager(DatabaseManager): ...@@ -606,51 +633,25 @@ class BTreeDatabaseManager(DatabaseManager):
tid = util.u64(tid) tid = util.u64(tid)
updatePackFuture = self._updatePackFuture updatePackFuture = self._updatePackFuture
self._setPackTID(tid) self._setPackTID(tid)
obj = self._obj def obj_callback(oid, tserial):
last_obj = 0 try:
while True: max_serial = tserial.maxKey(tid)
obj_to_drop = [] except ValueError:
append_obj = obj_to_drop.append return False
for oid, tserial in safeIter(obj.items, min=last_obj): try:
try: tserial.maxKey(max_serial)
max_serial = tserial.maxKey(tid) except ValueError:
except ValueError: if tserial[max_serial][2] == '':
continue max_serial += 1
try: else:
tserial.maxKey(max_serial) return False
except ValueError: def serial_callback(serial, _):
if tserial[max_serial][2] == '': updatePackFuture(oid, serial, max_serial,
max_serial += 1 updateObjectDataForPack)
else: batchDelete(tserial, serial_callback,
continue iter_kw={'max': max_serial, 'excludemax': True})
last = 0 return not tserial
while True: batchDelete(self._obj, obj_callback, recycle_subtrees=True)
to_drop = []
append = to_drop.append
for serial in tserial.keys(min=last, max=max_serial,
excludemax=True):
updatePackFuture(oid, serial, max_serial,
updateObjectDataForPack)
append(serial)
if len(to_drop) >= KEY_BATCH_SIZE:
last = serial + 1
break
if to_drop:
for serial in to_drop:
del tserial[serial]
else:
break
if not tserial:
append_obj(oid)
if len(obj_to_drop) >= KEY_BATCH_SIZE:
last_obj = oid + 1
break
if obj_to_drop:
for oid in to_drop:
prune(obj[oid])
del obj[oid]
else:
break
def checkTIDRange(self, min_tid, length, num_partitions, partition): def checkTIDRange(self, min_tid, length, num_partitions, partition):
# XXX: XOR is a lame checksum # XXX: XOR is a lame checksum
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment