Commit 8b4689d5 authored by Jim Fulton's avatar Jim Fulton

provide IMultiCommitStorage

parent 2529c25b
...@@ -53,10 +53,7 @@ logger = logging.getLogger(__name__) ...@@ -53,10 +53,7 @@ logger = logging.getLogger(__name__)
# max signed 64-bit value ~ infinity :) Signed cuz LBTree and TimeStamp # max signed 64-bit value ~ infinity :) Signed cuz LBTree and TimeStamp
m64 = b'\x7f\xff\xff\xff\xff\xff\xff\xff' m64 = b'\x7f\xff\xff\xff\xff\xff\xff\xff'
try: from ZODB.ConflictResolution import ResolvedSerial
from ZODB.ConflictResolution import ResolvedSerial
except ImportError:
ResolvedSerial = 'rs'
def tid2time(tid): def tid2time(tid):
return str(TimeStamp(tid)) return str(TimeStamp(tid))
...@@ -77,6 +74,7 @@ def get_timestamp(prev_ts=None): ...@@ -77,6 +74,7 @@ def get_timestamp(prev_ts=None):
MB = 1024**2 MB = 1024**2
@zope.interface.implementer(ZODB.interfaces.IMultiCommitStorage)
class ClientStorage(object): class ClientStorage(object):
"""A storage class that is a network client to a remote storage. """A storage class that is a network client to a remote storage.
...@@ -726,7 +724,8 @@ class ClientStorage(object): ...@@ -726,7 +724,8 @@ class ClientStorage(object):
""" """
tbuf = self._check_trans(txn, 'tpc_vote') tbuf = self._check_trans(txn, 'tpc_vote')
try: try:
self._call('vote', id(txn)) for oid in self._call('vote', id(txn)) or ():
tbuf.serial(oid, ResolvedSerial)
except POSException.ConflictError as err: except POSException.ConflictError as err:
oid = getattr(err, 'oid', None) oid = getattr(err, 'oid', None)
if oid is not None: if oid is not None:
...@@ -743,8 +742,8 @@ class ClientStorage(object): ...@@ -743,8 +742,8 @@ class ClientStorage(object):
if tbuf.exception: if tbuf.exception:
raise tbuf.exception raise tbuf.exception
if tbuf.serials: if tbuf.resolved:
return list(tbuf.serials.items()) return list(tbuf.resolved)
else: else:
return None return None
...@@ -839,6 +838,8 @@ class ClientStorage(object): ...@@ -839,6 +838,8 @@ class ClientStorage(object):
self._update_blob_cache(tbuf, tid) self._update_blob_cache(tbuf, tid)
return tid
def _update_blob_cache(self, tbuf, tid): def _update_blob_cache(self, tbuf, tid):
"""Internal helper move blobs updated by a transaction to the cache. """Internal helper move blobs updated by a transaction to the cache.
""" """
......
...@@ -427,7 +427,6 @@ class ZEOStorage: ...@@ -427,7 +427,6 @@ class ZEOStorage:
for op, args in self.txnlog: for op, args in self.txnlog:
getattr(self, op)(*args) getattr(self, op)(*args)
# Blob support # Blob support
while self.blob_log: while self.blob_log:
oid, oldserial, data, blobfilename = self.blob_log.pop() oid, oldserial, data, blobfilename = self.blob_log.pop()
...@@ -435,9 +434,11 @@ class ZEOStorage: ...@@ -435,9 +434,11 @@ class ZEOStorage:
serials = self.storage.tpc_vote(self.transaction) serials = self.storage.tpc_vote(self.transaction)
if serials: if serials:
self.serials.extend(serials) if not isinstance(serials[0], bytes):
serials = (oid for (oid, serial) in serials
if serial == ResolvedSerial)
self.connection.async('serialnos', self.serials) self.serials.extend(serials)
except Exception as err: except Exception as err:
self.storage.tpc_abort(self.transaction) self.storage.tpc_abort(self.transaction)
...@@ -455,9 +456,9 @@ class ZEOStorage: ...@@ -455,9 +456,9 @@ class ZEOStorage:
raise raise
else: else:
if delay is not None: if delay is not None:
delay.reply(None) delay.reply(self.serials)
else: else:
return None return self.serials
else: else:
return delay return delay
...@@ -567,17 +568,18 @@ class ZEOStorage: ...@@ -567,17 +568,18 @@ class ZEOStorage:
if serial != b"\0\0\0\0\0\0\0\0": if serial != b"\0\0\0\0\0\0\0\0":
self.invalidated.append(oid) self.invalidated.append(oid)
if newserial:
if isinstance(newserial, bytes): if isinstance(newserial, bytes):
newserial = [(oid, newserial)] newserial = [(oid, newserial)]
for oid, s in newserial or (): for oid, s in newserial:
if s == ResolvedSerial: if s == ResolvedSerial:
self.stats.conflicts_resolved += 1 self.stats.conflicts_resolved += 1
self.log("conflict resolved oid=%s" self.log("conflict resolved oid=%s"
% oid_repr(oid), BLATHER) % oid_repr(oid), BLATHER)
self.serials.append(oid)
self.serials.append((oid, s))
def _restore(self, oid, serial, data, prev_txn): def _restore(self, oid, serial, data, prev_txn):
self.storage.restore(oid, serial, data, '', prev_txn, self.storage.restore(oid, serial, data, '', prev_txn,
...@@ -586,7 +588,7 @@ class ZEOStorage: ...@@ -586,7 +588,7 @@ class ZEOStorage:
def _undo(self, trans_id): def _undo(self, trans_id):
tid, oids = self.storage.undo(trans_id, self.transaction) tid, oids = self.storage.undo(trans_id, self.transaction)
self.invalidated.extend(oids) self.invalidated.extend(oids)
self.serials.extend((oid, ResolvedSerial) for oid in oids) self.serials.extend(oids)
# IStorageIteration support # IStorageIteration support
......
...@@ -46,7 +46,7 @@ class TransactionBuffer: ...@@ -46,7 +46,7 @@ class TransactionBuffer:
# stored are builtin types -- strings or None. # stored are builtin types -- strings or None.
self.pickler = Pickler(self.file, 1) self.pickler = Pickler(self.file, 1)
self.pickler.fast = 1 self.pickler.fast = 1
self.serials = {} # processed { oid -> serial } self.resolved = set() # {oid}
self.exception = None self.exception = None
def close(self): def close(self):
...@@ -61,10 +61,9 @@ class TransactionBuffer: ...@@ -61,10 +61,9 @@ class TransactionBuffer:
def serial(self, oid, serial): def serial(self, oid, serial):
if isinstance(serial, Exception): if isinstance(serial, Exception):
self.exception = serial self.exception = serial # This transaction will never be committed
self.serials[oid] = None elif serial == ResolvedSerial:
else: self.resolved.add(oid)
self.serials[oid] = serial
def storeBlob(self, oid, blobfilename): def storeBlob(self, oid, blobfilename):
self.blobs.append((oid, blobfilename)) self.blobs.append((oid, blobfilename))
...@@ -72,7 +71,7 @@ class TransactionBuffer: ...@@ -72,7 +71,7 @@ class TransactionBuffer:
def __iter__(self): def __iter__(self):
self.file.seek(0) self.file.seek(0)
unpickler = Unpickler(self.file) unpickler = Unpickler(self.file)
serials = self.serials resolved = self.resolved
# Gaaaa, this is awkward. There can be entries in serials that # Gaaaa, this is awkward. There can be entries in serials that
# aren't in the buffer, because undo. Entries can be repeated # aren't in the buffer, because undo. Entries can be repeated
...@@ -83,9 +82,9 @@ class TransactionBuffer: ...@@ -83,9 +82,9 @@ class TransactionBuffer:
for i in range(self.count): for i in range(self.count):
oid, data = unpickler.load() oid, data = unpickler.load()
seen.add(oid) seen.add(oid)
yield oid, data, serials.get(oid) == ResolvedSerial yield oid, data, oid in resolved
# We may have leftover serials because undo # We may have leftover oids because undo
for oid, serial in serials.items(): for oid in resolved:
if oid not in seen: if oid not in seen:
yield oid, None, serial == ResolvedSerial yield oid, None, True
...@@ -731,24 +731,23 @@ class StorageServerWrapper: ...@@ -731,24 +731,23 @@ class StorageServerWrapper:
self.server.tpc_begin(id(transaction), '', '', {}, None, ' ') self.server.tpc_begin(id(transaction), '', '', {}, None, ' ')
def tpc_vote(self, transaction): def tpc_vote(self, transaction):
vote_result = self.server.vote(id(transaction)) result = self.server.vote(id(transaction))
assert vote_result is None assert result == self.server.connection.serials[:]
result = self.server.connection.serials[:]
del self.server.connection.serials[:] del self.server.connection.serials[:]
return result return result
def store(self, oid, serial, data, version_ignored, transaction): def store(self, oid, serial, data, version_ignored, transaction):
self.server.storea(oid, serial, data, id(transaction)) self.server.storea(oid, serial, data, id(transaction))
def send_reply(self, *args): # Masquerade as conn def send_reply(self, _, result): # Masquerade as conn
pass self._result = result
def tpc_abort(self, transaction): def tpc_abort(self, transaction):
self.server.tpc_abort(id(transaction)) self.server.tpc_abort(id(transaction))
def tpc_finish(self, transaction, func = lambda: None): def tpc_finish(self, transaction, func = lambda: None):
self.server.tpc_finish(id(transaction)).set_sender(0, self) self.server.tpc_finish(id(transaction)).set_sender(0, self)
return self._result
def multiple_storages_invalidation_queue_is_not_insane(): def multiple_storages_invalidation_queue_is_not_insane():
""" """
...@@ -915,14 +914,14 @@ def tpc_finish_error(): ...@@ -915,14 +914,14 @@ def tpc_finish_error():
buffer, sadly, using implementation details: buffer, sadly, using implementation details:
>>> tbuf = t.data(client) >>> tbuf = t.data(client)
>>> tbuf.serials = None >>> tbuf.resolved = None
tpc_finish will fail: tpc_finish will fail:
>>> client.tpc_finish(t) # doctest: +ELLIPSIS >>> client.tpc_finish(t) # doctest: +ELLIPSIS
Traceback (most recent call last): Traceback (most recent call last):
... ...
AttributeError: ... TypeError: ...
>>> client.tpc_abort(t) >>> client.tpc_abort(t)
>>> t.abort() >>> t.abort()
......
...@@ -229,7 +229,7 @@ We start a transaction and vote, this leads to getting the lock. ...@@ -229,7 +229,7 @@ We start a transaction and vote, this leads to getting the lock.
ZEO.asyncio.server INFO ZEO.asyncio.server INFO
received handshake b'Z5' received handshake b'Z5'
>>> tid1 = start_trans(zs1) >>> tid1 = start_trans(zs1)
>>> zs1.vote(tid1) # doctest: +ELLIPSIS >>> resolved1 = zs1.vote(tid1) # doctest: +ELLIPSIS
ZEO.StorageServer DEBUG ZEO.StorageServer DEBUG
(test-addr-1) ('1') lock: transactions waiting: 0 (test-addr-1) ('1') lock: transactions waiting: 0
ZEO.StorageServer BLATHER ZEO.StorageServer BLATHER
...@@ -486,7 +486,7 @@ ZEOStorage as closed and see if trying to get a lock cleans it up: ...@@ -486,7 +486,7 @@ ZEOStorage as closed and see if trying to get a lock cleans it up:
ZEO.asyncio.server INFO ZEO.asyncio.server INFO
received handshake b'Z5' received handshake b'Z5'
>>> tid1 = start_trans(zs1) >>> tid1 = start_trans(zs1)
>>> zs1.vote(tid1) # doctest: +ELLIPSIS >>> resolved1 = zs1.vote(tid1) # doctest: +ELLIPSIS
ZEO.StorageServer DEBUG ZEO.StorageServer DEBUG
(test-addr-1) ('1') lock: transactions waiting: 0 (test-addr-1) ('1') lock: transactions waiting: 0
ZEO.StorageServer BLATHER ZEO.StorageServer BLATHER
...@@ -502,7 +502,7 @@ ZEOStorage as closed and see if trying to get a lock cleans it up: ...@@ -502,7 +502,7 @@ ZEOStorage as closed and see if trying to get a lock cleans it up:
ZEO.asyncio.server INFO ZEO.asyncio.server INFO
received handshake b'Z5' received handshake b'Z5'
>>> tid2 = start_trans(zs2) >>> tid2 = start_trans(zs2)
>>> zs2.vote(tid2) # doctest: +ELLIPSIS >>> resolved2 = zs2.vote(tid2) # doctest: +ELLIPSIS
ZEO.StorageServer DEBUG ZEO.StorageServer DEBUG
(test-addr-2) ('1') lock: transactions waiting: 0 (test-addr-2) ('1') lock: transactions waiting: 0
ZEO.StorageServer BLATHER ZEO.StorageServer BLATHER
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment