diff --git a/TODO b/TODO index d26da44a2c5544a2c4d8cc31f77b8d803c6fd233..bed039cf8bb19815dc9113224b3093893b16ed8d 100644 --- a/TODO +++ b/TODO @@ -167,6 +167,7 @@ RC - Review output of pylint (CODE) must be propagated to client. Client + - Merge Application into Storage (SPEED) - Implement C version of mq.py (LOAD LATENCY) - Use generic bootstrap module (CODE) - Find a way to make ask() from the thread poll to allow send initial packet diff --git a/neo/client/Storage.py b/neo/client/Storage.py index 06e06ce29fb88585fb86c5bb7966a49eff923177..79cccb542d8bc172fce1f1711b19a985f5feaa5e 100644 --- a/neo/client/Storage.py +++ b/neo/client/Storage.py @@ -20,7 +20,6 @@ import ZODB.interfaces from functools import wraps from neo.lib import logging -from neo.lib.locking import Lock from neo.lib.util import add64 from neo.lib.protocol import ZERO_TID from .app import Application @@ -37,17 +36,6 @@ class Storage(BaseStorage.BaseStorage, ConflictResolution.ConflictResolvingStorage): """Wrapper class for neoclient.""" - # Stores the highest TID visible for current transaction. - # First call sets this snapshot by asking master node most recent - # committed TID. - # As a (positive) side-effect, this forces us to handle all pending - # invalidations, so we get a very recent view of the database (which is - # good when multiple databases are used in the same program with some - # amount of referential integrity). - # Should remain None when not bound to a connection, - # so that it always read the last revision. - _snapshot_tid = None - implements(*filter(None, ( ZODB.interfaces.IStorage, # "restore" missing for the moment, but "store" implements this @@ -88,17 +76,6 @@ class Storage(BaseStorage.BaseStorage, 'dynamic_master_list': dynamic_master_list, '_app': _app, } - snapshot_lock = Lock() - acquire = snapshot_lock.acquire - release = snapshot_lock.release - def _setSnapshotTid(tid): - acquire() - try: - if self._snapshot_tid <= tid: - self._snapshot_tid = add64(tid, 1) - finally: - release() - self._setSnapshotTid = _setSnapshotTid @property def _cache(self): @@ -117,7 +94,7 @@ class Storage(BaseStorage.BaseStorage, # it optional. assert version == '', 'Versions are not supported' try: - return self.app.load(oid, None, self._snapshot_tid)[:2] + return self.app.load(oid)[:2] except NEOStorageNotFoundError: raise POSException.POSKeyError(oid) @@ -143,14 +120,8 @@ class Storage(BaseStorage.BaseStorage, return self.app.tpc_abort(transaction=transaction) def tpc_finish(self, transaction, f=None): - tid = self.app.tpc_finish(transaction=transaction, + return self.app.tpc_finish(transaction=transaction, tryToResolveConflict=self.tryToResolveConflict, f=f) - # XXX: Note that when undoing changes, the following is useless because - # a temporary Storage object is used to commit. - # See also testZODB.NEOZODBTests.checkMultipleUndoInOneTransaction - if self._snapshot_tid: - self._setSnapshotTid(tid) - return tid @check_read_only def store(self, oid, serial, data, version, transaction): @@ -189,9 +160,7 @@ class Storage(BaseStorage.BaseStorage, # undo @check_read_only def undo(self, transaction_id, txn): - return self.app.undo(self._snapshot_tid, undone_tid=transaction_id, - txn=txn, tryToResolveConflict=self.tryToResolveConflict) - + return self.app.undo(transaction_id, txn, self.tryToResolveConflict) @check_read_only def undoLog(self, first=0, last=-20, filter=None): @@ -213,7 +182,7 @@ class Storage(BaseStorage.BaseStorage, def loadEx(self, oid, version): try: - data, serial, _ = self.app.load(oid, None, self._snapshot_tid) + data, serial, _ = self.app.load(oid) except NEOStorageNotFoundError: raise POSException.POSKeyError(oid) return data, serial, '' @@ -231,9 +200,10 @@ class Storage(BaseStorage.BaseStorage, raise POSException.POSKeyError(oid) def sync(self, force=True): - # Increment by one, as we will use this as an excluded upper - # bound (loadBefore). - self._setSnapshotTid(self.lastTransaction()) + # XXX: sync() is part of IMVCCStorage and we don't want to be called + # from afterCompletion() so it may not be a good place to ping the + # master here. See also monkey-patch in __init__.py + self.app.lastTransaction() def copyTransactionsFrom(self, source, verbose=False): """ Zope compliant API """ diff --git a/neo/client/app.py b/neo/client/app.py index c429044d24163aec659fbb92fe4cf33a4be32142..1779845fbccbacb548767a0074cd52c6c477f5a4 100644 --- a/neo/client/app.py +++ b/neo/client/app.py @@ -45,7 +45,7 @@ from .poll import ThreadedPoll, psThreadedPoll from .iterator import Iterator from .cache import ClientCache from .pool import ConnectionPool -from neo.lib.util import u64, parseMasterList +from neo.lib.util import p64, u64, parseMasterList from neo.lib.profiling import profiler_decorator, PROFILING_ENABLED from neo.lib.debug import register as registerLiveDebugger from .container import ThreadContainer, TransactionContainer @@ -65,7 +65,7 @@ else: compress = real_compress makeChecksum = real_makeChecksum -CHECKED_SERIAL = object() +CHECKED_SERIAL = master.CHECKED_SERIAL class Application(object): @@ -100,6 +100,7 @@ class Application(object): self._loading_oid = None self.new_oid_list = [] self.last_oid = '\0' * 8 + self.last_tid = None self.storage_event_handler = storage.StorageEventHandler(self) self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self) self.storage_handler = storage.StorageAnswersHandler(self) @@ -342,6 +343,7 @@ class Application(object): handler = self.primary_bootstrap_handler ask(conn, Packets.AskNodeInformation(), handler=handler) ask(conn, Packets.AskPartitionTable(), handler=handler) + self.lastTransaction() return self.pt.operational() def registerDB(self, db, limit): @@ -425,6 +427,13 @@ class Application(object): self._loading_oid = oid finally: release() + # When not bound to a ZODB Connection, load() may be the + # first method called and last_tid may still be None. + # This happens, for example, when opening the DB. + if not (tid or before_tid) and self.last_tid: + # Do not get something more recent than the last invalidation + # we got from master. + before_tid = p64(u64(self.last_tid) + 1) result = self._loadFromStorage(oid, tid, before_tid) acquire() try: @@ -777,34 +786,14 @@ class Application(object): # Call finish on master cache_dict = txn_context['cache_dict'] tid = self._askPrimary(Packets.AskFinishTransaction( - txn_context['ttid'], cache_dict), callback=f) - - # Update cache - self._cache_lock_acquire() - try: - cache = self._cache - for oid, data in cache_dict.iteritems(): - if data is CHECKED_SERIAL: - # this is just a remain of - # checkCurrentSerialInTransaction call, ignore (no data - # was modified). - continue - # Update ex-latest value in cache - try: - cache.invalidate(oid, tid) - except KeyError: - pass - if data is not None: - # Store in cache with no next_tid - cache.store(oid, data, tid, None) - finally: - self._cache_lock_release() + txn_context['ttid'], cache_dict), + cache_dict=cache_dict, callback=f) txn_container.delete(transaction) return tid finally: self._load_lock_release() - def undo(self, snapshot_tid, undone_tid, txn, tryToResolveConflict): + def undo(self, undone_tid, txn, tryToResolveConflict): txn_context = self._txn_container.get(txn) if txn_context is None: raise StorageTransactionError(self, undone_tid) @@ -831,6 +820,7 @@ class Application(object): queue = self._getThreadQueue() ttid = txn_context['ttid'] undo_object_tid_dict = {} + snapshot_tid = p64(u64(self.last_tid) + 1) for partition, oid_list in partition_oid_dict.iteritems(): cell_list = getCellList(partition, readable=True) # We do want to shuffle before getting one with the smallest @@ -1027,7 +1017,8 @@ class Application(object): return Iterator(self, start, stop) def lastTransaction(self): - return self._askPrimary(Packets.AskLastTransaction()) + self._askPrimary(Packets.AskLastTransaction()) + return self.last_tid def abortVersion(self, src, transaction): if self._txn_container.get(transaction) is None: diff --git a/neo/client/handlers/master.py b/neo/client/handlers/master.py index ff503cb9da9229f5d638afa484fca5ae2443e722..c34b9bb6b13b67f53a3fc3773ac88c5c3cdcc5d1 100644 --- a/neo/client/handlers/master.py +++ b/neo/client/handlers/master.py @@ -21,6 +21,8 @@ from neo.lib.util import dump from . import BaseHandler, AnswerBaseHandler from ..exception import NEOStorageError +CHECKED_SERIAL = object() + class PrimaryBootstrapHandler(AnswerBaseHandler): """ Bootstrap handler used when looking for the primary master """ @@ -91,10 +93,34 @@ class PrimaryNotificationsHandler(BaseHandler): """ Handler that process the notifications from the primary master """ def packetReceived(self, conn, packet, kw={}): - if type(packet) is Packets.AnswerTransactionFinished: + if type(packet) is Packets.AnswerLastTransaction: + self.app.last_tid = packet.decode()[0] + elif type(packet) is Packets.AnswerTransactionFinished: + app = self.app + app.last_tid = tid = packet.decode()[1] callback = kw.pop('callback') - if callback is not None: - callback(packet.decode()[1]) + # Update cache + cache = app._cache + app._cache_lock_acquire() + try: + for oid, data in kw.pop('cache_dict').iteritems(): + if data is CHECKED_SERIAL: + # this is just a remain of + # checkCurrentSerialInTransaction call, ignore (no data + # was modified). + continue + # Update ex-latest value in cache + try: + cache.invalidate(oid, tid) + except KeyError: + pass + if data is not None: + # Store in cache with no next_tid + cache.store(oid, data, tid, None) + if callback is not None: + callback(tid) + finally: + app._cache_lock_release() BaseHandler.packetReceived(self, conn, packet, kw) def connectionClosed(self, conn): @@ -110,6 +136,7 @@ class PrimaryNotificationsHandler(BaseHandler): def invalidateObjects(self, conn, tid, oid_list): app = self.app + app.last_tid = tid app._cache_lock_acquire() try: invalidate = app._cache.invalidate @@ -163,5 +190,4 @@ class PrimaryAnswersHandler(AnswerBaseHandler): raise NEOStorageError('Already packing') def answerLastTransaction(self, conn, ltid): - self.app.setHandlerData(ltid) - + pass diff --git a/neo/tests/client/testClientApp.py b/neo/tests/client/testClientApp.py index a7674eb8a7a676cf220a7f0bd9bf0d4a0f5e4988..06e0f4396813b46bbfd3490c6f99900c83f8106c 100644 --- a/neo/tests/client/testClientApp.py +++ b/neo/tests/client/testClientApp.py @@ -510,13 +510,12 @@ class ClientApplicationTests(NeoUnitTestBase): # invalid transaction app = self.getApp() tid = self.makeTID() - snapshot_tid = self.getNextTID() txn = self.makeTransactionObject() def tryToResolveConflict(oid, conflict_serial, serial, data): pass app.master_conn = Mock() conn = Mock() - self.assertRaises(StorageTransactionError, app.undo, snapshot_tid, tid, + self.assertRaises(StorageTransactionError, app.undo, tid, txn, tryToResolveConflict) # no packet sent self.checkNoPacketSent(conn) @@ -552,6 +551,7 @@ class ClientApplicationTests(NeoUnitTestBase): unlock=False): store_marker.append((oid, serial, data, data_serial)) app._store = _store + app.last_tid = self.getNextTID() return app, conn, store_marker def test_undoWithResolutionSuccess(self): @@ -567,7 +567,6 @@ class ClientApplicationTests(NeoUnitTestBase): tid1 = self.getNextTID() tid2 = self.getNextTID() tid3 = self.getNextTID() - snapshot_tid = self.getNextTID() app, conn, store_marker = self._getAppForUndoTests(oid0, tid0, tid1, tid2) undo_serial = Packets.AnswerObjectUndoSerial({ @@ -583,7 +582,7 @@ class ClientApplicationTests(NeoUnitTestBase): return 'solved' # The undo txn = self.beginTransaction(app, tid=tid3) - app.undo(snapshot_tid, tid1, txn, tryToResolveConflict) + app.undo(tid1, txn, tryToResolveConflict) # Checking what happened moid, mconflict_serial, mserial, mdata, mcommittedData = marker[0] self.assertEqual(moid, oid0) @@ -610,7 +609,6 @@ class ClientApplicationTests(NeoUnitTestBase): tid1 = self.getNextTID() tid2 = self.getNextTID() tid3 = self.getNextTID() - snapshot_tid = self.getNextTID() undo_serial = Packets.AnswerObjectUndoSerial({ oid0: (tid2, tid0, False)}) undo_serial.setId(2) @@ -626,8 +624,7 @@ class ClientApplicationTests(NeoUnitTestBase): return None # The undo txn = self.beginTransaction(app, tid=tid3) - self.assertRaises(UndoError, app.undo, snapshot_tid, tid1, txn, - tryToResolveConflict) + self.assertRaises(UndoError, app.undo, tid1, txn, tryToResolveConflict) # Checking what happened moid, mconflict_serial, mserial, mdata, mcommittedData = marker[0] self.assertEqual(moid, oid0) @@ -644,8 +641,7 @@ class ClientApplicationTests(NeoUnitTestBase): marker.append((oid, conflict_serial, serial, data, committedData)) raise ConflictError # The undo - self.assertRaises(UndoError, app.undo, snapshot_tid, tid1, txn, - tryToResolveConflict) + self.assertRaises(UndoError, app.undo, tid1, txn, tryToResolveConflict) # Checking what happened moid, mconflict_serial, mserial, mdata, mcommittedData = marker[0] self.assertEqual(moid, oid0) @@ -667,7 +663,6 @@ class ClientApplicationTests(NeoUnitTestBase): tid1 = self.getNextTID() tid2 = self.getNextTID() tid3 = self.getNextTID() - snapshot_tid = self.getNextTID() transaction_info = Packets.AnswerTransactionInformation(tid1, '', '', '', False, (oid0, )) transaction_info.setId(1) @@ -685,7 +680,7 @@ class ClientApplicationTests(NeoUnitTestBase): 'is no conflict in this test !' # The undo txn = self.beginTransaction(app, tid=tid3) - app.undo(snapshot_tid, tid1, txn, tryToResolveConflict) + app.undo(tid1, txn, tryToResolveConflict) # Checking what happened moid, mserial, mdata, mdata_serial = store_marker[0] self.assertEqual(moid, oid0) @@ -765,10 +760,13 @@ class ClientApplicationTests(NeoUnitTestBase): # will raise IndexError at the third iteration app = self.getApp('127.0.0.1:10010 127.0.0.1:10011') # TODO: test more connection failure cases - # Seventh packet : askNodeInformation succeeded all_passed = [] - def _ask8(_): + # askLastTransaction + def _ask9(_): all_passed.append(1) + # Seventh packet : askNodeInformation succeeded + def _ask8(_): + pass # Sixth packet : askPartitionTable succeeded def _ask7(_): app.pt = Mock({'operational': True}) @@ -799,7 +797,7 @@ class ClientApplicationTests(NeoUnitTestBase): def _ask1(_): pass ask_func_list = [_ask1, _ask2, _ask3, _ask4, _ask6, _ask7, - _ask8] + _ask8, _ask9] def _ask_base(conn, _, handler=None): ask_func_list.pop(0)(conn) app.nm.getByAddress(conn.getAddress())._connection = None diff --git a/neo/tests/threaded/test.py b/neo/tests/threaded/test.py index 3fe4f9122cd0640f1e7978082d54ffdd55c52994..76645ee253c0d1731af9738617379ce081980983 100644 --- a/neo/tests/threaded/test.py +++ b/neo/tests/threaded/test.py @@ -607,7 +607,6 @@ class Test(NEOThreadedTest): client.tpc_begin(txn) client.store(x2._p_oid, tid, x, '', txn) tid = client.tpc_finish(txn, None) - client.close() client.setPoll(0) cluster.client.setPoll(1) t1.begin() # make sure invalidation is processed @@ -619,6 +618,32 @@ class Test(NEOThreadedTest): t.join() self.assertEqual(x2.value, 1) self.assertEqual(x1.value, 0) + + def _flush_invalidations(orig): + l1.release() + l2.acquire() + orig() + x1._p_deactivate() + t1.abort() + p = Patch(c1, _flush_invalidations=_flush_invalidations) + try: + t = self.newThread(t1.begin) + l1.acquire() + cluster.client.setPoll(0) + client.setPoll(1) + txn = transaction.Transaction() + client.tpc_begin(txn) + client.store(x2._p_oid, tid, y, '', txn) + tid = client.tpc_finish(txn, None) + client.close() + client.setPoll(0) + cluster.client.setPoll(1) + finally: + del p + l2.release() + t.join() + self.assertEqual(x1.value, 1) + finally: cluster.stop()