Commit b8711838 authored by Jim Fulton's avatar Jim Fulton Committed by GitHub

Merge pull request #35 from zopefoundation/implement-IMultiCommitStorage

Implement iMultiCommitStorage
parents 01cda6b4 54c31e71
...@@ -115,7 +115,7 @@ setup(name="ZEO", ...@@ -115,7 +115,7 @@ setup(name="ZEO",
tests_require = tests_require, tests_require = tests_require,
extras_require = dict(test=tests_require), extras_require = dict(test=tests_require),
install_requires = [ install_requires = [
'ZODB >= 5.0.0a1', 'ZODB >= 5.0.0a5',
'six', 'six',
'transaction >= 1.6.0', 'transaction >= 1.6.0',
'persistent >= 4.1.0', 'persistent >= 4.1.0',
......
...@@ -53,10 +53,7 @@ logger = logging.getLogger(__name__) ...@@ -53,10 +53,7 @@ logger = logging.getLogger(__name__)
# max signed 64-bit value ~ infinity :) Signed cuz LBTree and TimeStamp # max signed 64-bit value ~ infinity :) Signed cuz LBTree and TimeStamp
m64 = b'\x7f\xff\xff\xff\xff\xff\xff\xff' m64 = b'\x7f\xff\xff\xff\xff\xff\xff\xff'
try: from ZODB.ConflictResolution import ResolvedSerial
from ZODB.ConflictResolution import ResolvedSerial
except ImportError:
ResolvedSerial = 'rs'
def tid2time(tid): def tid2time(tid):
return str(TimeStamp(tid)) return str(TimeStamp(tid))
...@@ -77,6 +74,7 @@ def get_timestamp(prev_ts=None): ...@@ -77,6 +74,7 @@ def get_timestamp(prev_ts=None):
MB = 1024**2 MB = 1024**2
@zope.interface.implementer(ZODB.interfaces.IMultiCommitStorage)
class ClientStorage(object): class ClientStorage(object):
"""A storage class that is a network client to a remote storage. """A storage class that is a network client to a remote storage.
...@@ -725,13 +723,27 @@ class ClientStorage(object): ...@@ -725,13 +723,27 @@ class ClientStorage(object):
"""Storage API: vote on a transaction. """Storage API: vote on a transaction.
""" """
tbuf = self._check_trans(txn, 'tpc_vote') tbuf = self._check_trans(txn, 'tpc_vote')
self._call('vote', id(txn)) try:
for oid in self._call('vote', id(txn)) or ():
tbuf.serial(oid, ResolvedSerial)
except POSException.ConflictError as err:
oid = getattr(err, 'oid', None)
if oid is not None:
# This is a band-aid to help recover from a situation
# that shouldn't happen. A Client somehow misses some
# invalidations and has out of date data in its
# cache. We need some whay to invalidate the cache
# entry without invalidations. So, if we see a
# (unresolved) conflict error, we assume that the
# cache entry is bad and invalidate it.
self._cache.invalidate(oid, None)
raise
if tbuf.exception: if tbuf.exception:
raise tbuf.exception raise tbuf.exception
if tbuf.serials: if tbuf.resolved:
return list(tbuf.serials.items()) return list(tbuf.resolved)
else: else:
return None return None
...@@ -826,6 +838,8 @@ class ClientStorage(object): ...@@ -826,6 +838,8 @@ class ClientStorage(object):
self._update_blob_cache(tbuf, tid) self._update_blob_cache(tbuf, tid)
return tid
def _update_blob_cache(self, tbuf, tid): def _update_blob_cache(self, tbuf, tid):
"""Internal helper move blobs updated by a transaction to the cache. """Internal helper move blobs updated by a transaction to the cache.
""" """
......
...@@ -84,7 +84,7 @@ class ZEOStorage: ...@@ -84,7 +84,7 @@ class ZEOStorage:
blob_tempfile = None blob_tempfile = None
log_label = 'unconnected' log_label = 'unconnected'
locked = False # Don't have storage lock locked = False # Don't have storage lock
verifying = store_failed = 0 verifying = 0
def __init__(self, server, read_only=0): def __init__(self, server, read_only=0):
self.server = server self.server = server
...@@ -338,7 +338,6 @@ class ZEOStorage: ...@@ -338,7 +338,6 @@ class ZEOStorage:
self.blob_log = [] self.blob_log = []
self.tid = tid self.tid = tid
self.status = status self.status = status
self.store_failed = 0
self.stats.active_txns += 1 self.stats.active_txns += 1
# Assign the transaction attribute last. This is so we don't # Assign the transaction attribute last. This is so we don't
...@@ -426,38 +425,40 @@ class ZEOStorage: ...@@ -426,38 +425,40 @@ class ZEOStorage:
self.storage.tpc_begin(self.transaction) self.storage.tpc_begin(self.transaction)
for op, args in self.txnlog: for op, args in self.txnlog:
if not getattr(self, op)(*args): getattr(self, op)(*args)
break
# Blob support # Blob support
while self.blob_log and not self.store_failed: while self.blob_log:
oid, oldserial, data, blobfilename = self.blob_log.pop() oid, oldserial, data, blobfilename = self.blob_log.pop()
self._store(oid, oldserial, data, blobfilename) self._store(oid, oldserial, data, blobfilename)
if not self.store_failed: serials = self.storage.tpc_vote(self.transaction)
# Only call tpc_vote of no store call failed, if serials:
# otherwise the serialnos() call will deliver an if not isinstance(serials[0], bytes):
# exception that will be handled by the client in serials = (oid for (oid, serial) in serials
# its tpc_vote() method. if serial == ResolvedSerial)
serials = self.storage.tpc_vote(self.transaction)
if serials:
self.serials.extend(serials)
self.connection.async('serialnos', self.serials) self.serials.extend(serials)
except Exception: except Exception as err:
self.storage.tpc_abort(self.transaction) self.storage.tpc_abort(self.transaction)
self._clear_transaction() self._clear_transaction()
if isinstance(err, ConflictError):
self.stats.conflicts += 1
self.log("conflict error %s" % err, BLATHER)
if not isinstance(err, TransactionError):
logger.exception("While voting")
if delay is not None: if delay is not None:
delay.error(sys.exc_info()) delay.error(sys.exc_info())
else: else:
raise raise
else: else:
if delay is not None: if delay is not None:
delay.reply(None) delay.reply(self.serials)
else: else:
return None return self.serials
else: else:
return delay return delay
...@@ -549,120 +550,45 @@ class ZEOStorage: ...@@ -549,120 +550,45 @@ class ZEOStorage:
self._check_tid(tid, exc=StorageTransactionError) self._check_tid(tid, exc=StorageTransactionError)
self.txnlog.undo(trans_id) self.txnlog.undo(trans_id)
def _op_error(self, oid, err, op):
self.store_failed = 1
if isinstance(err, ConflictError):
self.stats.conflicts += 1
self.log("conflict error oid=%s msg=%s" %
(oid_repr(oid), str(err)), BLATHER)
if not isinstance(err, TransactionError):
# Unexpected errors are logged and passed to the client
self.log("%s error: %s, %s" % ((op,)+ sys.exc_info()[:2]),
logging.ERROR, exc_info=True)
err = self._marshal_error(err)
# The exception is reported back as newserial for this oid
self.serials.append((oid, err))
def _delete(self, oid, serial): def _delete(self, oid, serial):
err = None self.storage.deleteObject(oid, serial, self.transaction)
try:
self.storage.deleteObject(oid, serial, self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception as e:
err = e
self._op_error(oid, err, 'delete')
return err is None
def _checkread(self, oid, serial): def _checkread(self, oid, serial):
err = None self.storage.checkCurrentSerialInTransaction(
try: oid, serial, self.transaction)
self.storage.checkCurrentSerialInTransaction(
oid, serial, self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception as e:
err = e
self._op_error(oid, err, 'checkCurrentSerialInTransaction')
return err is None
def _store(self, oid, serial, data, blobfile=None): def _store(self, oid, serial, data, blobfile=None):
err = None if blobfile is None:
try: newserial = self.storage.store(
if blobfile is None: oid, serial, data, '', self.transaction)
newserial = self.storage.store(
oid, serial, data, '', self.transaction)
else:
newserial = self.storage.storeBlob(
oid, serial, data, blobfile, '', self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception as error:
self._op_error(oid, error, 'store')
err = error
else: else:
if serial != b"\0\0\0\0\0\0\0\0": newserial = self.storage.storeBlob(
self.invalidated.append(oid) oid, serial, data, blobfile, '', self.transaction)
if serial != b"\0\0\0\0\0\0\0\0":
self.invalidated.append(oid)
if newserial:
if isinstance(newserial, bytes): if isinstance(newserial, bytes):
newserial = [(oid, newserial)] newserial = [(oid, newserial)]
for oid, s in newserial or (): for oid, s in newserial:
if s == ResolvedSerial: if s == ResolvedSerial:
self.stats.conflicts_resolved += 1 self.stats.conflicts_resolved += 1
self.log("conflict resolved oid=%s" self.log("conflict resolved oid=%s"
% oid_repr(oid), BLATHER) % oid_repr(oid), BLATHER)
self.serials.append(oid)
self.serials.append((oid, s))
return err is None
def _restore(self, oid, serial, data, prev_txn): def _restore(self, oid, serial, data, prev_txn):
err = None self.storage.restore(oid, serial, data, '', prev_txn,
try: self.transaction)
self.storage.restore(oid, serial, data, '', prev_txn,
self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception as err:
self._op_error(oid, err, 'restore')
return err is None
def _undo(self, trans_id): def _undo(self, trans_id):
err = None tid, oids = self.storage.undo(trans_id, self.transaction)
try: self.invalidated.extend(oids)
tid, oids = self.storage.undo(trans_id, self.transaction) self.serials.extend(oids)
except (SystemExit, KeyboardInterrupt):
raise
except Exception as e:
err = e
self._op_error(z64, err, 'undo')
else:
self.invalidated.extend(oids)
self.serials.extend((oid, ResolvedSerial) for oid in oids)
return err is None
def _marshal_error(self, error):
# Try to pickle the exception. If it can't be pickled,
# the RPC response would fail, so use something that can be pickled.
if PY3:
pickler = Pickler(BytesIO(), 3)
else:
# The pure-python version requires at least one argument (PyPy)
pickler = Pickler(0)
pickler.fast = 1
try:
pickler.dump(error)
except:
msg = "Couldn't pickle storage exception: %s" % repr(error)
self.log(msg, logging.ERROR)
error = StorageServerError(msg)
return error
# IStorageIteration support # IStorageIteration support
......
...@@ -46,7 +46,7 @@ class TransactionBuffer: ...@@ -46,7 +46,7 @@ class TransactionBuffer:
# stored are builtin types -- strings or None. # stored are builtin types -- strings or None.
self.pickler = Pickler(self.file, 1) self.pickler = Pickler(self.file, 1)
self.pickler.fast = 1 self.pickler.fast = 1
self.serials = {} # processed { oid -> serial } self.resolved = set() # {oid}
self.exception = None self.exception = None
def close(self): def close(self):
...@@ -61,10 +61,9 @@ class TransactionBuffer: ...@@ -61,10 +61,9 @@ class TransactionBuffer:
def serial(self, oid, serial): def serial(self, oid, serial):
if isinstance(serial, Exception): if isinstance(serial, Exception):
self.exception = serial self.exception = serial # This transaction will never be committed
self.serials[oid] = None elif serial == ResolvedSerial:
else: self.resolved.add(oid)
self.serials[oid] = serial
def storeBlob(self, oid, blobfilename): def storeBlob(self, oid, blobfilename):
self.blobs.append((oid, blobfilename)) self.blobs.append((oid, blobfilename))
...@@ -72,7 +71,7 @@ class TransactionBuffer: ...@@ -72,7 +71,7 @@ class TransactionBuffer:
def __iter__(self): def __iter__(self):
self.file.seek(0) self.file.seek(0)
unpickler = Unpickler(self.file) unpickler = Unpickler(self.file)
serials = self.serials resolved = self.resolved
# Gaaaa, this is awkward. There can be entries in serials that # Gaaaa, this is awkward. There can be entries in serials that
# aren't in the buffer, because undo. Entries can be repeated # aren't in the buffer, because undo. Entries can be repeated
...@@ -83,9 +82,9 @@ class TransactionBuffer: ...@@ -83,9 +82,9 @@ class TransactionBuffer:
for i in range(self.count): for i in range(self.count):
oid, data = unpickler.load() oid, data = unpickler.load()
seen.add(oid) seen.add(oid)
yield oid, data, serials.get(oid) == ResolvedSerial yield oid, data, oid in resolved
# We may have leftover serials because undo # We may have leftover oids because undo
for oid, serial in serials.items(): for oid in resolved:
if oid not in seen: if oid not in seen:
yield oid, None, serial == ResolvedSerial yield oid, None, True
...@@ -17,7 +17,7 @@ class ServerProtocol(base.Protocol): ...@@ -17,7 +17,7 @@ class ServerProtocol(base.Protocol):
"""asyncio low-level ZEO server interface """asyncio low-level ZEO server interface
""" """
protocols = b'Z4', b'Z5' protocols = (b'Z5', )
name = 'server protocol' name = 'server protocol'
methods = set(('register', )) methods = set(('register', ))
...@@ -161,7 +161,7 @@ class Delay: ...@@ -161,7 +161,7 @@ class Delay:
def error(self, exc_info): def error(self, exc_info):
self.sent = 'error' self.sent = 'error'
log("Error raised in delayed method", logging.ERROR, exc_info=exc_info) logger.error("Error raised in delayed method", exc_info=exc_info)
self.protocol.send_error(self.msgid, exc_info[1]) self.protocol.send_error(self.msgid, exc_info[1])
def __repr__(self): def __repr__(self):
...@@ -198,5 +198,4 @@ class MTDelay(Delay): ...@@ -198,5 +198,4 @@ class MTDelay(Delay):
def error(self, exc_info): def error(self, exc_info):
self.ready.wait() self.ready.wait()
log("Error raised in delayed method", logging.ERROR, exc_info=exc_info)
self.protocol.call_soon_threadsafe(Delay.error, self, exc_info) self.protocol.call_soon_threadsafe(Delay.error, self, exc_info)
...@@ -750,7 +750,7 @@ class ServerTests(Base, setupstack.TestCase): ...@@ -750,7 +750,7 @@ class ServerTests(Base, setupstack.TestCase):
self.target = protocol.zeo_storage self.target = protocol.zeo_storage
if finish: if finish:
self.assertEqual(self.pop(parse=False), best_protocol_version) self.assertEqual(self.pop(parse=False), best_protocol_version)
protocol.data_received(sized(b'Z4')) protocol.data_received(sized(b'Z5'))
return protocol return protocol
message_id = 0 message_id = 0
...@@ -788,9 +788,9 @@ class ServerTests(Base, setupstack.TestCase): ...@@ -788,9 +788,9 @@ class ServerTests(Base, setupstack.TestCase):
self.assertEqual(self.pop(parse=False), best_protocol_version) self.assertEqual(self.pop(parse=False), best_protocol_version)
# The client sends it's protocol: # The client sends it's protocol:
protocol.data_received(sized(b'Z4')) protocol.data_received(sized(b'Z5'))
self.assertEqual(protocol.protocol_version, b'Z4') self.assertEqual(protocol.protocol_version, b'Z5')
protocol.zeo_storage.notify_connected.assert_called_once_with(protocol) protocol.zeo_storage.notify_connected.assert_called_once_with(protocol)
......
This diff is collapsed.
...@@ -24,8 +24,7 @@ class StaleCache(object): ...@@ -24,8 +24,7 @@ class StaleCache(object):
class IClientCache(zope.interface.Interface): class IClientCache(zope.interface.Interface):
"""Client cache interface. """Client cache interface.
Note that caches need not be thread safe, fpr the most part, Note that caches need to be thread safe.
except for getLastTid, which may be called from multiple threads.
""" """
def close(): def close():
......
...@@ -1021,90 +1021,6 @@ class TimeoutTests(CommonSetupTearDown): ...@@ -1021,90 +1021,6 @@ class TimeoutTests(CommonSetupTearDown):
# or the server. # or the server.
self.assertRaises(KeyError, storage.load, oid, '') self.assertRaises(KeyError, storage.load, oid, '')
def checkTimeoutProvokingConflicts(self):
self._storage = storage = self.openClientStorage()
# Assert that the zeo cache is empty.
self.assert_(not list(storage._cache.contents()))
# Create the object
oid = storage.new_oid()
obj = MinPO(7)
# We need to successfully commit an object now so we have something to
# conflict about.
t = Transaction()
storage.tpc_begin(t)
revid1a = storage.store(oid, ZERO, zodb_pickle(obj), '', t)
revid1b = storage.tpc_vote(t)
revid1 = handle_serials(oid, revid1a, revid1b)
storage.tpc_finish(t)
# Now do a store, sleeping before the finish so as to cause a timeout.
obj.value = 8
t = Transaction()
old_connection_count = storage.connection_count_for_tests
storage.tpc_begin(t)
revid2a = storage.store(oid, revid1, zodb_pickle(obj), '', t)
revid2b = storage.tpc_vote(t)
revid2 = handle_serials(oid, revid2a, revid2b)
# Now sleep long enough for the storage to time out.
# This used to sleep for 3 seconds, and sometimes (but very rarely)
# failed then. Now we try for a minute. It typically succeeds
# on the second time thru the loop, and, since self.timeout is 1,
# it's typically faster now (2/1.8 ~= 1.11 seconds sleeping instead
# of 3).
deadline = time.time() + 60 # wait up to a minute
while time.time() < deadline:
if (storage.is_connected() and
(storage.connection_count_for_tests == old_connection_count)
):
time.sleep(self.timeout / 1.8)
else:
break
self.assert_(
(not storage.is_connected())
or
(storage.connection_count_for_tests > old_connection_count)
)
storage._wait()
self.assert_(storage.is_connected())
# We expect finish to fail.
self.assertRaises(ClientDisconnected, storage.tpc_finish, t)
storage.tpc_abort(t)
# Now we think we've committed the second transaction, but we really
# haven't. A third one should produce a POSKeyError on the server,
# which manifests as a ConflictError on the client.
obj.value = 9
t = Transaction()
storage.tpc_begin(t)
storage.store(oid, revid2, zodb_pickle(obj), '', t)
self.assertRaises(ConflictError, storage.tpc_vote, t)
# Even aborting won't help.
storage.tpc_abort(t)
self.assertRaises(ZODB.POSException.StorageTransactionError,
storage.tpc_finish, t)
# Try again.
obj.value = 10
t = Transaction()
storage.tpc_begin(t)
storage.store(oid, revid2, zodb_pickle(obj), '', t)
# Even aborting won't help.
self.assertRaises(ConflictError, storage.tpc_vote, t)
# Abort this one and try a transaction that should succeed.
storage.tpc_abort(t)
# Now do a store.
obj.value = 11
t = Transaction()
storage.tpc_begin(t)
revid2a = storage.store(oid, revid1, zodb_pickle(obj), '', t)
revid2b = storage.tpc_vote(t)
revid2 = handle_serials(oid, revid2a, revid2b)
storage.tpc_finish(t)
# Now load the object and verify that it has a value of 11.
data, revid = storage.load(oid, '')
self.assertEqual(zodb_unpickle(data), MinPO(11))
self.assertEqual(revid, revid2)
class MSTThread(threading.Thread): class MSTThread(threading.Thread):
__super_init = threading.Thread.__init__ __super_init = threading.Thread.__init__
......
...@@ -94,6 +94,10 @@ def runner(config, qin, qout, timeout=None, ...@@ -94,6 +94,10 @@ def runner(config, qin, qout, timeout=None,
import ZEO.asyncio.server import ZEO.asyncio.server
old_protocol = ZEO.asyncio.server.best_protocol_version old_protocol = ZEO.asyncio.server.best_protocol_version
ZEO.asyncio.server.best_protocol_version = protocol ZEO.asyncio.server.best_protocol_version = protocol
old_protocols = ZEO.asyncio.server.ServerProtocol.protocols
ZEO.asyncio.server.ServerProtocol.protocols = tuple(sorted(
set(old_protocols) | set([protocol])
))
try: try:
import ZEO.runzeo, threading import ZEO.runzeo, threading
...@@ -144,8 +148,8 @@ def runner(config, qin, qout, timeout=None, ...@@ -144,8 +148,8 @@ def runner(config, qin, qout, timeout=None,
finally: finally:
if old_protocol: if old_protocol:
ZEO.asyncio.server.best_protocol_version = protocol ZEO.asyncio.server.best_protocol_version = old_protocol
ZEO.asyncio.server.ServerProtocol.protocols = old_protocols
def stop_runner(thread, config, qin, qout, stop_timeout=9, pid=None): def stop_runner(thread, config, qin, qout, stop_timeout=9, pid=None):
qin.put('stop') qin.put('stop')
......
...@@ -5,7 +5,7 @@ A full test of all protocols isn't practical. But we'll do a limited ...@@ -5,7 +5,7 @@ A full test of all protocols isn't practical. But we'll do a limited
test that at least the current and previous protocols are supported in test that at least the current and previous protocols are supported in
both directions. both directions.
Let's start a Z309 server Let's start a Z4 server
>>> storage_conf = ''' >>> storage_conf = '''
... <blobstorage> ... <blobstorage>
...@@ -94,82 +94,85 @@ A current client should be able to connect to a old server: ...@@ -94,82 +94,85 @@ A current client should be able to connect to a old server:
>>> zope.testing.setupstack.rmtree('blobs') >>> zope.testing.setupstack.rmtree('blobs')
>>> zope.testing.setupstack.rmtree('server-blobs') >>> zope.testing.setupstack.rmtree('server-blobs')
And the other way around: #############################################################################
# Note that the ZEO 5.0 server only supports clients that use the Z5 protocol
>>> addr, _ = start_server(storage_conf, dict(invalidation_queue_size=5)) # And the other way around:
Note that we'll have to pull some hijinks: # >>> addr, _ = start_server(storage_conf, dict(invalidation_queue_size=5))
>>> import ZEO.asyncio.client # Note that we'll have to pull some hijinks:
>>> old_protocols = ZEO.asyncio.client.Protocol.protocols
>>> ZEO.asyncio.client.Protocol.protocols = [b'Z4']
>>> db = ZEO.DB(addr, client='client', blob_dir='blobs') # >>> import ZEO.asyncio.client
>>> db.storage.protocol_version # >>> old_protocols = ZEO.asyncio.client.Protocol.protocols
b'Z4' # >>> ZEO.asyncio.client.Protocol.protocols = [b'Z4']
>>> wait_connected(db.storage)
>>> conn = db.open()
>>> conn.root().x = 0
>>> transaction.commit()
>>> len(db.history(conn.root()._p_oid, 99))
2
>>> conn.root()['blob1'] = ZODB.blob.Blob() # >>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
>>> with conn.root()['blob1'].open('w') as f: # >>> db.storage.protocol_version
... r = f.write(b'blob data 1') # b'Z4'
>>> transaction.commit() # >>> wait_connected(db.storage)
# >>> conn = db.open()
# >>> conn.root().x = 0
# >>> transaction.commit()
# >>> len(db.history(conn.root()._p_oid, 99))
# 2
>>> db2 = ZEO.DB(addr, blob_dir='server-blobs', shared_blob_dir=True) # >>> conn.root()['blob1'] = ZODB.blob.Blob()
>>> wait_connected(db2.storage) # >>> with conn.root()['blob1'].open('w') as f:
>>> conn2 = db2.open() # ... r = f.write(b'blob data 1')
>>> for i in range(5): # >>> transaction.commit()
... conn2.root().x += 1
... transaction.commit()
>>> conn2.root()['blob2'] = ZODB.blob.Blob()
>>> with conn2.root()['blob2'].open('w') as f:
... r = f.write(b'blob data 2')
>>> transaction.commit()
# >>> db2 = ZEO.DB(addr, blob_dir='server-blobs', shared_blob_dir=True)
# >>> wait_connected(db2.storage)
# >>> conn2 = db2.open()
# >>> for i in range(5):
# ... conn2.root().x += 1
# ... transaction.commit()
# >>> conn2.root()['blob2'] = ZODB.blob.Blob()
# >>> with conn2.root()['blob2'].open('w') as f:
# ... r = f.write(b'blob data 2')
# >>> transaction.commit()
>>> @wait_until()
... def x_to_be_5():
... conn.sync()
... return conn.root().x == 5
>>> db.close() # >>> @wait_until()
# ... def x_to_be_5():
# ... conn.sync()
# ... return conn.root().x == 5
>>> for i in range(2): # >>> db.close()
... conn2.root().x += 1
... transaction.commit()
>>> db = ZEO.DB(addr, client='client', blob_dir='blobs') # >>> for i in range(2):
>>> wait_connected(db.storage) # ... conn2.root().x += 1
>>> conn = db.open() # ... transaction.commit()
>>> conn.root().x
7
>>> db.close() # >>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
# >>> wait_connected(db.storage)
# >>> conn = db.open()
# >>> conn.root().x
# 7
>>> for i in range(10): # >>> db.close()
... conn2.root().x += 1
... transaction.commit()
>>> db = ZEO.DB(addr, client='client', blob_dir='blobs') # >>> for i in range(10):
>>> wait_connected(db.storage) # ... conn2.root().x += 1
>>> conn = db.open() # ... transaction.commit()
>>> conn.root().x
17
>>> with conn.root()['blob1'].open() as f: # >>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
... f.read() # >>> wait_connected(db.storage)
b'blob data 1' # >>> conn = db.open()
>>> with conn.root()['blob2'].open() as f: # >>> conn.root().x
... f.read() # 17
b'blob data 2'
>>> db2.close() # >>> with conn.root()['blob1'].open() as f:
>>> db.close() # ... f.read()
# b'blob data 1'
# >>> with conn.root()['blob2'].open() as f:
# ... f.read()
# b'blob data 2'
# >>> db2.close()
# >>> db.close()
Undo the hijinks: # Undo the hijinks:
>>> ZEO.asyncio.client.Protocol.protocols = old_protocols # >>> ZEO.asyncio.client.Protocol.protocols = old_protocols
...@@ -731,24 +731,23 @@ class StorageServerWrapper: ...@@ -731,24 +731,23 @@ class StorageServerWrapper:
self.server.tpc_begin(id(transaction), '', '', {}, None, ' ') self.server.tpc_begin(id(transaction), '', '', {}, None, ' ')
def tpc_vote(self, transaction): def tpc_vote(self, transaction):
vote_result = self.server.vote(id(transaction)) result = self.server.vote(id(transaction))
assert vote_result is None assert result == self.server.connection.serials[:]
result = self.server.connection.serials[:]
del self.server.connection.serials[:] del self.server.connection.serials[:]
return result return result
def store(self, oid, serial, data, version_ignored, transaction): def store(self, oid, serial, data, version_ignored, transaction):
self.server.storea(oid, serial, data, id(transaction)) self.server.storea(oid, serial, data, id(transaction))
def send_reply(self, *args): # Masquerade as conn def send_reply(self, _, result): # Masquerade as conn
pass self._result = result
def tpc_abort(self, transaction): def tpc_abort(self, transaction):
self.server.tpc_abort(id(transaction)) self.server.tpc_abort(id(transaction))
def tpc_finish(self, transaction, func = lambda: None): def tpc_finish(self, transaction, func = lambda: None):
self.server.tpc_finish(id(transaction)).set_sender(0, self) self.server.tpc_finish(id(transaction)).set_sender(0, self)
return self._result
def multiple_storages_invalidation_queue_is_not_insane(): def multiple_storages_invalidation_queue_is_not_insane():
""" """
...@@ -915,14 +914,14 @@ def tpc_finish_error(): ...@@ -915,14 +914,14 @@ def tpc_finish_error():
buffer, sadly, using implementation details: buffer, sadly, using implementation details:
>>> tbuf = t.data(client) >>> tbuf = t.data(client)
>>> tbuf.serials = None >>> tbuf.resolved = None
tpc_finish will fail: tpc_finish will fail:
>>> client.tpc_finish(t) # doctest: +ELLIPSIS >>> client.tpc_finish(t) # doctest: +ELLIPSIS
Traceback (most recent call last): Traceback (most recent call last):
... ...
AttributeError: ... TypeError: ...
>>> client.tpc_abort(t) >>> client.tpc_abort(t)
>>> t.abort() >>> t.abort()
......
...@@ -78,6 +78,8 @@ will conflict. It will be blocked at the vote call. ...@@ -78,6 +78,8 @@ will conflict. It will be blocked at the vote call.
>>> class Sender: >>> class Sender:
... def send_reply(self, id, reply): ... def send_reply(self, id, reply):
... print('reply', id, reply) ... print('reply', id, reply)
... def send_error(self, id, err):
... print('error', id, err)
>>> delay.set_sender(1, Sender()) >>> delay.set_sender(1, Sender())
>>> logger = logging.getLogger('ZEO') >>> logger = logging.getLogger('ZEO')
...@@ -87,13 +89,20 @@ will conflict. It will be blocked at the vote call. ...@@ -87,13 +89,20 @@ will conflict. It will be blocked at the vote call.
Now, when we abort the transaction for the first client. The second Now, when we abort the transaction for the first client. The second
client will be restarted. It will get a conflict error, that is client will be restarted. It will get a conflict error, that is
handled correctly: raised to the client:
>>> zs1.tpc_abort('0') # doctest: +ELLIPSIS >>> zs1.tpc_abort('0') # doctest: +ELLIPSIS
reply 1 None Error raised in delayed method
Traceback (most recent call last):
...
ZODB.POSException.ConflictError: ...
error 1 database conflict error ...
The transaction is aborted by the server:
>>> fs.tpc_transaction() is not None >>> fs.tpc_transaction() is None
True True
>>> zs2.connected >>> zs2.connected
True True
...@@ -116,7 +125,7 @@ And an initial client. ...@@ -116,7 +125,7 @@ And an initial client.
>>> zs1 = ZEO.tests.servertesting.client(server, 1) >>> zs1 = ZEO.tests.servertesting.client(server, 1)
>>> zs1.tpc_begin('0', '', '', {}) >>> zs1.tpc_begin('0', '', '', {})
>>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '0') >>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, b'x', '0')
Intentionally break zs1: Intentionally break zs1:
...@@ -135,7 +144,7 @@ We can start another client and get the storage lock. ...@@ -135,7 +144,7 @@ We can start another client and get the storage lock.
>>> zs1 = ZEO.tests.servertesting.client(server, 1) >>> zs1 = ZEO.tests.servertesting.client(server, 1)
>>> zs1.tpc_begin('1', '', '', {}) >>> zs1.tpc_begin('1', '', '', {})
>>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '1') >>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, b'x', '1')
>>> _ = zs1.vote('1') # doctest: +ELLIPSIS >>> _ = zs1.vote('1') # doctest: +ELLIPSIS
>>> zs1.tpc_finish('1').set_sender(0, zs1.connection) >>> zs1.tpc_finish('1').set_sender(0, zs1.connection)
...@@ -220,7 +229,7 @@ We start a transaction and vote, this leads to getting the lock. ...@@ -220,7 +229,7 @@ We start a transaction and vote, this leads to getting the lock.
ZEO.asyncio.server INFO ZEO.asyncio.server INFO
received handshake b'Z5' received handshake b'Z5'
>>> tid1 = start_trans(zs1) >>> tid1 = start_trans(zs1)
>>> zs1.vote(tid1) # doctest: +ELLIPSIS >>> resolved1 = zs1.vote(tid1) # doctest: +ELLIPSIS
ZEO.StorageServer DEBUG ZEO.StorageServer DEBUG
(test-addr-1) ('1') lock: transactions waiting: 0 (test-addr-1) ('1') lock: transactions waiting: 0
ZEO.StorageServer BLATHER ZEO.StorageServer BLATHER
...@@ -477,7 +486,7 @@ ZEOStorage as closed and see if trying to get a lock cleans it up: ...@@ -477,7 +486,7 @@ ZEOStorage as closed and see if trying to get a lock cleans it up:
ZEO.asyncio.server INFO ZEO.asyncio.server INFO
received handshake b'Z5' received handshake b'Z5'
>>> tid1 = start_trans(zs1) >>> tid1 = start_trans(zs1)
>>> zs1.vote(tid1) # doctest: +ELLIPSIS >>> resolved1 = zs1.vote(tid1) # doctest: +ELLIPSIS
ZEO.StorageServer DEBUG ZEO.StorageServer DEBUG
(test-addr-1) ('1') lock: transactions waiting: 0 (test-addr-1) ('1') lock: transactions waiting: 0
ZEO.StorageServer BLATHER ZEO.StorageServer BLATHER
...@@ -493,7 +502,7 @@ ZEOStorage as closed and see if trying to get a lock cleans it up: ...@@ -493,7 +502,7 @@ ZEOStorage as closed and see if trying to get a lock cleans it up:
ZEO.asyncio.server INFO ZEO.asyncio.server INFO
received handshake b'Z5' received handshake b'Z5'
>>> tid2 = start_trans(zs2) >>> tid2 = start_trans(zs2)
>>> zs2.vote(tid2) # doctest: +ELLIPSIS >>> resolved2 = zs2.vote(tid2) # doctest: +ELLIPSIS
ZEO.StorageServer DEBUG ZEO.StorageServer DEBUG
(test-addr-2) ('1') lock: transactions waiting: 0 (test-addr-2) ('1') lock: transactions waiting: 0
ZEO.StorageServer BLATHER ZEO.StorageServer BLATHER
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment