Commit 3a8f6f03 authored by Julien Muchembled's avatar Julien Muchembled

Drop support for ZODB3

parent 414573b9
graft tools
include neo.conf CHANGELOG.rst TODO ZODB3.patch
include neo.conf CHANGELOG.rst TODO
......@@ -52,7 +52,7 @@ Requirements
- MySQLdb: https://github.com/PyMySQL/mysqlclient-python
- For client nodes: ZODB 3.10.x or later
- For client nodes: ZODB 4.4.5 or later
Installation
============
......
Patch to ZODB 3.10.5 for ZODB unit tests.
See also monkey-patch to Connection.tpc_finish in neo/client/__init__.py
diff --git a/src/ZODB/interfaces.py b/src/ZODB/interfaces.py
index 44ded35..db5c525 100644
--- a/src/ZODB/interfaces.py
+++ b/src/ZODB/interfaces.py
@@ -776,6 +776,10 @@ def tpc_finish(transaction, func = lambda tid: None):
called while the storage transaction lock is held. It takes
the new transaction id generated by the transaction.
+ The return value can be either None or a serial giving new
+ serial for objects whose ids were passed to previous store calls
+ in the same transaction, and for which no serial was returned
+ from either store or tpc_vote for objects passed to store.
"""
def tpc_vote(transaction):
@@ -794,8 +798,6 @@ def tpc_vote(transaction):
The return value can be either None or a sequence of object-id
and serial pairs giving new serials for objects who's ids were
passed to previous store calls in the same transaction.
- After the tpc_vote call, new serials must have been returned,
- either from tpc_vote or store for objects passed to store.
A serial returned in a sequence of oid/serial pairs, may be
the special value ZODB.ConflictResolution.ResolvedSerial to
diff --git a/src/ZODB/tests/BasicStorage.py b/src/ZODB/tests/BasicStorage.py
index 8e72272..61bc801 100644
--- a/src/ZODB/tests/BasicStorage.py
+++ b/src/ZODB/tests/BasicStorage.py
@@ -72,8 +72,10 @@ def checkSerialIsNoneForInitialRevision(self):
r1 = self._storage.store(oid, None, zodb_pickle(MinPO(11)),
'', txn)
r2 = self._storage.tpc_vote(txn)
- self._storage.tpc_finish(txn)
+ serial = self._storage.tpc_finish(txn)
newrevid = handle_serials(oid, r1, r2)
+ if newrevid is None and serial is not None:
+ newrevid = serial
data, revid = self._storage.load(oid, '')
value = zodb_unpickle(data)
eq(value, MinPO(11))
diff --git a/src/ZODB/tests/MTStorage.py b/src/ZODB/tests/MTStorage.py
index e01c802..b57de28 100644
--- a/src/ZODB/tests/MTStorage.py
+++ b/src/ZODB/tests/MTStorage.py
@@ -155,10 +155,12 @@ def dostore(self, i):
r2 = self.storage.tpc_vote(t)
self.pause()
- self.storage.tpc_finish(t)
+ serial = self.storage.tpc_finish(t)
self.pause()
revid = handle_serials(oid, r1, r2)
+ if serial is not None and revid is None:
+ revid = serial
self.oids[oid] = revid
class ExtStorageClientThread(StorageClientThread):
diff --git a/src/ZODB/tests/RevisionStorage.py b/src/ZODB/tests/RevisionStorage.py
index 9113757..ddd2dde 100644
--- a/src/ZODB/tests/RevisionStorage.py
+++ b/src/ZODB/tests/RevisionStorage.py
@@ -150,10 +150,12 @@ def helper(tid, revid, x):
# Finish the transaction
r2 = self._storage.tpc_vote(t)
newrevid = handle_serials(oid, r1, r2)
- self._storage.tpc_finish(t)
+ serial = self._storage.tpc_finish(t)
except:
self._storage.tpc_abort(t)
raise
+ if serial is not None and newrevid is None:
+ newrevid = serial
return newrevid
revid1 = helper(1, None, 1)
revid2 = helper(2, revid1, 2)
diff --git a/src/ZODB/tests/StorageTestBase.py b/src/ZODB/tests/StorageTestBase.py
index 9737ec4..43f29ed 100644
--- a/src/ZODB/tests/StorageTestBase.py
+++ b/src/ZODB/tests/StorageTestBase.py
@@ -134,7 +134,7 @@ def handle_serials(oid, *args):
A helper for function _handle_all_serials().
"""
- return handle_all_serials(oid, *args)[oid]
+ return handle_all_serials(oid, *args).get(oid)
def import_helper(name):
__import__(name)
@@ -191,7 +191,9 @@ def _dostore(self, oid=None, revid=None, data=None,
# Finish the transaction
r2 = self._storage.tpc_vote(t)
revid = handle_serials(oid, r1, r2)
- self._storage.tpc_finish(t)
+ serial = self._storage.tpc_finish(t)
+ if serial is not None and revid is None:
+ revid = serial
except:
self._storage.tpc_abort(t)
raise
@@ -211,8 +213,8 @@ def _undo(self, tid, expected_oids=None, note=None):
self._storage.tpc_begin(t)
undo_result = self._storage.undo(tid, t)
vote_result = self._storage.tpc_vote(t)
- self._storage.tpc_finish(t)
- if expected_oids is not None:
+ serial = self._storage.tpc_finish(t)
+ if expected_oids is not None and serial is None:
oids = undo_result and undo_result[1] or []
oids.extend(oid for (oid, _) in vote_result or ())
self.assertEqual(len(oids), len(expected_oids), repr(oids))
diff --git a/src/ZODB/tests/TransactionalUndoStorage.py b/src/ZODB/tests/TransactionalUndoStorage.py
index 72adac2..203736a 100644
--- a/src/ZODB/tests/TransactionalUndoStorage.py
+++ b/src/ZODB/tests/TransactionalUndoStorage.py
@@ -76,6 +76,12 @@ def _transaction_vote(self, trans):
def _transaction_newserial(self, oid):
return self.__serials[oid]
+ def _transaction_finish(self, t, oid_list):
+ tid = self._storage.tpc_finish(t)
+ if tid is not None:
+ for oid in oid_list:
+ self.__serials[oid] = tid
+
def _multi_obj_transaction(self, objs):
newrevs = {}
t = Transaction()
@@ -85,7 +91,7 @@ def _multi_obj_transaction(self, objs):
self._transaction_store(oid, rev, data, '', t)
newrevs[oid] = None
self._transaction_vote(t)
- self._storage.tpc_finish(t)
+ self._transaction_finish(t, [x[0] for x in objs])
for oid in newrevs.keys():
newrevs[oid] = self._transaction_newserial(oid)
return newrevs
@@ -218,9 +224,9 @@ def checkTwoObjectUndo(self):
self._transaction_store(oid2, revid2, p51, '', t)
# Finish the transaction
self._transaction_vote(t)
+ self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
- self._storage.tpc_finish(t)
eq(revid1, revid2)
# Update those same two objects
t = Transaction()
@@ -230,9 +236,9 @@ def checkTwoObjectUndo(self):
self._transaction_store(oid2, revid2, p52, '', t)
# Finish the transaction
self._transaction_vote(t)
+ self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
- self._storage.tpc_finish(t)
eq(revid1, revid2)
# Make sure the objects have the current value
data, revid1 = self._storage.load(oid1, '')
@@ -288,11 +294,12 @@ def checkTwoObjectUndoAtOnce(self):
tid1 = info[1]['id']
t = Transaction()
oids = self._begin_undos_vote(t, tid, tid1)
- self._storage.tpc_finish(t)
+ serial = self._storage.tpc_finish(t)
# We get the finalization stuff called an extra time:
- eq(len(oids), 4)
- unless(oid1 in oids)
- unless(oid2 in oids)
+ if serial is None:
+ eq(len(oids), 4)
+ unless(oid1 in oids)
+ unless(oid2 in oids)
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(30))
data, revid2 = self._storage.load(oid2, '')
@@ -326,7 +333,7 @@ def checkTwoObjectUndoAgain(self):
self._transaction_store(oid2, revid2, p52, '', t)
# Finish the transaction
self._transaction_vote(t)
- self._storage.tpc_finish(t)
+ self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
@@ -346,7 +353,7 @@ def checkTwoObjectUndoAgain(self):
self._transaction_store(oid2, revid2, p53, '', t)
# Finish the transaction
self._transaction_vote(t)
- self._storage.tpc_finish(t)
+ self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
@@ -358,10 +365,11 @@ def checkTwoObjectUndoAgain(self):
tid = info[1]['id']
t = Transaction()
oids = self._begin_undos_vote(t, tid)
- self._storage.tpc_finish(t)
- eq(len(oids), 1)
- self.failUnless(oid1 in oids)
- self.failUnless(not oid2 in oids)
+ serial = self._storage.tpc_finish(t)
+ if serial is None:
+ eq(len(oids), 1)
+ self.failUnless(oid1 in oids)
+ self.failUnless(not oid2 in oids)
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(33))
data, revid2 = self._storage.load(oid2, '')
@@ -397,7 +405,7 @@ def checkNotUndoable(self):
self._transaction_store(oid1, revid1, p81, '', t)
self._transaction_store(oid2, revid2, p91, '', t)
self._transaction_vote(t)
- self._storage.tpc_finish(t)
+ self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
@@ -683,9 +691,9 @@ def undo(i):
tid = p64(i + 1)
eq(txn.tid, tid)
- L1 = [(rec.oid, rec.tid, rec.data_txn) for rec in txn]
- L2 = [(oid, revid, None) for _tid, oid, revid in orig
- if _tid == tid]
+ L1 = {(rec.oid, rec.tid, rec.data_txn) for rec in txn}
+ L2 = {(oid, revid, None) for _tid, oid, revid in orig
+ if _tid == tid}
eq(L1, L2)
......@@ -25,51 +25,7 @@ def patch():
H = lambda f: md5(f.func_code.co_code).hexdigest()
# Allow serial to be returned as late as tpc_finish
#
# This makes possible for storage to allocate serial inside tpc_finish,
# removing the requirement to serialise second commit phase (tpc_vote
# to tpc_finish/tpc_abort).
h = H(Connection.tpc_finish)
def tpc_finish(self, transaction):
"""Indicate confirmation that the transaction is done."""
def callback(tid):
if self._mvcc_storage:
# Inter-connection invalidation is not needed when the
# storage provides MVCC.
return
d = dict.fromkeys(self._modified)
self._db.invalidate(tid, d, self)
# It's important that the storage calls the passed function
# while it still has its lock. We don't want another thread
# to be able to read any updated data until we've had a chance
# to send an invalidation message to all of the other
# connections!
# <patch>
serial = self._storage.tpc_finish(transaction, callback)
if serial is not None:
assert isinstance(serial, bytes), repr(serial)
for oid_iterator in (self._modified, self._creating):
for oid in oid_iterator:
obj = self._cache.get(oid, None)
# Ignore missing objects and don't update ghosts.
if obj is not None and obj._p_changed is not None:
obj._p_changed = 0
obj._p_serial = serial
# </patch>
self._tpc_cleanup()
global OLD_ZODB
OLD_ZODB = h in (
'ab9b1b8d82c40e5fffa84f7bc4ea3a8b', # Python 2.7
)
if OLD_ZODB:
Connection.tpc_finish = tpc_finish
elif hasattr(Connection, '_handle_serial'): # merged upstream ?
if hasattr(Connection, '_handle_serial'): # merged upstream ?
assert hasattr(Connection, '_warn_about_returned_serial')
# sync() is used to provide a "network barrier", which is required for
......
......@@ -25,9 +25,6 @@ except ImportError:
from cPickle import dumps, loads
_protocol = 1
from ZODB.POSException import UndoError, ConflictError, ReadConflictError
from . import OLD_ZODB
if OLD_ZODB:
from ZODB.ConflictResolution import ResolvedSerial
from persistent.TimeStamp import TimeStamp
from neo.lib import logging
......@@ -641,9 +638,6 @@ class Application(ThreadedApplication):
# - If possible, recover from master failure.
if txn_context.error:
raise NEOStorageError(txn_context.error)
if OLD_ZODB:
return [(oid, ResolvedSerial)
for oid in txn_context.resolved_dict]
return txn_context.resolved_dict
def tpc_abort(self, transaction):
......
......@@ -609,6 +609,10 @@ class ImporterDatabaseManager(DatabaseManager):
except TypeError: # loadBefore returned None
return False
except POSKeyError:
# loadBefore does not distinguish between an oid:
# - that does not exist at any serial
# - that was deleted
# - whose creation was undone
assert not o or o[3] is None, o
return o
if serial != tid:
......@@ -617,22 +621,15 @@ class ImporterDatabaseManager(DatabaseManager):
u_tid = u64(serial)
if u_tid <= self.zodb_tid and o:
return o
if value:
value = zodb.repickle(value)
checksum = util.makeChecksum(value)
else:
# CAVEAT: Although we think loadBefore should not return an empty
# value for a deleted object (BBB: fixed in ZODB4),
# there's no need to distinguish this case in the above
# except clause because it would be crazy to import a
# NEO DB using this backend.
checksum = None
value = zodb.repickle(value)
if not next_serial:
next_serial = db._getNextTID(db._getPartition(u_oid), u_oid, u_tid)
if next_serial:
next_serial = p64(next_serial)
return (serial, next_serial,
0, checksum, value, zodb.getDataTid(z_oid, u_tid))
return (serial, next_serial, 0,
util.makeChecksum(value),
value,
zodb.getDataTid(z_oid, u_tid))
def getTransaction(self, tid, all=False):
u64 = util.u64
......
......@@ -517,6 +517,15 @@ class TransactionalResource(object):
return lambda *_: None
return self.__getattribute__(attr)
try:
from ZODB.Connection import TransactionMetaData
except ImportError: # BBB: ZODB < 5
def getTransactionMetaData(txn, conn):
return txn
else:
def getTransactionMetaData(txn, conn):
return txn.data(conn)
class Patch(object):
"""
......
......@@ -330,6 +330,15 @@ class Node(object):
def filterConnection(self, *peers):
return ConnectionFilter(self.getConnectionList(*peers))
@contextmanager
def patchDeferred(self, method):
deferred = []
with Patch(method.__self__, **{method.__name__:
lambda orig, *args, **kw: deferred.append(
partial(orig, *args, **kw))}) as p:
yield p
self.em.wakeup(*deferred)
class ServerNode(Node):
_server_class_dict = {}
......
......@@ -36,7 +36,7 @@ from neo.lib.handler import DelayEvent, EventHandler
from neo.lib import logging
from neo.lib.protocol import (CellStates, ClusterStates, NodeStates, NodeTypes,
Packets, Packet, uuid_str, ZERO_OID, ZERO_TID, MAX_TID)
from .. import Patch, TransactionalResource
from .. import Patch, TransactionalResource, getTransactionMetaData
from . import ClientApplication, ConnectionFilter, LockLock, NEOCluster, \
NEOThreadedTest, RandomConflictDict, Serialized, ThreadId, with_cluster
from neo.lib.util import add64, makeChecksum, p64, u64
......@@ -1312,10 +1312,7 @@ class Test(NEOThreadedTest):
# Check that the storage hasn't answered to the store,
# which means that a lock is still taken for r['x'] by t2.
self.tic()
try:
txn = txn.data(c1)
except (AttributeError, KeyError): # BBB: ZODB < 5
pass
txn = getTransactionMetaData(txn, c1)
txn_context = cluster.client._txn_container.get(txn)
empty = txn_context.queue.empty()
ll()
......
......@@ -19,18 +19,22 @@ from cStringIO import StringIO
from itertools import izip_longest
import os, random, shutil, threading, time, unittest
import transaction, ZODB
from persistent import Persistent
from neo.client.exception import NEOPrimaryMasterLost
from neo.lib import logging
from neo.lib.protocol import MAX_TID
from neo.lib.util import cached_property, p64, u64
from neo.master.transactions import TransactionManager
from neo.storage.database import getAdapterKlass, importer, manager
from neo.storage.database.importer import \
Repickler, TransactionRecord, WriteBack
from .. import expectedFailure, getTempDirectory, random_tree, Patch
from .. import expectedFailure, getTempDirectory, random_tree, \
Patch, TransactionalResource, getTransactionMetaData
from . import NEOCluster, NEOThreadedTest
from ZODB import serialize
from ZODB.DB import TransactionalUndo
from ZODB.FileStorage import FileStorage
from ZODB.POSException import POSKeyError
class Equal:
......@@ -371,6 +375,45 @@ class ImporterTests(NEOThreadedTest):
t.begin()
finalCheck(r)
def testDeleteAndUndo(self):
fs_path, cfg = self.getFS()
c = ZODB.DB(FileStorage(fs_path)).open()
s = c.db().storage
r = c.root()
tid = r._p_serial
r[''] = delete = Persistent()
transaction.commit()
self.assertEqual(delete._p_oid, p64(1))
del r['']
TransactionalResource(transaction, 0, commit=lambda txn:
s.deleteObject(delete._p_oid, delete._p_serial,
getTransactionMetaData(txn, c)))
transaction.commit()
r[''] = undo = Persistent()
transaction.commit()
c.db().undo(s.undoLog(last=1)[0]['id'])
transaction.commit()
self.assertEqual(undo._p_oid, p64(2))
def check():
self.assertIsNone(s.loadBefore(delete._p_oid, tid))
for oid in delete._p_oid, undo._p_oid, p64(3):
self.assertRaises(POSKeyError, s.loadBefore, oid, MAX_TID)
check() # FileStorage
c.db().close()
importer = {'zodb': [('root', cfg)]}
with NEOCluster(importer=importer) as cluster:
storage = cluster.storage
dm = storage.dm
with storage.patchDeferred(dm._finished):
with storage.patchDeferred(dm.doOperation):
cluster.start()
s = cluster.getZODBStorage()
check() # before import
self.tic()
check() # imported, Importer getObject
self.tic()
check() # imported, direct getObject
if __name__ == "__main__":
unittest.main()
......@@ -44,7 +44,7 @@ get3rdParty(x, '3rdparty/' + x, 'https://lab.nexedi.com/nexedi/erp5'
'/raw/14b0fcdcc31c5791646f9590678ca028f5d221f5/product/ERP5Type/' + x,
'abb7970856540fd02150edd1fa9a3a3e8d0074ec526ab189684ef7ea9b41825f')
zodb_require = ['ZODB3>=3.10dev']
zodb_require = ['ZODB>=4.4.5']
extras_require = {
'admin': [],
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment