Commit 092992db authored by Julien Muchembled

Implement deadlock avoidance

This is a first version with several optimizations possible:
- improve EventQueue (or implement a specific queue) to minimize deadlocks
- turn the RebaseObject packet into a notification

Sorting oids could also be useful to reduce the probability of deadlocks,
but that would never be enough to avoid them completely, even if there's a
single storage. For example:

1. C1 does a first store (x or y)
2. C2 stores x and y; one is delayed
3. C1 stores the other -> deadlock
   When solving the deadlock, the data of the first store may only
   exist on the storage.

2 functional tests are removed because they're redundant,
either with ZODB tests or with the new threaded tests.
parent cc8d0a7c
......@@ -4,18 +4,6 @@ or promised features of NEO (marked with N).
All the listed bugs will be fixed with high priority.
(Z) Conflict resolution not fully implemented
Even with a single storage node, so-called 'deadlock avoidance' may
happen in order to resolve conflicts. In such cases, conflicts will not be
resolved even if your _p_resolveConflict() method would succeed, leading to a
normal ConflictError.
Although this should happen rarely enough not to affect performance, this can
be an issue if your application can't afford restarting the transaction,
e.g. because it interacted with external environment.
(N) Storage failure or update may lead to POSException or break undoLog()
......@@ -125,6 +125,7 @@
- Add ctl command to list last transactions, like fstail for FileStorage.
- Split neo/tests/threaded/
- Use another mock library: Python 3.3+ has unittest.mock, which is
available for earlier versions at
......@@ -417,24 +417,16 @@ class Application(ThreadedApplication):
while 1:
# We iterate over conflict_dict, and clear it,
# because new items may be added by calls to _store.
# This is also done atomically, to avoid race conditions
# with PrimaryNotificationsHandler.notifyDeadlock
oid, (serial, conflict_serial) = pop_conflict()
except KeyError:
if conflict_serial == MAX_TID:
if 1:
# XXX: disable deadlock avoidance code until it is fixed'Deadlock avoidance on %r:%r',
dump(oid), dump(serial))
# 'data' parameter of ConflictError is only used to report the
# class of the object. It doesn't matter if 'data' is None
# because the transaction is too big.
data = data_dict[oid][0]
except KeyError:
# successfully stored on another storage node
data = txn_context.cache_dict[oid]
data = data_dict.pop(oid)[0]
except KeyError:
assert oid is conflict_serial is None, (oid, conflict_serial)
# Storage refused us from taking object lock, to avoid a
# possible deadlock. TID is actually used for some kind of
# "locking priority": when a higher value has the lock,
......@@ -443,11 +435,15 @@ class Application(ThreadedApplication):
# To recover, we must ask storages to release locks we
# hold (to let possibly-competing transactions acquire
# them), and requeue our already-sent store requests.'Deadlock avoidance triggered on %r:%r',
dump(oid), dump(serial))
raise NotImplementedError
ttid = txn_context.ttid'Deadlock avoidance triggered for TXN %s'
' with new locking TID %s', dump(ttid), dump(serial))
txn_context.locking_tid = serial
packet = Packets.AskRebaseTransaction(ttid, serial)
for uuid, status in txn_context.involved_nodes.iteritems():
if status < 2:
self._askStorageForWrite(txn_context, uuid, packet)
data = data_dict.pop(oid)[0]
if data is CHECKED_SERIAL:
raise ReadConflictError(oid=oid, serials=(conflict_serial,
......@@ -457,12 +453,17 @@ class Application(ThreadedApplication):
if self.last_tid < conflict_serial:
self.sync() # possible late invalidation (very rare)
new_data = tryToResolveConflict(oid, conflict_serial,
data = tryToResolveConflict(oid, conflict_serial,
serial, data)
except ConflictError:'Conflict resolution failed for '
'%r:%r with %r', dump(oid), dump(serial),
# With recent ZODB, get_pickle_metadata (from ZODB.utils)
# does not support empty values, so do not pass 'data'
# in this case.
raise ConflictError(oid=oid, serials=(conflict_serial,
serial), data=data or None)
else:'Conflict resolution succeeded for '
'%r:%r with %r', dump(oid), dump(serial),
......@@ -470,12 +471,18 @@ class Application(ThreadedApplication):
# Mark this conflict as resolved
resolved_dict[oid] = conflict_serial
# Try to store again
self._store(txn_context, oid, conflict_serial, new_data)
# With recent ZODB, get_pickle_metadata (from ZODB.utils) does
# not support empty values, so do not pass 'data' in this case.
raise ConflictError(oid=oid, serials=(conflict_serial,
serial), data=data or None)
self._store(txn_context, oid, conflict_serial, data)
def _askStorageForWrite(self, txn_context, uuid, packet):
node = self.nm.getByUUID(uuid)
if node is not None:
conn = self.cp.getConnForNode(node)
if conn is not None:
return conn.ask(packet, queue=txn_context.queue)
except ConnectionClosed:
txn_context.involved_nodes[uuid] = 2
def waitResponses(self, queue):
"""Wait for all requests to be answered (or their connection to be
......@@ -510,16 +517,7 @@ class Application(ThreadedApplication):
packet = Packets.AskVoteTransaction(ttid)
for uuid, status in involved_nodes.iteritems():
if status == 1 and uuid not in trans_nodes:
node = self.nm.getByUUID(uuid)
if node is not None:
conn = self.cp.getConnForNode(node)
if conn is not None:
conn.ask(packet, queue=queue)
except ConnectionClosed:
involved_nodes[uuid] = 2
self._askStorageForWrite(txn_context, uuid, packet)
# If there are failed nodes, ask the master whether they can be
# disconnected while keeping the cluster operational. If possible,
......@@ -194,6 +194,13 @@ class PrimaryNotificationsHandler(MTEventHandler):
if node and node.isConnected():
def notifyDeadlock(self, conn, ttid, locking_tid):
for txn_context in
if txn_context.ttid == ttid:
txn_context.conflict_dict[None] = locking_tid, None
class PrimaryAnswersHandler(AnswerBaseHandler):
""" Handle that process expected packets from the primary master """
......@@ -14,11 +14,12 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <>.
from zlib import decompress
from ZODB.TimeStamp import TimeStamp
from neo.lib import logging
from neo.lib.protocol import MAX_TID
from neo.lib.util import dump
from neo.lib.protocol import Packets
from neo.lib.util import dump, makeChecksum
from neo.lib.exception import NodeNotReady
from neo.lib.handler import MTEventHandler
from . import AnswerBaseHandler
......@@ -75,17 +76,75 @@ class StorageAnswersHandler(AnswerBaseHandler):
# receive the conflict answer from the first store on S2.'%r report a conflict for %r with %r',
conn, dump(oid), dump(conflict))
if conflict != MAX_TID:
# If this conflict is not already resolved, mark it for
# resolution.
if conflict <= txn_context.resolved_dict.get(oid, ''):
txn_context.conflict_dict[oid] = serial, conflict
# If this conflict is not already resolved, mark it for
# resolution.
if txn_context.resolved_dict.get(oid, '') < conflict:
txn_context.conflict_dict[oid] = serial, conflict
txn_context.written(, conn.getUUID(), oid)
answerCheckCurrentSerial = answerStoreObject
def answerRebaseTransaction(self, conn, oid_list):
txn_context =
ttid = txn_context.ttid
queue = txn_context.queue
for oid in oid_list:
# We could have an extra parameter to tell the storage if we
# still have the data, and in this case revert what was done
# in Transaction.written. This would save bandwidth in case of
# conflict.
conn.ask(Packets.AskRebaseObject(ttid, oid),
queue=queue, oid=oid)
except ConnectionClosed:
txn_context.involved_nodes[conn.getUUID()] = 2
def answerRebaseObject(self, conn, conflict, oid):
if conflict:
txn_context =
serial, conflict, data = conflict
assert serial and serial < conflict, (serial, conflict)
resolved = conflict <= txn_context.resolved_dict.get(oid, '')
cached = txn_context.cache_dict.pop(oid)
except KeyError:
if resolved:
# We should still be waiting for an answer from this node.
assert conn.uuid in txn_context.data_dict[oid][1]
assert oid in txn_context.data_dict
if oid in txn_context.conflict_dict:
# Another node already reported the conflict, by answering
# to this rebase or to the previous store.
# Filling conflict_dict again would be a no-op.
assert txn_context.conflict_dict[oid] == (serial, conflict)
# A node has not answered yet to a previous store. Do not wait for
# it to report the conflict because it may fail before.
# The data for this oid are now back on client side.
# Revert what was done in Transaction.written
assert not resolved
if data is None: # undo or CHECKED_SERIAL
data = cached
compression, checksum, data = data
if checksum != makeChecksum(data):
raise NEOStorageError(
'wrong checksum while getting back data for'
' object %s during rebase of transaction %s'
% (dump(oid), dump(txn_context.ttid)))
if compression:
data = decompress(data)
size = len(data)
txn_context.data_size += size
if cached:
assert cached == data
txn_context.cache_size -= size
txn_context.data_dict[oid] = data, None
txn_context.conflict_dict[oid] = serial, conflict
def answerStoreTransaction(self, conn):
......@@ -17,14 +17,22 @@
from ZODB.POSException import StorageTransactionError
from neo.lib.connection import ConnectionClosed
from neo.lib.locking import SimpleQueue
from neo.lib.protocol import Packets
from .exception import NEOStorageError
class _WakeupPacket(object):
handler_method_name = 'pong'
decode = tuple
getId = int
class Transaction(object):
cache_size = 0 # size of data in cache_dict
data_size = 0 # size of data in data_dict
error = None
locking_tid = None
voted = False
ttid = None # XXX: useless, except for testBackupReadOnlyAccess
......@@ -45,6 +53,9 @@ class Transaction(object):
# status: 0 -> check only, 1 -> store, 2 -> failed
self.involved_nodes = {} # {node_id: status}
def wakeup(self, conn):
    """Push a wake-up event for `conn` onto this transaction's queue.

    The event is the triple (conn, _WakeupPacket, {}): the connection, the
    placeholder packet class and an empty keyword dict.
    """
    event = conn, _WakeupPacket, {}
    self.queue.put(event)
def write(self, app, packet, object_id, store=1, **kw):
uuid_list = []
pt =
......@@ -53,7 +64,7 @@ class Transaction(object):
for cell in pt.getCellList(object_id):
node = cell.getNode()
uuid = node.getUUID()
status = involved.setdefault(uuid, store)
status = involved.get(uuid, -1)
if status < store:
involved[uuid] = store
elif status > 1:
......@@ -61,6 +72,13 @@ class Transaction(object):
conn = app.cp.getConnForNode(node)
if conn is not None:
if status < 0 and self.locking_tid and 'oid' in kw:
# A deadlock happened but this node is not aware of it.
# Tell it to write-lock with the same locking tid as
# for the other nodes. The condition on kw is because
# we don't need that for transaction metadata.
self.ttid, self.locking_tid), queue=self.queue)
conn.ask(packet, queue=self.queue, **kw)
......@@ -398,6 +398,19 @@ class PStructItemOrNone(PStructItem):
value = reader(self.size)
return None if value == self._None else self.unpack(value)[0]
class POption(PStruct):
def _encode(self, writer, value):
if value is None:
PStruct._encode(self, writer, value)
def _decode(self, reader):
if '\0\1'.index(reader(1)):
return PStruct._decode(self, reader)
class PList(PStructItem):
A list of homogeneous items
......@@ -949,14 +962,60 @@ class GenerateOIDs(Packet):
class Deadlock(Packet):
Ask master to generate a new TTID that will be used by the client
to rebase a transaction. S -> PM -> C
_fmt = PStruct('notify_deadlock',
class RebaseTransaction(Packet):
Rebase transaction. C -> S.
_fmt = PStruct('ask_rebase_transaction',
_answer = PStruct('answer_rebase_transaction',
class RebaseObject(Packet):
Rebase object. C -> S.
XXX: It is a request packet to simplify the implementation. For more
efficiency, this should be turned into a notification, and the
RebaseTransaction should be answered once all objects are rebased
(so that the client can still wait on something).
_fmt = PStruct('ask_rebase_object',
_answer = PStruct('answer_rebase_object',
class StoreObject(Packet):
Ask to store an object. Send an OID, an original serial, a current
transaction ID, and data. C -> S.
As for IStorage, 'serial' is ZERO_TID for new objects.
Answered 'conflict' value means:
- MAX_TID: deadlock
- else: conflict
_fmt = PStruct('ask_store_object',
......@@ -1658,6 +1717,12 @@ class Packets(dict):
AskNewOIDs, AnswerNewOIDs = register(
NotifyDeadlock = register(
AskRebaseTransaction, AnswerRebaseTransaction = register(
AskRebaseObject, AnswerRebaseObject = register(
AskStoreObject, AnswerStoreObject = register(
AbortTransaction = register(
......@@ -61,6 +61,9 @@ class StorageServiceHandler(BaseServiceHandler):
p = Packets.AnswerUnfinishedTransactions(last_tid, pending_list)
def notifyDeadlock(self, conn, *args):, *args)
def answerInformationLocked(self, conn, ttid):, conn.getUUID())
......@@ -19,13 +19,15 @@ from time import time
from struct import pack, unpack
from neo.lib import logging
from neo.lib.handler import DelayEvent, EventQueue
from neo.lib.protocol import ProtocolError, uuid_str, ZERO_OID, ZERO_TID
from neo.lib.protocol import Packets, ProtocolError, uuid_str, \
from neo.lib.util import dump, u64, addTID, tidFromTime
class Transaction(object):
A pending transaction
locking_tid = ZERO_TID
_tid = None
_msg_id = None
_oid_list = None
......@@ -303,6 +305,19 @@ class TransactionManager(EventQueue):
logging.debug('Begin %s', txn)
return tid
def deadlock(self, storage_id, ttid, locking_tid):
txn = self._ttid_dict[ttid]
except KeyError:
if txn.locking_tid <= locking_tid:
client = txn.getNode()
txn.locking_tid = locking_tid = self._nextTID()'Deadlock avoidance triggered by %s for %s:'
' new locking tid for TXN %s is %s', uuid_str(storage_id),
uuid_str(client.getUUID()), dump(ttid), dump(locking_tid))
client.notify(Packets.NotifyDeadlock(ttid, locking_tid))
def vote(self, app, ttid, uuid_list):
Check that the transaction can be voted
......@@ -304,7 +304,7 @@ class ImporterDatabaseManager(DatabaseManager):
getPartitionTable changePartitionTable
getUnfinishedTIDDict dropUnfinishedData abortTransaction
storeTransaction lockTransaction unlockTransaction
storeData getOrphanList _pruneData deferCommit
loadData storeData getOrphanList _pruneData deferCommit
setattr(self, x, getattr(self.db, x))
......@@ -463,6 +463,11 @@ class DatabaseManager(object):
no hash collision.
def loadData(self, data_id):
"""Inverse of storeData
def holdData(self, checksum_or_id, *args):
"""Store raw data of temporary object
......@@ -541,6 +541,15 @@ class MySQLDatabaseManager(DatabaseManager):
return self.conn.insert_id()
def loadData(self, data_id):
    """Return the (compression, hash, value) row stored under `data_id`.

    The row is read from the `data` table. When bit 0x80 of `compression`
    is set, that bit is cleared and `value` is replaced by the
    concatenation of the chunks produced by `self._bigData(value)`
    (presumably the stored value is then only a reference to the real
    payload -- confirm against `_bigData`).

    Raises IndexError if the query returns no row.
    """
    compression, checksum, value = self.query(
        "SELECT compression, hash, value FROM data where id=%s"
        % data_id)[0]
    if compression and compression & 0x80:
        compression &= 0x7f
        # Fix: the original read and assigned an undefined local `data`,
        # which raised UnboundLocalError and would otherwise have returned
        # the raw, unjoined `value`.
        value = ''.join(self._bigData(value))
    return compression, checksum, value
del _structLL
def _getDataTID(self, oid, tid=None, before_tid=None):
......@@ -404,6 +404,10 @@ class SQLiteDatabaseManager(DatabaseManager):
return r
def loadData(self, data_id):
    """Return the first row of (compression, hash, value) for `data_id`.

    The result is whatever `fetchone()` gives for the parameterized
    query on the `data` table.
    """
    sql = "SELECT compression, hash, value FROM data where id=?"
    cursor = self.query(sql, (data_id,))
    return cursor.fetchone()
def _getDataTID(self, oid, tid=None, before_tid=None):
partition = self._getPartition(oid)
sql = 'SELECT tid, value_tid FROM obj' \
......@@ -110,6 +110,26 @@ class ClientOperationHandler(EventHandler):, conn, (oid, serial,
compression, checksum, data, data_serial, ttid, time.time()))
def askRebaseTransaction(self, conn, *args):
conn.answer(Packets.AnswerRebaseTransaction(, *args)))
def askRebaseObject(self, conn, ttid, oid):
self._askRebaseObject(conn, ttid, oid, None)
except DelayEvent:
# locked by a previous transaction, retry later,
conn, (ttid, oid, time.time()))
def _askRebaseObject(self, conn, ttid, oid, request_time):
conflict =, oid)
if request_time and SLOW_STORE is not None:
duration = time.time() - request_time
if duration > SLOW_STORE:'RebaseObject delay: %.02fs', duration)
def askTIDsFrom(self, conn, min_tid, max_tid, length, partition):
min_tid, max_tid, length, partition)))
......@@ -204,6 +224,8 @@ class ClientReadOnlyOperationHandler(ClientOperationHandler):
askVoteTransaction = _readOnly
askStoreObject = _readOnly
askFinalTID = _readOnly
askRebaseObject = _readOnly
askRebaseTransaction = _readOnly
# takes write lock & is only used when going to commit
askCheckCurrentSerial = _readOnly
This diff is collapsed.
......@@ -23,7 +23,6 @@ import socket
from struct import pack
from neo.lib.util import makeChecksum, u64
from ZODB.FileStorage import FileStorage
from ZODB.POSException import ConflictError
from ZODB.tests.StorageTestBase import zodb_pickle
from persistent import Persistent
from . import NEOCluster, NEOFunctionalTest
......@@ -41,25 +40,6 @@ class Tree(Persistent):
self.right = Tree(depth)
self.left = Tree(depth)
# simple persistent object with conflict resolution
class PCounter(Persistent):
_value = 0
def value(self):
return self._value
def inc(self):
self._value += 1
class PCounterWithResolution(PCounter):
def _p_resolveConflict(self, old, saved, new):
new['_value'] = saved['_value'] + new['_value']
return new
class PObject(Persistent):
......@@ -93,29 +73,6 @@ class ClientTests(NEOFunctionalTest):
conn =
return (txn, conn)
def testConflictResolutionTriggered1(self):
""" Check that ConflictError is raised on write conflict """
# create the initial objects
t, c = self.makeTransaction()
c.root()['without_resolution'] = PCounter()
# first with no conflict resolution
t1, c1 = self.makeTransaction()
t2, c2 = self.makeTransaction()
o1 = c1.root()['without_resolution']
o2 = c2.root()['without_resolution']
self.assertEqual(o1.value(), 0)
self.assertEqual(o2.value(), 0)
self.assertEqual(o1.value(), 1)
self.assertEqual(o2.value(), 2)
self.assertRaises(ConflictError, t2.commit)
def testIsolationAtZopeLevel(self):
""" Check transaction isolation within zope connection """
......@@ -270,51 +227,6 @@ class ClientTests(NEOFunctionalTest):
db2, conn2 = self.neo.getZODBConnection()
self.runWithTimeout(40, test)
def testDelayedLocksCancelled(self):
Hold a lock on an object, try to get another lock on the same
object to delay it. Then cancel the second transaction and check
that the lock is not hold when the first transaction ends
def test():
self.neo = NEOCluster(['test_neo1'], replicas=0,
db1, conn1 = self.neo.getZODBConnection()
db2, conn2 = self.neo.getZODBConnection()
st1, st2 = conn1._storage, conn2._storage
t1, t2 = transaction.Transaction(), transaction.Transaction()
t1.user = t2.user = u'user'
t1.description = t2.description = u'desc'
oid = st1.new_oid()
rev = '\0' * 8
data = zodb_pickle(PObject())
# t1 own the lock, rev, data, '', t1)
# t2 store is delayed, rev, data, '', t2)
# cancel t2, should cancel the store too
# finish t1, should release the lock
db3, conn3 = self.neo.getZODBConnection()
st3 = conn3._storage
t3 = transaction.Transaction()
t3.user = u'user'
t3.description = u'desc'
# retrieve the last revision
data, serial = st3.load(oid)
# try to store again, should not be delayed, serial, data, '', t3)
# the vote should not timeout
self.runWithTimeout(10, test)
def testGreaterOIDSaved(self):
Store an object with an OID greater than the last generated by the
......@@ -413,6 +413,9 @@ class ClientApplication(Node,
def __init__(self, master_nodes, name, **kw):
super(ClientApplication, self).__init__(master_nodes, name, **kw)
self.poll_thread.node_name = name
# Smaller cache to speed up tests that check behaviour when it's too
# small. See also NEOCluster.cache_size
self._cache._max_size //= 1024
def _run(self):
......@@ -433,6 +436,10 @@ class ClientApplication(Node,
conn = self.cp.getConnForNode(self.nm.getByUUID(peer.uuid))
yield conn
def extraCellSortKey(self, key):
    """Return a Patch of `self.cp.getCellSortKey` that appends `key(cell)`.

    The patched function returns the pair (orig(cell), key(cell)), i.e. the
    original sort key first, then the extra one.
    """
    def getCellSortKey(orig, cell):
        return orig(cell), key(cell)
    return Patch(self.cp, getCellSortKey=getCellSortKey)
class NeoCTL(
def __init__(self, *args, **kw):
......@@ -885,10 +892,6 @@ class NEOCluster(object):
txn = transaction.TransactionManager()
return txn, (self.db if db is None else db).open(txn)
def extraCellSortKey(self, key):
return Patch(self.client.cp, getCellSortKey=lambda orig, cell:
(orig(cell), key(cell)))
def moduloTID(self, partition):
"""Force generation of TIDs that will be stored in given partition"""
partition = p64(partition)
......@@ -974,6 +977,8 @@ class NEOThreadedTest(NeoTestBase):
self.__exc_info = None
self.__exc_info = sys.exc_info()
if self.__exc_info[0] is NEOThreadedTest.failureException:
def join(self, timeout=None):
threading.Thread.join(self, timeout)
......@@ -1003,6 +1008,44 @@ class NEOThreadedTest(NeoTestBase):
return Patch(jar.db(), getConnForNode=lambda orig, node:
None if node.getUUID() == storage.uuid else orig(node))
def readCurrent(ob):
class ThreadId(list):
def __call__(self):
return self.index(thread.get_ident())
except ValueError:
i = len(self)
return i
class RandomConflictDict(dict):
# One must not depend on how Python iterates over dict keys, because this
# is implementation-defined behaviour. This patch makes sure of that when
# resolving conflicts.
def __new__(cls):