Commit e76af297 authored by Julien Muchembled's avatar Julien Muchembled

client: in iterator records, export data serial as stored by NEO

There is simply no way to guess data serials and instead of producing random
values, the only solution is to retrieve the values from storages.

There are still differences for data serials between FileStorage and NEO:
- NEO always resolves to original serial, to avoid any indirection
  (which slightly speeds up undo at the expense of a more complex pack code)
- NEO does not make any difference between object deletion and creation undone
  (data serial always null in storage)
It has to be decided whether NEO implementation should be changed about this.

Apart from that, conversion database back from NEO should be fixed.
testExportFileStorageBug passes and there was in fact no FileStorage bug.

Another change is that iterator does not trash the client cache anymore.
parent 441145e5
...@@ -399,11 +399,11 @@ class Application(object): ...@@ -399,11 +399,11 @@ class Application(object):
# Do not get something more recent than the last invalidation # Do not get something more recent than the last invalidation
# we got from master. # we got from master.
before_tid = p64(u64(self.last_tid) + 1) before_tid = p64(u64(self.last_tid) + 1)
result = self._loadFromStorage(oid, tid, before_tid) data, tid, next_tid, _ = self._loadFromStorage(oid, tid, before_tid)
acquire() acquire()
try: try:
if not (self._loading_oid or result[2]): result = data, tid, (next_tid if self._loading_oid or next_tid
result = result[0], result[1], self._loading_invalidated else self._loading_invalidated)
self._cache.store(oid, *result) self._cache.store(oid, *result)
return result return result
finally: finally:
...@@ -415,7 +415,7 @@ class Application(object): ...@@ -415,7 +415,7 @@ class Application(object):
packet = Packets.AskObject(oid, at_tid, before_tid) packet = Packets.AskObject(oid, at_tid, before_tid)
for node, conn in self.cp.iterateForObject(oid, readable=True): for node, conn in self.cp.iterateForObject(oid, readable=True):
try: try:
noid, tid, next_tid, compression, checksum, data \ tid, next_tid, compression, checksum, data, data_tid \
= self._askStorage(conn, packet) = self._askStorage(conn, packet)
except ConnectionClosed: except ConnectionClosed:
continue continue
...@@ -425,9 +425,8 @@ class Application(object): ...@@ -425,9 +425,8 @@ class Application(object):
logging.error('wrong checksum from %s for oid %s', logging.error('wrong checksum from %s for oid %s',
conn, dump(oid)) conn, dump(oid))
continue continue
if compression: return (decompress(data) if compression else data,
data = decompress(data) tid, next_tid, data_tid)
return data, tid, next_tid
raise NEOStorageCreationUndoneError(dump(oid)) raise NEOStorageCreationUndoneError(dump(oid))
# We didn't got any object from all storage node because of # We didn't got any object from all storage node because of
# connection error # connection error
......
...@@ -58,10 +58,8 @@ class StorageBootstrapHandler(AnswerBaseHandler): ...@@ -58,10 +58,8 @@ class StorageBootstrapHandler(AnswerBaseHandler):
class StorageAnswersHandler(AnswerBaseHandler): class StorageAnswersHandler(AnswerBaseHandler):
""" Handle all messages related to ZODB operations """ """ Handle all messages related to ZODB operations """
def answerObject(self, conn, oid, start_serial, end_serial, def answerObject(self, conn, oid, *args):
compression, checksum, data, data_serial): self.app.setHandlerData(args)
self.app.setHandlerData((oid, start_serial, end_serial,
compression, checksum, data))
def answerStoreObject(self, conn, conflicting, oid, serial): def answerStoreObject(self, conn, conflicting, oid, serial):
txn_context = self.app.getHandlerData() txn_context = self.app.getHandlerData()
......
...@@ -34,31 +34,25 @@ class Record(BaseStorage.DataRecord): ...@@ -34,31 +34,25 @@ class Record(BaseStorage.DataRecord):
class Transaction(BaseStorage.TransactionRecord): class Transaction(BaseStorage.TransactionRecord):
""" Transaction object yielded by the NEO iterator """ """ Transaction object yielded by the NEO iterator """
def __init__(self, app, txn, prev_serial_dict): def __init__(self, app, txn):
super(Transaction, self).__init__(txn['id'], ' ', super(Transaction, self).__init__(txn['id'], ' ',
txn['user_name'], txn['description'], txn['ext']) txn['user_name'], txn['description'], txn['ext'])
self.app = app self.app = app
self.oid_list = txn['oids'] self.oid_list = txn['oids']
self.prev_serial_dict = prev_serial_dict
def __iter__(self): def __iter__(self):
""" Iterate over the transaction records """ """ Iterate over the transaction records """
load = self.app.load load = self.app._loadFromStorage
for oid in self.oid_list: for oid in self.oid_list:
try: try:
data, _, next_tid = load(oid, self.tid) data, _, _, data_tid = load(oid, self.tid, None)
except NEOStorageCreationUndoneError: except NEOStorageCreationUndoneError:
data = next_tid = None data = data_tid = None
except NEOStorageNotFoundError: except NEOStorageNotFoundError:
# Transactions are not updated after a pack, so their object # Transactions are not updated after a pack, so their object
# will not be found in the database. Skip them. # will not be found in the database. Skip them.
continue continue
if next_tid is None: yield Record(oid, self.tid, data, data_tid)
prev_tid = self.prev_serial_dict.pop(oid, None)
else:
prev_tid = self.prev_serial_dict.get(oid)
self.prev_serial_dict[oid] = self.tid
yield Record(oid, self.tid, data, prev_tid)
def __str__(self): def __str__(self):
return 'Transaction #%s: %s %s' \ return 'Transaction #%s: %s %s' \
...@@ -70,13 +64,10 @@ def iterator(app, start=None, stop=None): ...@@ -70,13 +64,10 @@ def iterator(app, start=None, stop=None):
if start is None: if start is None:
start = ZERO_TID start = ZERO_TID
stop = min(stop or MAX_TID, app.lastTransaction()) stop = min(stop or MAX_TID, app.lastTransaction())
# OID -> previous TID mapping
# TODO: prune old entries while walking ?
prev_serial_dict = {}
while 1: while 1:
max_tid, chunk = app.transactionLog(start, stop, CHUNK_LENGTH) max_tid, chunk = app.transactionLog(start, stop, CHUNK_LENGTH)
if not chunk: if not chunk:
break # nothing more break # nothing more
for txn in chunk: for txn in chunk:
yield Transaction(app, txn, prev_serial_dict) yield Transaction(app, txn)
start = add64(max_tid, 1) start = add64(max_tid, 1)
...@@ -41,7 +41,7 @@ class StorageAnswerHandlerTests(NeoUnitTestBase): ...@@ -41,7 +41,7 @@ class StorageAnswerHandlerTests(NeoUnitTestBase):
tid2 = self.getNextTID(tid1) tid2 = self.getNextTID(tid1)
the_object = (oid, tid1, tid2, 0, '', 'DATA', None) the_object = (oid, tid1, tid2, 0, '', 'DATA', None)
self.handler.answerObject(conn, *the_object) self.handler.answerObject(conn, *the_object)
self._checkHandlerData(the_object[:-1]) self._checkHandlerData(the_object[1:])
def _getAnswerStoreObjectHandler(self, object_stored_counter_dict, def _getAnswerStoreObjectHandler(self, object_stored_counter_dict,
conflict_serial_dict, resolved_conflict_serial_dict): conflict_serial_dict, resolved_conflict_serial_dict):
......
...@@ -171,19 +171,33 @@ class ClientTests(NEOFunctionalTest): ...@@ -171,19 +171,33 @@ class ClientTests(NEOFunctionalTest):
db = ZODB.DB(storage=storage) db = ZODB.DB(storage=storage)
return (db, storage) return (db, storage)
def __populate(self, db, tree_size=TREE_SIZE, filestorage_bug=True): def __populate(self, db, tree_size=TREE_SIZE):
if isinstance(db.storage, FileStorage):
from base64 import b64encode as undo_tid
else:
undo_tid = lambda x: x
def undo(tid=None):
db.undo(undo_tid(tid or db.lastTransaction()))
transaction.commit()
conn = db.open() conn = db.open()
root = conn.root() root = conn.root()
root['trees'] = Tree(tree_size) root['trees'] = Tree(tree_size)
if filestorage_bug:
ob = root['trees'].right ob = root['trees'].right
left = ob.left left = ob.left
del ob.left del ob.left
transaction.commit() transaction.commit()
ob._p_changed = 1 ob._p_changed = 1
transaction.commit() transaction.commit()
t2 = db.lastTransaction()
ob.left = left ob.left = left
transaction.commit() transaction.commit()
undo()
t4 = db.lastTransaction()
undo(t2)
undo()
undo(t4)
undo()
undo()
conn.close() conn.close()
def testImport(self): def testImport(self):
...@@ -203,12 +217,12 @@ class ClientTests(NEOFunctionalTest): ...@@ -203,12 +217,12 @@ class ClientTests(NEOFunctionalTest):
(neo_db, neo_conn) = self.neo.getZODBConnection() (neo_db, neo_conn) = self.neo.getZODBConnection()
self.__checkTree(neo_conn.root()['trees']) self.__checkTree(neo_conn.root()['trees'])
def testExport(self, filestorage_bug=False): def testExport(self):
# create a neo storage # create a neo storage
self.neo.start() self.neo.start()
(neo_db, neo_conn) = self.neo.getZODBConnection() (neo_db, neo_conn) = self.neo.getZODBConnection()
self.__populate(neo_db, filestorage_bug=filestorage_bug) self.__populate(neo_db)
# copy neo to data fs # copy neo to data fs
dfs_db, dfs_storage = self.__getDataFS(reset=True) dfs_db, dfs_storage = self.__getDataFS(reset=True)
...@@ -221,11 +235,6 @@ class ClientTests(NEOFunctionalTest): ...@@ -221,11 +235,6 @@ class ClientTests(NEOFunctionalTest):
self.__checkTree(root['trees']) self.__checkTree(root['trees'])
@expectedFailure(AttributeError)
def testExportFileStorageBug(self):
# currently fails due to a bug in ZODB.FileStorage
self.testExport(True)
def testLockTimeout(self): def testLockTimeout(self):
""" Hold a lock on an object to block a second transaction """ """ Hold a lock on an object to block a second transaction """
def test(): def test():
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment