Commit 572a9652 authored by Julien Muchembled's avatar Julien Muchembled

Better support of the new API to notify of resolved conflicts (store/tpc_finish)

When switching all storages to the new API in the master branch,
I found a few issues, mainly with blobs.

1. Created/modified blobs are invalidated during the first phase
   (in Connection._store_objects):

    obj._p_invalidate()

    And with the old API, _handle_serial then sets _p_serial
    Ghost objects are not supposed to have a _p_serial and with the new API,
    a few tests would fail because _p_serial are checked without activating the
    blob first.

2. Another consequence of _handle_serial not updating _p_changed/_p_serial
   immediately is that created objects are stored twice if __getstate__
   modifies itself. This case is tested in testConnection by
   doctest_lp485456_setattr_in_setstate_doesnt_cause_multiple_stores

    Hence the change in Connection._commit:
    - self._modified is already appended in Connection._store_objects
    - (obj._p_serial == z64) instead of (oid in self._creating)
      would not work for savepoints.

3. Setting _p_changed of a Blob with no uncommitted changes would cause an error
   (lp440234_Setting__p_changed_of_a_Blob_w_no_uncomitted_changes_is_noop)

    Fixed by the same change as in 2:
    - oid was appended twice to self._modified but reverted one
      (self._modified.pop() in _store_objects)
    - the test passed because _p_changed was reset early by _handle_serial
parent c5fe46d0
...@@ -269,7 +269,7 @@ class BaseStorage(UndoLogCompatible): ...@@ -269,7 +269,7 @@ class BaseStorage(UndoLogCompatible):
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError( raise POSException.StorageTransactionError(
"tpc_vote called with wrong transaction") "tpc_vote called with wrong transaction")
self._vote() return self._vote()
finally: finally:
self._lock_release() self._lock_release()
...@@ -398,8 +398,8 @@ def copy(source, dest, verbose=0): ...@@ -398,8 +398,8 @@ def copy(source, dest, verbose=0):
r.data_txn, transaction) r.data_txn, transaction)
else: else:
pre = preget(oid, None) pre = preget(oid, None)
s = dest.store(oid, pre, r.data, r.version, transaction) dest.store(oid, pre, r.data, r.version, transaction)
preindex[oid] = s preindex[oid] = tid
dest.tpc_vote(transaction) dest.tpc_vote(transaction)
dest.tpc_finish(transaction) dest.tpc_finish(transaction)
......
...@@ -613,16 +613,17 @@ class Connection(ExportImport, object): ...@@ -613,16 +613,17 @@ class Connection(ExportImport, object):
raise InvalidObjectReference(obj, obj._p_jar) raise InvalidObjectReference(obj, obj._p_jar)
elif oid in self._added: elif oid in self._added:
assert obj._p_serial == z64 assert obj._p_serial == z64
elif obj._p_changed: elif obj._p_changed and oid not in self._creating:
if oid in self._invalidated: if oid in self._invalidated:
resolve = getattr(obj, "_p_resolveConflict", None) resolve = getattr(obj, "_p_resolveConflict", None)
if resolve is None: if resolve is None:
raise ConflictError(object=obj) raise ConflictError(object=obj)
self._modified.append(oid)
else: else:
# Nothing to do. It's been said that it's legal, e.g., for # Nothing to do. It's been said that it's legal, e.g., for
# an object to set _p_changed to false after it's been # an object to set _p_changed to false after it's been
# changed and registered. # changed and registered.
# And new objects that are registered after any referrer are
# already processed.
continue continue
self._store_objects(ObjectWriter(obj), transaction) self._store_objects(ObjectWriter(obj), transaction)
......
...@@ -713,9 +713,8 @@ class BlobStorageMixin(object): ...@@ -713,9 +713,8 @@ class BlobStorageMixin(object):
"""Stores data that has a BLOB attached.""" """Stores data that has a BLOB attached."""
assert not version, "Versions aren't supported." assert not version, "Versions aren't supported."
serial = self.store(oid, oldserial, data, '', transaction) serial = self.store(oid, oldserial, data, '', transaction)
self._blob_storeblob(oid, serial, blobfilename) self._blob_storeblob(oid, self._tid, blobfilename)
return serial
return self._tid
def temporaryDirectory(self): def temporaryDirectory(self):
return self.fshelper.temp_dir return self.fshelper.temp_dir
...@@ -764,8 +763,9 @@ class BlobStorage(BlobStorageMixin): ...@@ -764,8 +763,9 @@ class BlobStorage(BlobStorageMixin):
# We need to override the base storage's tpc_finish instead of # We need to override the base storage's tpc_finish instead of
# providing a _finish method because methods found on the proxied # providing a _finish method because methods found on the proxied
# object aren't rebound to the proxy # object aren't rebound to the proxy
self.__storage.tpc_finish(*arg, **kw) tid = self.__storage.tpc_finish(*arg, **kw)
self._blob_tpc_finish() self._blob_tpc_finish()
return tid
def tpc_abort(self, *arg, **kw): def tpc_abort(self, *arg, **kw):
# We need to override the base storage's abort instead of # We need to override the base storage's abort instead of
......
...@@ -117,7 +117,7 @@ class MVCCMappingStorage(MappingStorage): ...@@ -117,7 +117,7 @@ class MVCCMappingStorage(MappingStorage):
def tpc_finish(self, transaction, func = lambda tid: None): def tpc_finish(self, transaction, func = lambda tid: None):
self._data_snapshot = None self._data_snapshot = None
MappingStorage.tpc_finish(self, transaction, func) return MappingStorage.tpc_finish(self, transaction, func)
def tpc_abort(self, transaction): def tpc_abort(self, transaction):
self._data_snapshot = None self._data_snapshot = None
......
...@@ -38,34 +38,35 @@ Put some revisions of a blob object in our database and on the filesystem: ...@@ -38,34 +38,35 @@ Put some revisions of a blob object in our database and on the filesystem:
... _ = file.write(b'this is blob data 0') ... _ = file.write(b'this is blob data 0')
>>> root['blob'] = blob >>> root['blob'] = blob
>>> transaction.commit() >>> transaction.commit()
>>> tids.append(blob._p_serial)
>>> nothing = transaction.begin() >>> nothing = transaction.begin()
>>> times.append(new_time()) >>> times.append(new_time())
>>> with root['blob'].open('w') as file: >>> with root['blob'].open('w') as file:
... _ = file.write(b'this is blob data 1') ... _ = file.write(b'this is blob data 1')
>>> transaction.commit()
>>> tids.append(blob._p_serial) >>> tids.append(blob._p_serial)
>>> transaction.commit()
>>> nothing = transaction.begin() >>> nothing = transaction.begin()
>>> times.append(new_time()) >>> times.append(new_time())
>>> with root['blob'].open('w') as file: >>> with root['blob'].open('w') as file:
... _ = file.write(b'this is blob data 2') ... _ = file.write(b'this is blob data 2')
>>> transaction.commit()
>>> tids.append(blob._p_serial) >>> tids.append(blob._p_serial)
>>> transaction.commit()
>>> nothing = transaction.begin() >>> nothing = transaction.begin()
>>> times.append(new_time()) >>> times.append(new_time())
>>> with root['blob'].open('w') as file: >>> with root['blob'].open('w') as file:
... _ = file.write(b'this is blob data 3') ... _ = file.write(b'this is blob data 3')
>>> transaction.commit()
>>> tids.append(blob._p_serial) >>> tids.append(blob._p_serial)
>>> transaction.commit()
>>> nothing = transaction.begin() >>> nothing = transaction.begin()
>>> times.append(new_time()) >>> times.append(new_time())
>>> with root['blob'].open('w') as file: >>> with root['blob'].open('w') as file:
... _ = file.write(b'this is blob data 4') ... _ = file.write(b'this is blob data 4')
>>> tids.append(blob._p_serial)
>>> transaction.commit() >>> transaction.commit()
>>> blob._p_activate()
>>> tids.append(blob._p_serial) >>> tids.append(blob._p_serial)
>>> oid = root['blob']._p_oid >>> oid = root['blob']._p_oid
......
...@@ -1025,9 +1025,14 @@ def doctest_lp485456_setattr_in_setstate_doesnt_cause_multiple_stores(): ...@@ -1025,9 +1025,14 @@ def doctest_lp485456_setattr_in_setstate_doesnt_cause_multiple_stores():
storing '\x00\x00\x00\x00\x00\x00\x00\x00' storing '\x00\x00\x00\x00\x00\x00\x00\x00'
storing '\x00\x00\x00\x00\x00\x00\x00\x01' storing '\x00\x00\x00\x00\x00\x00\x00\x01'
>>> conn.add(C()) Retry with the new object registered before its referrer.
>>> z = C()
>>> conn.add(z)
>>> conn.root.z = z
>>> transaction.commit() >>> transaction.commit()
storing '\x00\x00\x00\x00\x00\x00\x00\x02' storing '\x00\x00\x00\x00\x00\x00\x00\x02'
storing '\x00\x00\x00\x00\x00\x00\x00\x00'
We still see updates: We still see updates:
......
...@@ -706,8 +706,8 @@ def lp440234_Setting__p_changed_of_a_Blob_w_no_uncomitted_changes_is_noop(): ...@@ -706,8 +706,8 @@ def lp440234_Setting__p_changed_of_a_Blob_w_no_uncomitted_changes_is_noop():
>>> blob = ZODB.blob.Blob(b'blah') >>> blob = ZODB.blob.Blob(b'blah')
>>> conn.add(blob) >>> conn.add(blob)
>>> transaction.commit() >>> transaction.commit()
>>> old_serial = blob._p_serial
>>> blob._p_changed = True >>> blob._p_changed = True
>>> old_serial = blob._p_serial
>>> transaction.commit() >>> transaction.commit()
>>> with blob.open() as fp: fp.read() >>> with blob.open() as fp: fp.read()
'blah' 'blah'
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment