diff --git a/neo/storage/database/btree.py b/neo/storage/database/btree.py index cdbbc3b36f6a12fb4fd4e4f7600622689173b133..615019b583d7d34edffb90632595e66c563ce7f5 100644 --- a/neo/storage/database/btree.py +++ b/neo/storage/database/btree.py @@ -36,6 +36,58 @@ TREE_POOL = [] # How many empty BTree istance to keep in ram MAX_TREE_POOL_SIZE = 100 +def batchDelete(tree, tester_callback, iter_kw=None, recycle_subtrees=False): + """ + Iter over given BTree and delete found entries. + tree BTree + Tree to delete entries from. + tester_callback function(key, value) -> boolean + Called with each key, value pair found in tree. + If return value is true, delete entry. Otherwise, skip to next key. + iter_kw dict + Keyword arguments for tree.items . + Warning: altered in this function. + recycle_subtrees boolean (False) + If true, deleted values will be put in TREE_POOL for future reuse. + They must be BTrees. + If False, values are not touched. + """ + if iter_kw is None: + iter_kw = {} + if recycle_subtrees: + deleter_callback = _btreeDeleterCallback + else: + deleter_callback = _deleterCallback + items = tree.items + while True: + to_delete = [] + append = to_delete.append + for key, value in safeIter(items, **iter_kw): + if tester_callback(key, value): + append(key) + if len(to_delete) >= KEY_BATCH_SIZE: + iter_kw['min'] = key + iter_kw['excludemin'] = True + break + if to_delete: + deleter_callback(tree, to_delete) + else: + break + +def _deleterCallback(tree, key_list): + for key in key_list: + del tree[key] + +if hasattr(_OOBTree, 'pop'): + def _btreeDeleterCallback(tree, key_list): + for key in key_list: + prune(tree.pop(key)) +else: + def _btreeDeleterCallback(tree, key_list): + for key in key_list: + prune(tree[key]) + del tree[key] + def OOBTree(): try: result = TREE_POOL.pop() @@ -272,28 +324,12 @@ class BTreeDatabaseManager(DatabaseManager): def setPartitionTable(self, ptid, cell_list): self.doSetPartitionTable(ptid, cell_list, True) - def _dropPartitions(self, num_partitions, offset_list, tree): - offset_list = frozenset(offset_list) - last = 0 - while True: - to_drop = [] - append = to_drop.append - for key in tree.keys(min=last): - if key % num_partitions in offset_list: - append(key) - if len(to_drop) >= KEY_BATCH_SIZE: - last = key + 1 - break - if to_drop: - for key in to_drop: - prune(tree[key]) - del tree[key] - else: - break - def dropPartitions(self, num_partitions, offset_list): - self._dropPartitions(num_partitions, offset_list, self._obj) - self._dropPartitions(num_partitions, offset_list, self._trans) + offset_list = frozenset(offset_list) + def same_partition(key, _): + return key % num_partitions in offset_list + batchDelete(self._obj, same_partition, recycle_subtrees=True) + batchDelete(self._trans, same_partition) def dropUnfinishedData(self): self._tobj = OOBTree() @@ -350,16 +386,8 @@ class BTreeDatabaseManager(DatabaseManager): def finishTransaction(self, tid): tid = util.u64(tid) - obj = self._obj - tobj = self._tobj + self._popTransactionFromTObj(tid, True) ttrans = self._ttrans - def callback(oid, data): - try: - tserial = obj[oid] - except KeyError: - tserial = obj[oid] = OOBTree() - tserial[tid] = data - self._popTransactionFromObj(tobj, tid, callback=callback) try: data = ttrans[tid] except KeyError: @@ -368,35 +396,34 @@ class BTreeDatabaseManager(DatabaseManager): del ttrans[tid] self._trans[tid] = data - def _popTransactionFromObj(self, tree, tid, callback=None): - if callback is None: - callback = lambda oid, data: None - last = 0 - while True: - to_remove = [] - append = to_remove.append - for oid, tserial in tree.items(min=last): + def _popTransactionFromTObj(self, tid, to_obj): + if to_obj: + recycle_subtrees = False + obj = self._obj + def callback(oid, data): try: - data = tserial[tid] + tserial = obj[oid] except KeyError: - continue + tserial = obj[oid] = OOBTree() + tserial[tid] = data + else: + recycle_subtrees = True + callback = lambda oid, data: None + def tester_callback(oid, tserial): + try: + data = tserial[tid] + except KeyError: + pass + else: del tserial[tid] - if not tserial: - append(oid) callback(oid, data) - if len(to_remove) >= KEY_BATCH_SIZE: - last = oid + 1 - break - if to_remove: - for oid in to_remove: - prune(tree[oid]) - del tree[oid] - else: - break + return not tserial + batchDelete(self._tobj, tester_callback, + recycle_subtrees=recycle_subtrees) def deleteTransaction(self, tid, oid_list=()): tid = util.u64(tid) - self._popTransactionFromObj(self._tobj, tid) + self._popTransactionFromTObj(tid, False) try: del self._ttrans[tid] except KeyError: @@ -606,51 +633,25 @@ class BTreeDatabaseManager(DatabaseManager): tid = util.u64(tid) updatePackFuture = self._updatePackFuture self._setPackTID(tid) - obj = self._obj - last_obj = 0 - while True: - obj_to_drop = [] - append_obj = obj_to_drop.append - for oid, tserial in safeIter(obj.items, min=last_obj): - try: - max_serial = tserial.maxKey(tid) - except ValueError: - continue - try: - tserial.maxKey(max_serial) - except ValueError: - if tserial[max_serial][2] == '': - max_serial += 1 - else: - continue - last = 0 - while True: - to_drop = [] - append = to_drop.append - for serial in tserial.keys(min=last, max=max_serial, - excludemax=True): - updatePackFuture(oid, serial, max_serial, - updateObjectDataForPack) - append(serial) - if len(to_drop) >= KEY_BATCH_SIZE: - last = serial + 1 - break - if to_drop: - for serial in to_drop: - del tserial[serial] - else: - break - if not tserial: - append_obj(oid) - if len(obj_to_drop) >= KEY_BATCH_SIZE: - last_obj = oid + 1 - break - if obj_to_drop: - for oid in to_drop: - prune(obj[oid]) - del obj[oid] - else: - break + def obj_callback(oid, tserial): + try: + max_serial = tserial.maxKey(tid) + except ValueError: + return False + try: + tserial.maxKey(max_serial) + except ValueError: + if tserial[max_serial][2] == '': + max_serial += 1 + else: + return False + def serial_callback(serial, _): + updatePackFuture(oid, serial, max_serial, + updateObjectDataForPack) + batchDelete(tserial, serial_callback, + iter_kw={'max': max_serial, 'excludemax': True}) + return not tserial + batchDelete(self._obj, obj_callback, recycle_subtrees=True) def checkTIDRange(self, min_tid, length, num_partitions, partition): # XXX: XOR is a lame checksum