Commit 399f16c9 authored by Kirill Smelkov's avatar Kirill Smelkov

Merge remote-tracking branch 'origin/master' into x/go

* origin/master:
  neoctl: make 'print ids' command display time of TIDs
  mysql: force _getNextTID() to use appropriate/whole index
  Add support for latest versions of ZODB (4.4.3 & 5.0.1)
parents 43df5645 d9dd39f0
Patch to ZODB 4.3.1 for ZODB unit tests.
See also monkey-patch to Connection.tpc_finish in neo/client/__init__.py
diff --git a/src/ZODB/interfaces.py b/src/ZODB/interfaces.py
index 44ded35..db5c525 100644
--- a/src/ZODB/interfaces.py
+++ b/src/ZODB/interfaces.py
@@ -776,6 +776,10 @@ def tpc_finish(transaction, func = lambda tid: None):
called while the storage transaction lock is held. It takes
the new transaction id generated by the transaction.
+ The return value can be either None or a serial giving new
+ serial for objects whose ids were passed to previous store calls
+ in the same transaction, and for which no serial was returned
+ from either store or tpc_vote for objects passed to store.
"""
def tpc_vote(transaction):
@@ -794,8 +798,6 @@ def tpc_vote(transaction):
The return value can be either None or a sequence of object-id
and serial pairs giving new serials for objects who's ids were
passed to previous store calls in the same transaction.
- After the tpc_vote call, new serials must have been returned,
- either from tpc_vote or store for objects passed to store.
A serial returned in a sequence of oid/serial pairs, may be
the special value ZODB.ConflictResolution.ResolvedSerial to
diff --git a/src/ZODB/tests/BasicStorage.py b/src/ZODB/tests/BasicStorage.py
index e5c0526..0291654 100644
--- a/src/ZODB/tests/BasicStorage.py
+++ b/src/ZODB/tests/BasicStorage.py
@@ -69,8 +69,10 @@ def checkSerialIsNoneForInitialRevision(self):
r1 = self._storage.store(oid, None, zodb_pickle(MinPO(11)),
'', txn)
r2 = self._storage.tpc_vote(txn)
- self._storage.tpc_finish(txn)
+ serial = self._storage.tpc_finish(txn)
newrevid = handle_serials(oid, r1, r2)
+ if newrevid is None and serial is not None:
+ newrevid = serial
data, revid = self._storage.load(oid, '')
value = zodb_unpickle(data)
eq(value, MinPO(11))
diff --git a/src/ZODB/tests/MTStorage.py b/src/ZODB/tests/MTStorage.py
index 4ea139c..347cae3 100644
--- a/src/ZODB/tests/MTStorage.py
+++ b/src/ZODB/tests/MTStorage.py
@@ -152,10 +152,12 @@ def dostore(self, i):
r2 = self.storage.tpc_vote(t)
self.pause()
- self.storage.tpc_finish(t)
+ serial = self.storage.tpc_finish(t)
self.pause()
revid = handle_serials(oid, r1, r2)
+ if serial is not None and revid is None:
+ revid = serial
self.oids[oid] = revid
class ExtStorageClientThread(StorageClientThread):
diff --git a/src/ZODB/tests/RevisionStorage.py b/src/ZODB/tests/RevisionStorage.py
index 2a5c370..0a0120a 100644
--- a/src/ZODB/tests/RevisionStorage.py
+++ b/src/ZODB/tests/RevisionStorage.py
@@ -150,10 +150,12 @@ def helper(tid, revid, x):
# Finish the transaction
r2 = self._storage.tpc_vote(t)
newrevid = handle_serials(oid, r1, r2)
- self._storage.tpc_finish(t)
+ serial = self._storage.tpc_finish(t)
except:
self._storage.tpc_abort(t)
raise
+ if serial is not None and newrevid is None:
+ newrevid = serial
return newrevid
revid1 = helper(1, None, 1)
revid2 = helper(2, revid1, 2)
diff --git a/src/ZODB/tests/StorageTestBase.py b/src/ZODB/tests/StorageTestBase.py
index f0a2b72..5305807 100644
--- a/src/ZODB/tests/StorageTestBase.py
+++ b/src/ZODB/tests/StorageTestBase.py
@@ -132,7 +132,7 @@ def handle_serials(oid, *args):
A helper for function _handle_all_serials().
"""
- return handle_all_serials(oid, *args)[oid]
+ return handle_all_serials(oid, *args).get(oid)
def import_helper(name):
__import__(name)
@@ -189,7 +189,9 @@ def _dostore(self, oid=None, revid=None, data=None,
# Finish the transaction
r2 = self._storage.tpc_vote(t)
revid = handle_serials(oid, r1, r2)
- self._storage.tpc_finish(t)
+ serial = self._storage.tpc_finish(t)
+ if serial is not None and revid is None:
+ revid = serial
except:
self._storage.tpc_abort(t)
raise
@@ -209,8 +211,8 @@ def _undo(self, tid, expected_oids=None, note=None):
self._storage.tpc_begin(t)
undo_result = self._storage.undo(tid, t)
vote_result = self._storage.tpc_vote(t)
- self._storage.tpc_finish(t)
- if expected_oids is not None:
+ serial = self._storage.tpc_finish(t)
+ if expected_oids is not None and serial is None:
oids = list(undo_result[1]) if undo_result else []
oids.extend(oid for (oid, _) in vote_result or ())
self.assertEqual(len(oids), len(expected_oids), repr(oids))
diff --git a/src/ZODB/tests/TransactionalUndoStorage.py b/src/ZODB/tests/TransactionalUndoStorage.py
index 0ee68b2..4dab889 100644
--- a/src/ZODB/tests/TransactionalUndoStorage.py
+++ b/src/ZODB/tests/TransactionalUndoStorage.py
@@ -73,6 +73,12 @@ def _transaction_vote(self, trans):
def _transaction_newserial(self, oid):
return self.__serials[oid]
+ def _transaction_finish(self, t, oid_list):
+ tid = self._storage.tpc_finish(t)
+ if tid is not None:
+ for oid in oid_list:
+ self.__serials[oid] = tid
+
def _multi_obj_transaction(self, objs):
newrevs = {}
t = Transaction()
@@ -82,7 +88,7 @@ def _multi_obj_transaction(self, objs):
self._transaction_store(oid, rev, data, '', t)
newrevs[oid] = None
self._transaction_vote(t)
- self._storage.tpc_finish(t)
+ self._transaction_finish(t, [x[0] for x in objs])
for oid in newrevs.keys():
newrevs[oid] = self._transaction_newserial(oid)
return newrevs
@@ -219,9 +225,9 @@ def checkTwoObjectUndo(self):
self._transaction_store(oid2, revid2, p51, '', t)
# Finish the transaction
self._transaction_vote(t)
+ self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
- self._storage.tpc_finish(t)
eq(revid1, revid2)
# Update those same two objects
t = Transaction()
@@ -231,9 +237,9 @@ def checkTwoObjectUndo(self):
self._transaction_store(oid2, revid2, p52, '', t)
# Finish the transaction
self._transaction_vote(t)
+ self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
- self._storage.tpc_finish(t)
eq(revid1, revid2)
# Make sure the objects have the current value
data, revid1 = self._storage.load(oid1, '')
@@ -289,10 +295,11 @@ def checkTwoObjectUndoAtOnce(self):
tid1 = info[1]['id']
t = Transaction()
oids = self._begin_undos_vote(t, tid, tid1)
- self._storage.tpc_finish(t)
+ serial = self._storage.tpc_finish(t)
# We may get the finalization stuff called an extra time,
# depending on the implementation.
- self.assertEqual(set(oids), set((oid1, oid2)))
+ if serial is None:
+ self.assertEqual(set(oids), {oid1, oid2})
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(30))
data, revid2 = self._storage.load(oid2, '')
@@ -326,7 +333,7 @@ def checkTwoObjectUndoAgain(self):
self._transaction_store(oid2, revid2, p52, '', t)
# Finish the transaction
self._transaction_vote(t)
- self._storage.tpc_finish(t)
+ self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
@@ -346,7 +353,7 @@ def checkTwoObjectUndoAgain(self):
self._transaction_store(oid2, revid2, p53, '', t)
# Finish the transaction
self._transaction_vote(t)
- self._storage.tpc_finish(t)
+ self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
@@ -358,10 +365,11 @@ def checkTwoObjectUndoAgain(self):
tid = info[1]['id']
t = Transaction()
oids = self._begin_undos_vote(t, tid)
- self._storage.tpc_finish(t)
- eq(len(oids), 1)
- self.assertTrue(oid1 in oids)
- self.assertTrue(not oid2 in oids)
+ serial = self._storage.tpc_finish(t)
+ if serial is None:
+ eq(len(oids), 1)
+ self.assertTrue(oid1 in oids)
+ self.assertTrue(not oid2 in oids)
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(33))
data, revid2 = self._storage.load(oid2, '')
@@ -398,7 +406,7 @@ def checkNotUndoable(self):
self._transaction_store(oid1, revid1, p81, '', t)
self._transaction_store(oid2, revid2, p91, '', t)
self._transaction_vote(t)
- self._storage.tpc_finish(t)
+ self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
@@ -685,9 +693,9 @@ def undo(i):
tid = p64(i + 1)
eq(txn.tid, tid)
- L1 = [(rec.oid, rec.tid, rec.data_txn) for rec in txn]
- L2 = [(oid, revid, None) for _tid, oid, revid in orig
- if _tid == tid]
+ L1 = {(rec.oid, rec.tid, rec.data_txn) for rec in txn}
+ L2 = {(oid, revid, None) for _tid, oid, revid in orig
+ if _tid == tid}
eq(L1, L2)
...@@ -159,11 +159,8 @@ class Storage(BaseStorage.BaseStorage, ...@@ -159,11 +159,8 @@ class Storage(BaseStorage.BaseStorage,
except NEOStorageNotFoundError: except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid) raise POSException.POSKeyError(oid)
def sync(self, force=True): def sync(self):
# XXX: sync() is part of IMVCCStorage and we don't want to be called return self.app.sync()
# from afterCompletion() so it may not be a good place to ping the
# master here. See also monkey-patch in __init__.py
self.app.lastTransaction()
def copyTransactionsFrom(self, source, verbose=False): def copyTransactionsFrom(self, source, verbose=False):
""" Zope compliant API """ """ Zope compliant API """
...@@ -186,7 +183,7 @@ class Storage(BaseStorage.BaseStorage, ...@@ -186,7 +183,7 @@ class Storage(BaseStorage.BaseStorage,
def lastTransaction(self): def lastTransaction(self):
# Used in ZODB unit tests # Used in ZODB unit tests
return self.app.lastTransaction() return self.app.last_tid
def _clear_temp(self): def _clear_temp(self):
raise NotImplementedError raise NotImplementedError
......
...@@ -12,15 +12,11 @@ ...@@ -12,15 +12,11 @@
# #
############################################################################## ##############################################################################
import app # set up signal handers early enough to do it in the main thread def patch():
if 1:
from hashlib import md5 from hashlib import md5
from ZODB.Connection import Connection from ZODB.Connection import Connection
def _check(f, *args): H = lambda f: md5(f.func_code.co_code).hexdigest()
h = md5(f.func_code.co_code).hexdigest()
assert h in args, h
# Allow serial to be returned as late as tpc_finish # Allow serial to be returned as late as tpc_finish
# #
...@@ -28,9 +24,7 @@ if 1: ...@@ -28,9 +24,7 @@ if 1:
# removing the requirement to serialise second commit phase (tpc_vote # removing the requirement to serialise second commit phase (tpc_vote
# to tpc_finish/tpc_abort). # to tpc_finish/tpc_abort).
_check(Connection.tpc_finish, h = H(Connection.tpc_finish)
'ab9b1b8d82c40e5fffa84f7bc4ea3a8b', # Python 2.7
)
def tpc_finish(self, transaction): def tpc_finish(self, transaction):
"""Indicate confirmation that the transaction is done.""" """Indicate confirmation that the transaction is done."""
...@@ -61,18 +55,27 @@ if 1: ...@@ -61,18 +55,27 @@ if 1:
# </patch> # </patch>
self._tpc_cleanup() self._tpc_cleanup()
global OLD_ZODB
OLD_ZODB = h in (
'ab9b1b8d82c40e5fffa84f7bc4ea3a8b', # Python 2.7
)
if OLD_ZODB:
Connection.tpc_finish = tpc_finish Connection.tpc_finish = tpc_finish
elif hasattr(Connection, '_handle_serial'): # merged upstream ?
assert hasattr(Connection, '_warn_about_returned_serial')
# IStorage implementations usually need to provide a "network barrier", # sync() is used to provide a "network barrier", which is required for
# at least for NEO & ZEO, to make sure we have an up-to-date view of # NEO & ZEO to make sure our view of the storage includes all changes done
# the storage. It's unclear whether sync() is a good place to do this # so far by other clients. But a round-trip to the server introduces
# because a round-trip to the server introduces latency and we prefer # latency so it must not be done when it's not useful. Note also that a
# it's not done when it's not useful. # successful commit (which ends with a response from the master) already
# For example, we know we are up-to-date after a successful commit, # acts as a "network barrier".
# so this should not be done in afterCompletion(), and anyway, we don't # BBB: What this monkey-patch does has been merged in ZODB5.
# know any legitimate use of DB access outside a transaction. if not hasattr(Connection, '_flush_invalidations'):
return
_check(Connection.afterCompletion, assert H(Connection.afterCompletion) in (
'cd3a080b80fd957190ff3bb867149448', # Python 2.7 'cd3a080b80fd957190ff3bb867149448', # Python 2.7
) )
...@@ -81,3 +84,7 @@ if 1: ...@@ -81,3 +84,7 @@ if 1:
# PATCH: do not call sync() # PATCH: do not call sync()
self._flush_invalidations() self._flush_invalidations()
Connection.afterCompletion = afterCompletion Connection.afterCompletion = afterCompletion
patch()
import app # set up signal handers early enough to do it in the main thread
...@@ -24,7 +24,9 @@ from functools import partial ...@@ -24,7 +24,9 @@ from functools import partial
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
from ZODB.POSException import ReadConflictError from ZODB.POSException import ReadConflictError
from ZODB.ConflictResolution import ResolvedSerial from . import OLD_ZODB
if OLD_ZODB:
from ZODB.ConflictResolution import ResolvedSerial
from persistent.TimeStamp import TimeStamp from persistent.TimeStamp import TimeStamp
from neo.lib import logging from neo.lib import logging
...@@ -108,7 +110,6 @@ class Application(ThreadedApplication): ...@@ -108,7 +110,6 @@ class Application(ThreadedApplication):
self._loading_oid = None self._loading_oid = None
self.new_oid_list = () self.new_oid_list = ()
self.last_oid = '\0' * 8 self.last_oid = '\0' * 8
self.last_tid = None
self.storage_event_handler = storage.StorageEventHandler(self) self.storage_event_handler = storage.StorageEventHandler(self)
self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self) self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
self.storage_handler = storage.StorageAnswersHandler(self) self.storage_handler = storage.StorageAnswersHandler(self)
...@@ -136,7 +137,11 @@ class Application(ThreadedApplication): ...@@ -136,7 +137,11 @@ class Application(ThreadedApplication):
self.compress = compress self.compress = compress
def __getattr__(self, attr): def __getattr__(self, attr):
if attr == 'pt': if attr in ('last_tid', 'pt'):
if self._connecting_to_master_node.locked():
if attr == 'last_tid':
return
else:
self._getMasterConnection() self._getMasterConnection()
return self.__getattribute__(attr) return self.__getattribute__(attr)
...@@ -603,7 +608,7 @@ class Application(ThreadedApplication): ...@@ -603,7 +608,7 @@ class Application(ThreadedApplication):
logging.error('tpc_store failed') logging.error('tpc_store failed')
raise NEOStorageError('tpc_store failed') raise NEOStorageError('tpc_store failed')
elif oid in resolved_oid_set: elif oid in resolved_oid_set:
append((oid, ResolvedSerial)) append((oid, ResolvedSerial) if OLD_ZODB else oid)
return result return result
def tpc_vote(self, transaction, tryToResolveConflict): def tpc_vote(self, transaction, tryToResolveConflict):
...@@ -950,9 +955,8 @@ class Application(ThreadedApplication): ...@@ -950,9 +955,8 @@ class Application(ThreadedApplication):
from .iterator import iterator from .iterator import iterator
def lastTransaction(self): def sync(self):
self._askPrimary(Packets.AskLastTransaction()) self._askPrimary(Packets.Ping())
return self.last_tid
def pack(self, t): def pack(self, t):
tid = TimeStamp(*time.gmtime(t)[:5] + (t % 60, )).raw() tid = TimeStamp(*time.gmtime(t)[:5] + (t % 60, )).raw()
......
...@@ -99,7 +99,10 @@ class PrimaryNotificationsHandler(MTEventHandler): ...@@ -99,7 +99,10 @@ class PrimaryNotificationsHandler(MTEventHandler):
app = self.app app = self.app
ltid = packet.decode()[0] ltid = packet.decode()[0]
if app.last_tid != ltid: if app.last_tid != ltid:
if app.master_conn is None: # Either we're connecting or we already know the last tid
# via invalidations.
assert app.master_conn is None, app.master_conn
if 1:
app._cache_lock_acquire() app._cache_lock_acquire()
try: try:
if app.last_tid < ltid: if app.last_tid < ltid:
......
...@@ -63,7 +63,7 @@ def iterator(app, start=None, stop=None): ...@@ -63,7 +63,7 @@ def iterator(app, start=None, stop=None):
"""NEO transaction iterator""" """NEO transaction iterator"""
if start is None: if start is None:
start = ZERO_TID start = ZERO_TID
stop = min(stop or MAX_TID, app.lastTransaction()) stop = min(stop or MAX_TID, app.last_tid)
while 1: while 1:
max_tid, chunk = app.transactionLog(start, stop, CHUNK_LENGTH) max_tid, chunk = app.transactionLog(start, stop, CHUNK_LENGTH)
if not chunk: if not chunk:
......
...@@ -226,8 +226,9 @@ class MTEventHandler(EventHandler): ...@@ -226,8 +226,9 @@ class MTEventHandler(EventHandler):
def packetReceived(self, conn, packet, kw={}): def packetReceived(self, conn, packet, kw={}):
"""Redirect all received packet to dispatcher thread.""" """Redirect all received packet to dispatcher thread."""
if packet.isResponse() and type(packet) is not Packets.Pong: if packet.isResponse():
if not self.dispatcher.dispatch(conn, packet.getId(), packet, kw): if not (self.dispatcher.dispatch(conn, packet.getId(), packet, kw)
or type(packet) is Packets.Pong):
raise ProtocolError('Unexpected response packet from %r: %r' raise ProtocolError('Unexpected response packet from %r: %r'
% (conn, packet)) % (conn, packet))
else: else:
......
...@@ -81,6 +81,16 @@ def unpackTID(ptid): ...@@ -81,6 +81,16 @@ def unpackTID(ptid):
higher.reverse() higher.reverse()
return (tuple(higher), lower) return (tuple(higher), lower)
def timeStringFromTID(ptid):
"""
Return a string in the format "yyyy-mm-dd hh:mm:ss.ssssss" from a TID
"""
higher, lower = unpackTID(ptid)
seconds = lower * SECOND_PER_TID_LOW
return '%04d-%02d-%02d %02d:%02d:%09.6f' % (higher[0], higher[1], higher[2],
higher[3], higher[4], seconds)
def addTID(ptid, offset): def addTID(ptid, offset):
""" """
Offset given packed TID. Offset given packed TID.
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
from operator import itemgetter from operator import itemgetter
from .neoctl import NeoCTL, NotReadyException from .neoctl import NeoCTL, NotReadyException
from neo.lib.util import p64, u64, tidFromTime from neo.lib.util import p64, u64, tidFromTime, timeStringFromTID
from neo.lib.protocol import uuid_str, ClusterStates, NodeTypes, \ from neo.lib.protocol import uuid_str, ClusterStates, NodeTypes, \
UUID_NAMESPACES, ZERO_TID UUID_NAMESPACES, ZERO_TID
...@@ -89,11 +89,13 @@ class TerminalNeoCTL(object): ...@@ -89,11 +89,13 @@ class TerminalNeoCTL(object):
ptid, backup_tid, truncate_tid = self.neoctl.getRecovery() ptid, backup_tid, truncate_tid = self.neoctl.getRecovery()
if backup_tid: if backup_tid:
ltid = self.neoctl.getLastTransaction() ltid = self.neoctl.getLastTransaction()
r = "backup_tid = 0x%x" % u64(backup_tid) r = "backup_tid = 0x%x (%s)" % (u64(backup_tid),
timeStringFromTID(backup_tid))
else: else:
loid, ltid = self.neoctl.getLastIds() loid, ltid = self.neoctl.getLastIds()
r = "last_oid = 0x%x" % u64(loid) r = "last_oid = 0x%x" % (u64(loid))
return r + "\nlast_tid = 0x%x\nlast_ptid = %u" % (u64(ltid), ptid) return r + "\nlast_tid = 0x%x (%s)\nlast_ptid = %u" % \
(u64(ltid), timeStringFromTID(ltid), ptid)
def getPartitionRowList(self, params): def getPartitionRowList(self, params):
""" """
...@@ -311,4 +313,3 @@ class Application(object): ...@@ -311,4 +313,3 @@ class Application(object):
" e.g. '257684787499560686', '0x3937af2eeeeeeee' or '1325421296.'" " e.g. '257684787499560686', '0x3937af2eeeeeeee' or '1325421296.'"
" for 2012-01-01 12:34:56 UTC") " for 2012-01-01 12:34:56 UTC")
return '\n'.join(output_list) return '\n'.join(output_list)
...@@ -331,6 +331,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -331,6 +331,7 @@ class MySQLDatabaseManager(DatabaseManager):
def _getNextTID(self, *args): # partition, oid, tid def _getNextTID(self, *args): # partition, oid, tid
r = self.query("SELECT tid FROM obj" r = self.query("SELECT tid FROM obj"
" FORCE INDEX(`partition`)"
" WHERE `partition`=%d AND oid=%d AND tid>%d" " WHERE `partition`=%d AND oid=%d AND tid>%d"
" ORDER BY tid LIMIT 1" % args) " ORDER BY tid LIMIT 1" % args)
return r[0][0] if r else None return r[0][0] if r else None
......
...@@ -38,6 +38,7 @@ class Dispatcher(object): ...@@ -38,6 +38,7 @@ class Dispatcher(object):
def _getMasterConnection(self): def _getMasterConnection(self):
if self.master_conn is None: if self.master_conn is None:
self.last_tid = None
self.uuid = 1 + (UUID_NAMESPACES[NodeTypes.CLIENT] << 24) self.uuid = 1 + (UUID_NAMESPACES[NodeTypes.CLIENT] << 24)
self.num_partitions = 10 self.num_partitions = 10
self.num_replicas = 1 self.num_replicas = 1
......
...@@ -335,7 +335,7 @@ class ClientTests(NEOFunctionalTest): ...@@ -335,7 +335,7 @@ class ClientTests(NEOFunctionalTest):
t3.description = 'desc' t3.description = 'desc'
st3.tpc_begin(t3) st3.tpc_begin(t3)
# retreive the last revision # retreive the last revision
data, serial = st3.load(oid, '') data, serial = st3.load(oid)
# try to store again, should not be delayed # try to store again, should not be delayed
st3.store(oid, serial, data, '', t3) st3.store(oid, serial, data, '', t3)
# the vote should not timeout # the vote should not timeout
......
...@@ -632,7 +632,8 @@ class Test(NEOThreadedTest): ...@@ -632,7 +632,8 @@ class Test(NEOThreadedTest):
t.begin() t.begin()
s0.stop() # force client to ask s1 s0.stop() # force client to ask s1
self.assertEqual(sorted(c.root()), [1]) self.assertEqual(sorted(c.root()), [1])
t0, t1 = c._storage.iterator() self.tic()
t0, t1 = c.db().storage.iterator()
finally: finally:
cluster.stop() cluster.stop()
...@@ -884,7 +885,7 @@ class Test(NEOThreadedTest): ...@@ -884,7 +885,7 @@ class Test(NEOThreadedTest):
# Now test cache invalidation during a load from a storage # Now test cache invalidation during a load from a storage
ll = LockLock() ll = LockLock()
def _loadFromStorage(orig, *args): def break_after(orig, *args):
try: try:
return orig(*args) return orig(*args)
finally: finally:
...@@ -892,7 +893,7 @@ class Test(NEOThreadedTest): ...@@ -892,7 +893,7 @@ class Test(NEOThreadedTest):
x2._p_deactivate() x2._p_deactivate()
# Remove last version of x from cache # Remove last version of x from cache
cache._remove(cache._oid_dict[x2._p_oid].pop()) cache._remove(cache._oid_dict[x2._p_oid].pop())
with ll, Patch(cluster.client, _loadFromStorage=_loadFromStorage): with ll, Patch(cluster.client, _loadFromStorage=break_after):
t = self.newThread(x2._p_activate) t = self.newThread(x2._p_activate)
ll() ll()
# At this point, x could not be found the cache and the result # At this point, x could not be found the cache and the result
...@@ -910,16 +911,18 @@ class Test(NEOThreadedTest): ...@@ -910,16 +911,18 @@ class Test(NEOThreadedTest):
self.assertEqual(x2.value, 1) self.assertEqual(x2.value, 1)
self.assertEqual(x1.value, 0) self.assertEqual(x1.value, 0)
# l1 is acquired and l2 is released def invalidations(conn):
try:
return conn._storage._invalidations
except AttributeError: # BBB: ZODB < 5
return conn._invalidated
# Change x again from 0 to 1, while the checking connection c1 # Change x again from 0 to 1, while the checking connection c1
# is suspended at the beginning of the transaction t1, # is suspended at the beginning of the transaction t1,
# between Storage.sync() and flush of invalidations. # between Storage.sync() and flush of invalidations.
def _flush_invalidations(orig):
ll()
orig()
x1._p_deactivate() x1._p_deactivate()
t1.abort() t1.abort()
with ll, Patch(c1, _flush_invalidations=_flush_invalidations): with ll, Patch(c1._storage, sync=break_after):
t = self.newThread(t1.begin) t = self.newThread(t1.begin)
ll() ll()
txn = transaction.Transaction() txn = transaction.Transaction()
...@@ -927,10 +930,14 @@ class Test(NEOThreadedTest): ...@@ -927,10 +930,14 @@ class Test(NEOThreadedTest):
client.store(x2._p_oid, tid, y, '', txn) client.store(x2._p_oid, tid, y, '', txn)
tid = client.tpc_finish(txn, None) tid = client.tpc_finish(txn, None)
client.close() client.close()
self.assertEqual(invalidations(c1), {x1._p_oid})
t.join() t.join()
# A transaction really begins when it acquires the lock to flush # A transaction really begins when it gets the last tid from the
# invalidations. The previous lastTransaction() only does a ping # storage, just before flushing invalidations (on ZODB < 5, it's
# to make sure we have a recent enough view of the DB. # when it acquires the lock to flush invalidations). The previous
# call to sync() only does a ping to make sure we have a recent
# enough view of the DB.
self.assertFalse(invalidations(c1))
self.assertEqual(x1.value, 1) self.assertEqual(x1.value, 1)
finally: finally:
...@@ -969,7 +976,7 @@ class Test(NEOThreadedTest): ...@@ -969,7 +976,7 @@ class Test(NEOThreadedTest):
# transaction before the last one, and clearing the cache before # transaction before the last one, and clearing the cache before
# reloading x. # reloading x.
c1._storage.load(x._p_oid) c1._storage.load(x._p_oid)
t0, t1, t2 = c1._storage.iterator() t0, t1, t2 = c1.db().storage.iterator()
self.assertEqual(map(u64, t0.oid_list), [0]) self.assertEqual(map(u64, t0.oid_list), [0])
self.assertEqual(map(u64, t1.oid_list), [0, 1]) self.assertEqual(map(u64, t1.oid_list), [0, 1])
# Check oid 1 is part of transaction metadata. # Check oid 1 is part of transaction metadata.
...@@ -1282,6 +1289,7 @@ class Test(NEOThreadedTest): ...@@ -1282,6 +1289,7 @@ class Test(NEOThreadedTest):
t, c = cluster.getTransaction() t, c = cluster.getTransaction()
m2c, = cluster.master.getConnectionList(cluster.client) m2c, = cluster.master.getConnectionList(cluster.client)
cluster.client._cache.clear() cluster.client._cache.clear()
c.cacheMinimize()
with cluster.client.filterConnection(cluster.storage) as c2s: with cluster.client.filterConnection(cluster.storage) as c2s:
c2s.add(disconnect) c2s.add(disconnect)
# Storages are currently notified of clients that get # Storages are currently notified of clients that get
......
...@@ -404,14 +404,14 @@ class ReplicationTests(NEOThreadedTest): ...@@ -404,14 +404,14 @@ class ReplicationTests(NEOThreadedTest):
self.tic() self.tic()
s1.stop() s1.stop()
cluster.join((s1,)) cluster.join((s1,))
t0, t1, t2 = c._storage.iterator() t0, t1, t2 = c.db().storage.iterator()
s1.resetNode() s1.resetNode()
s1.start() s1.start()
self.tic() self.tic()
self.assertEqual([], cluster.getOutdatedCells()) self.assertEqual([], cluster.getOutdatedCells())
s0.stop() s0.stop()
cluster.join((s0,)) cluster.join((s0,))
t0, t1, t2 = c._storage.iterator() t0, t1, t2 = c.db().storage.iterator()
finally: finally:
cluster.stop() cluster.stop()
......
...@@ -43,7 +43,11 @@ class ZODBTestCase(TestCase): ...@@ -43,7 +43,11 @@ class ZODBTestCase(TestCase):
def _tearDown(self, success): def _tearDown(self, success):
self._storage.cleanup() self._storage.cleanup()
try:
self.neo.stop() self.neo.stop()
except Exception:
if success:
raise
del self.neo, self._storage del self.neo, self._storage
super(ZODBTestCase, self)._tearDown(success) super(ZODBTestCase, self)._tearDown(success)
......
...@@ -28,6 +28,14 @@ class UndoTests(ZODBTestCase, StorageTestBase, TransactionalUndoStorage, ...@@ -28,6 +28,14 @@ class UndoTests(ZODBTestCase, StorageTestBase, TransactionalUndoStorage,
checkTransactionalUndoAfterPack = expectedFailure()( checkTransactionalUndoAfterPack = expectedFailure()(
TransactionalUndoStorage.checkTransactionalUndoAfterPack) TransactionalUndoStorage.checkTransactionalUndoAfterPack)
for x in ('checkUndoMultipleConflictResolution',
'checkUndoMultipleConflictResolutionReversed'):
try:
setattr(UndoTests, x,
expectedFailure(KeyError)(getattr(TransactionalUndoStorage, x)))
except AttributeError:
pass
if __name__ == "__main__": if __name__ == "__main__":
suite = unittest.makeSuite(UndoTests, 'check') suite = unittest.makeSuite(UndoTests, 'check')
unittest.main(defaultTest='suite') unittest.main(defaultTest='suite')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment