Commit 7af948cf authored by Julien Muchembled

Lockless stores/checks during replication

parent b7a5bc99
@@ -61,7 +61,9 @@
partitions. Currently, reads succeed because feeding nodes don't delete
anything while the cluster is operational, for performance reasons:
deletion of dropped partitions must be reimplemented in a scalable way.
(HIGH AVAILABILITY)
The same thing happens for writes: storage nodes must discard
stores/checks of dropped partitions (in lockObject, this can be done by
raising ConflictError(None)). (HIGH AVAILABILITY)
Storage
- Use libmysqld instead of a stand-alone MySQL server.
......
@@ -410,6 +410,8 @@ class Application(ThreadedApplication):
def store(self, oid, serial, data, version, transaction):
"""Store object."""
logging.debug('storing oid %s serial %s', dump(oid), dump(serial))
if not serial: # BBB
serial = ZERO_TID
self._store(self._txn_container.get(transaction), oid, serial, data)
def _store(self, txn_context, oid, serial, data, data_serial=None):
@@ -472,7 +474,7 @@ class Application(ThreadedApplication):
oid, (serial, conflict_serial) = pop_conflict()
except KeyError:
return
if conflict_serial == ZERO_TID:
if conflict_serial == MAX_TID:
if 1:
# XXX: disable deadlock avoidance code until it is fixed
logging.info('Deadlock avoidance on %r:%r',
......
@@ -17,7 +17,7 @@
from ZODB.TimeStamp import TimeStamp
from neo.lib import logging
from neo.lib.protocol import ZERO_TID
from neo.lib.protocol import MAX_TID
from neo.lib.util import dump
from neo.lib.exception import NodeNotReady
from neo.lib.handler import MTEventHandler
@@ -62,10 +62,13 @@ class StorageAnswersHandler(AnswerBaseHandler):
self.app.setHandlerData(args)
def answerStoreObject(self, conn, conflict, oid, serial):
if not conflict:
# Ignore if not locked on storage side.
return
txn_context = self.app.getHandlerData()
object_stored_counter_dict = txn_context[
'object_stored_counter_dict'][oid]
if conflict:
if conflict != serial:
# Conflicts cannot be resolved now because 'conn' is locked.
# We must postpone the resolution (by queuing the conflict in
# 'conflict_dict') to avoid any deadlock with another thread that
@@ -76,10 +79,10 @@ class StorageAnswersHandler(AnswerBaseHandler):
# receive the conflict answer from the first store on S2.
logging.info('%r reports a conflict for %r with %r',
conn, dump(oid), dump(conflict))
if conflict != ZERO_TID:
if conflict != MAX_TID:
# If this conflict is not already resolved, mark it for
# resolution.
if conflict <= txn_context['resolved_dict'].get(oid, ZERO_TID):
if conflict <= txn_context['resolved_dict'].get(oid, ''):
return
if conflict in object_stored_counter_dict:
raise NEOStorageError('Storages %s accepted object %s'
......
@@ -940,10 +940,12 @@ class StoreObject(Packet):
"""
Ask to store an object. Send an OID, an original serial, a current
transaction ID, and data. C -> S.
Answer if an object has been stored. If an object is in conflict,
a serial of the conflicting transaction is returned. In this case,
if this serial is newer than the current transaction ID, a client
node must not try to resolve the conflict. S -> C.
As for IStorage, 'serial' is ZERO_TID for new objects.
Answered 'conflict' value means:
- None: lockless
- serial: ok
- MAX_TID: deadlock
- else: conflict
"""
_fmt = PStruct('ask_store_object',
POID('oid'),
......
@@ -38,13 +38,14 @@ from neo.lib.debug import register as registerLiveDebugger
class Application(BaseApplication):
"""The storage node application."""
tm = None
def __init__(self, config):
super(Application, self).__init__(
config.getSSL(), config.getDynamicMasterList())
# set the cluster name
self.name = config.getCluster()
self.tm = TransactionManager(self)
self.dm = buildDatabaseManager(config.getAdapter(),
(config.getDatabase(), config.getEngine(), config.getWait()),
)
@@ -93,7 +94,8 @@ class Application(BaseApplication):
def log(self):
self.em.log()
self.nm.log()
self.tm.log()
if self.tm:
self.tm.log()
if self.pt is not None:
self.pt.log()
@@ -184,6 +186,7 @@ class Application(BaseApplication):
for conn in self.em.getConnectionList():
if conn not in (self.listening_conn, self.master_conn):
conn.close()
self.tm = TransactionManager(self)
try:
self.initialize()
self.doOperation()
@@ -194,6 +197,7 @@ class Application(BaseApplication):
logging.error('primary master is down: %s', msg)
finally:
self.checker = Checker(self)
del self.tm
def connectToPrimary(self):
"""Find a primary master node, and connect to it.
@@ -256,7 +260,6 @@ class Application(BaseApplication):
# Forget all unfinished data.
self.dm.dropUnfinishedData()
self.tm.reset()
self.task_queue = task_queue = deque()
try:
......
@@ -72,7 +72,7 @@ class ClientOperationHandler(EventHandler):
def _askStoreObject(self, conn, oid, serial, compression, checksum, data,
data_serial, ttid, request_time):
try:
self.app.tm.storeObject(ttid, serial, oid, compression,
locked = self.app.tm.storeObject(ttid, serial, oid, compression,
checksum, data, data_serial)
except ConflictError, err:
# resolvable or not
@@ -93,7 +93,7 @@ class ClientOperationHandler(EventHandler):
duration = time.time() - request_time
if duration > SLOW_STORE:
logging.info('StoreObject delay: %.02fs', duration)
conn.answer(Packets.AnswerStoreObject(None))
conn.answer(Packets.AnswerStoreObject(locked))
def askStoreObject(self, conn, oid, serial,
compression, checksum, data, data_serial, ttid):
@@ -171,7 +171,7 @@ class ClientOperationHandler(EventHandler):
def _askCheckCurrentSerial(self, conn, ttid, serial, oid, request_time):
try:
self.app.tm.checkCurrentSerial(ttid, serial, oid)
locked = self.app.tm.checkCurrentSerial(ttid, serial, oid)
except ConflictError, err:
# resolvable or not
conn.answer(Packets.AnswerCheckCurrentSerial(err.tid))
@@ -191,7 +191,7 @@ class ClientOperationHandler(EventHandler):
duration = time.time() - request_time
if duration > SLOW_STORE:
logging.info('CheckCurrentSerial delay: %.02fs', duration)
conn.answer(Packets.AnswerCheckCurrentSerial(None))
conn.answer(Packets.AnswerCheckCurrentSerial(locked))
# like ClientOperationHandler but read-only & only for tid <= backup_tid
......
@@ -31,8 +31,8 @@ class MasterOperationHandler(BaseMasterHandler):
dm._setBackupTID(dm.getLastIDs()[0] or ZERO_TID)
dm.commit()
def notifyTransactionFinished(self, conn, *args, **kw):
self.app.replicator.transactionFinished(*args, **kw)
def notifyTransactionFinished(self, conn, *args):
self.app.replicator.transactionFinished(*args)
def notifyPartitionChanges(self, conn, ptid, cell_list):
"""This is very similar to Send Partition Table, except that
......
@@ -136,7 +136,7 @@ class Replicator(object):
app = self.app
pt = app.pt
uuid = app.uuid
self.partition_dict = p = {}
self.partition_dict = {}
self.replicate_dict = {}
self.source_dict = {}
self.ttid_set = set()
@@ -160,8 +160,7 @@ class Replicator(object):
p.next_trans = p.next_obj = next_tid
p.max_ttid = None
if outdated_list:
self.app.master_conn.ask(Packets.AskUnfinishedTransactions(),
offset_list=outdated_list)
self.app.tm.replicating(outdated_list)
def notifyPartitionChanges(self, cell_list):
"""This is a callback from MasterOperationHandler."""
@@ -190,8 +189,7 @@ class Replicator(object):
p.max_ttid = INVALID_TID
added_list.append(offset)
if added_list:
self.app.master_conn.ask(Packets.AskUnfinishedTransactions(),
offset_list=added_list)
self.app.tm.replicating(added_list)
if abort:
self.abort()
@@ -326,8 +324,7 @@ class Replicator(object):
p.next_obj = add64(tid, 1)
self.updateBackupTID()
if not p.max_ttid:
p = Packets.NotifyReplicationDone(offset, tid)
self.app.master_conn.notify(p)
self.app.tm.replicated(offset, tid)
logging.debug("partition %u replicated up to %s from %r",
offset, dump(tid), self.current_node)
self.getCurrentConnection().setReconnectionNoDelay()
......
@@ -18,7 +18,7 @@ from time import time
from neo.lib import logging
from neo.lib.handler import EventQueue
from neo.lib.util import dump
from neo.lib.protocol import ProtocolError, uuid_str, ZERO_TID
from neo.lib.protocol import Packets, ProtocolError, uuid_str, MAX_TID
class ConflictError(Exception):
"""
@@ -87,6 +87,54 @@ class TransactionManager(EventQueue):
self._transaction_dict = {}
self._store_lock_dict = {}
self._load_lock_dict = {}
self._replicated = {}
self._replicating = set()
from neo.lib.util import u64
np = app.pt.getPartitions()
self.getPartition = lambda oid: u64(oid) % np
def replicating(self, offset_list):
self._replicating.update(offset_list)
# TODO: The following assertions will fail if a replicated partition is
# dropped and this partition is added again.
isdisjoint = set(offset_list).isdisjoint
assert isdisjoint(self._replicated), (offset_list, self._replicated)
assert isdisjoint(map(self.getPartition, self._store_lock_dict)), (
offset_list, self._store_lock_dict)
self._app.master_conn.ask(Packets.AskUnfinishedTransactions(),
offset_list=offset_list)
def replicated(self, partition, tid):
# also called for readable cells in BACKINGUP state
self._replicating.discard(partition)
self._replicated[partition] = tid
self._notifyReplicated()
def _notifyReplicated(self):
getPartition = self.getPartition
store_lock_dict = self._store_lock_dict
replicated = self._replicated
notify = set(replicated)
# We sort transactions so that in case of multiple stores/checks for the
# same oid, the lock is taken by the highest ttid, which will delay
# new transactions.
for ttid, txn in sorted(self._transaction_dict.iteritems()):
for oids in txn.store_dict, txn.checked_set:
for oid in oids:
partition = getPartition(oid)
if partition in replicated:
if store_lock_dict.get(oid, ttid) != ttid:
# We have a "multi-lock" store, i.e. an
# initially-lockless store to a partition that became
# replicated.
notify.discard(partition)
store_lock_dict[oid] = ttid
for partition in notify:
# For this partition, all oids of all pending transactions
# are now locked normally and we no longer rely on other
# readable cells to check locks: we're really up-to-date.
self._app.master_conn.notify(Packets.NotifyReplicationDone(
partition, replicated.pop(partition)))
def register(self, conn, ttid):
"""
@@ -107,15 +155,6 @@
except KeyError:
return None
def reset(self):
"""
Reset the transaction manager
"""
EventQueue.__init__(self)
self._transaction_dict.clear()
self._store_lock_dict.clear()
self._load_lock_dict.clear()
def vote(self, ttid, txn_info=None):
"""
Store transaction information received from client node
@@ -183,36 +222,56 @@
DelayedError
ConflictError
"""
# check if the object if locked
locking_tid = self._store_lock_dict.get(oid)
if locking_tid is None:
previous_serial = None
elif locking_tid == ttid:
partition = self.getPartition(oid)
if partition in self._replicating:
# We're out-of-date so maybe:
# - we don't have all data to check for conflicts
# - we missed stores/checks that would lock this one
# However, this transaction may have begun after we started to
# replicate, and we're expected to store it in full.
# Since there's at least 1 other (readable) cell that will do this
# check, we accept this store/check without taking a lock.
return
locked = self._store_lock_dict.get(oid)
if locked:
if locked < ttid:
# We have a bigger "TTID" than the locking transaction, so we are
# younger: enter the waiting queue so we are handled when the lock
# is released. We also want to delay (instead of conflict) if the
# client is so fast that it commits another transaction before we
# have processed UnlockInformation from the master.
logging.info('Store delayed for %r:%r by %r', dump(oid),
dump(ttid), dump(locked))
raise DelayedError
transaction = self._transaction_dict[ttid]
if partition in self._replicated and (
oid in transaction.store_dict or oid in transaction.checked_set):
# This is a consequence of not having taken a lock during
# replication. After a ConflictError, we may be asked to "lock"
# it again. The current lock is a special one that only delays
# new transactions.
# For the cluster, we're still out-of-date and like above,
# at least 1 other (readable) cell checks for conflicts.
return
if locked != ttid:
# We have a smaller "TTID" than the locking transaction, so we are
# older: this is a possible deadlock case, as we might already
# hold locks the younger transaction is waiting upon. Tell the
# client that all involved storage nodes must rebase the
# already locked oids for this transaction.
logging.info('Possible deadlock on %r:%r with %r',
dump(oid), dump(ttid), dump(locked))
raise ConflictError(MAX_TID)
# If previous store was an undo, next store must be based on
# undo target.
previous_serial = self._transaction_dict[ttid].store_dict[oid][2]
previous_serial = transaction.store_dict[oid][2]
if previous_serial is None:
# XXX: use some special serial when previous store was not
# an undo ? Maybe it should just not happen.
logging.info('Transaction %s storing %s more than once',
dump(ttid), dump(oid))
elif locking_tid < ttid:
# We have a bigger TTID than locking transaction, so we are younger:
# enter waiting queue so we are handled when lock gets released.
# We also want to delay (instead of conflict) if the client is
# so faster that it is committing another transaction before we
# processed UnlockInformation from the master.
logging.info('Store delayed for %r:%r by %r', dump(oid),
dump(ttid), dump(locking_tid))
raise DelayedError
else:
# We have a smaller TTID than locking transaction, so we are older:
# this is a possible deadlock case, as we might already hold locks
# the younger transaction is waiting upon. Make client release
# locks & reacquire them by notifying it of the possible deadlock.
logging.info('Possible deadlock on %r:%r with %r',
dump(oid), dump(ttid), dump(locking_tid))
raise ConflictError(ZERO_TID)
previous_serial = None
# XXX: Consider locking before reporting a conflict:
# - That would speed up the case of cascading conflict resolution
# by avoiding incremental resolution, assuming that the time to
@@ -228,14 +287,16 @@
raise ConflictError(previous_serial)
logging.debug('Transaction %s storing %s', dump(ttid), dump(oid))
self._store_lock_dict[oid] = ttid
return serial
def checkCurrentSerial(self, ttid, serial, oid):
try:
transaction = self._transaction_dict[ttid]
except KeyError:
raise NotRegisteredError
self.lockObject(ttid, serial, oid)
locked = self.lockObject(ttid, serial, oid)
transaction.check(oid)
return locked
def storeObject(self, ttid, serial, oid, compression, checksum, data,
value_serial):
@@ -246,13 +307,14 @@
transaction = self._transaction_dict[ttid]
except KeyError:
raise NotRegisteredError
self.lockObject(ttid, serial, oid)
locked = self.lockObject(ttid, serial, oid)
# store object
if data is None:
data_id = None
else:
data_id = self._app.dm.holdData(checksum, data, compression)
transaction.store(oid, data_id, value_serial)
return locked
def abort(self, ttid, even_if_locked=False):
"""
@@ -286,12 +348,24 @@
assert lock_ttid in (ttid, None), ('Transaction %s tried'
' to release the lock on oid %s, but it was held by %s'
% (dump(ttid), dump(oid), dump(lock_ttid)))
write_locking_tid = self._store_lock_dict.pop(oid)
try:
write_locking_tid = self._store_lock_dict.pop(oid)
except KeyError:
# Lockless store (we were replicating this partition).
continue
if ttid < write_locking_tid:
# Several lockless stores happened for this oid, and among
# them a higher ttid is still pending.
assert self.getPartition(oid) in self._replicated, (
oid, ttid, write_locking_tid, self._replicated)
continue
assert write_locking_tid == ttid, ('Inconsistent locking'
' state: aborting %s:%s but %s has the lock.'
% (dump(ttid), dump(oid), dump(write_locking_tid)))
# remove the transaction
del self._transaction_dict[ttid]
if self._replicated:
self._notifyReplicated()
# some locks were released, some pending locks may now succeed
self.executeQueuedEvents()
......
@@ -28,7 +28,7 @@ class TransactionManagerTests(NeoUnitTestBase):
self.app = Mock()
# no history
self.app.dm = Mock({'getObjectHistory': []})
self.app.pt = Mock({'isAssigned': True})
self.app.pt = Mock({'isAssigned': True, 'getPartitions': 2})
self.app.em = Mock({'setTimeout': None})
self.manager = TransactionManager(self.app)
......
@@ -33,7 +33,7 @@ from neo.lib.exception import DatabaseFailure, StoppedOperation
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, Packets, \
ZERO_OID, ZERO_TID
from .. import expectedFailure, Patch
from . import LockLock, NEOThreadedTest, with_cluster
from . import ConnectionFilter, LockLock, NEOThreadedTest, with_cluster
from neo.lib.util import add64, makeChecksum, p64, u64
from neo.client.exception import NEOPrimaryMasterLost, NEOStorageError
from neo.client.pool import CELL_CONNECTED, CELL_GOOD
@@ -1351,11 +1351,11 @@ class Test(NEOThreadedTest):
reports a conflict after this conflict was fully resolved with
another node.
"""
def answerStoreObject(orig, conn, conflict, **kw):
if not conflict:
def answerStoreObject(orig, conn, conflict, oid, serial):
if conflict == serial:
p.revert()
ll()
orig(conn, conflict, **kw)
orig(conn, conflict, oid, serial)
if 1:
s0, s1 = cluster.storage_list
t1, c1 = cluster.getTransaction()
@@ -1389,6 +1389,36 @@ class Test(NEOThreadedTest):
storage.store(oid, None, '*' * storage._cache._max_size, '', txn)
self.assertRaises(POSException.ConflictError, storage.tpc_vote, txn)
@with_cluster(replicas=1)
def testConflictWithOutOfDateCell(self, cluster):
"""
        C1          S1          S0          C2
        begin       down                    begin
                                U <-------- commit
                    up (remaining out-of-date due to suspended replication)
        store ----> O (stored lockless)
          `-----------------------> conflict
        resolve --> stored lockless
          `-----------------------> locked
        committed
"""
s0, s1 = cluster.storage_list
t1, c1 = cluster.getTransaction()
c1.root()['x'] = x = PCounterWithResolution()
t1.commit()
s1.stop()
cluster.join((s1,))
x.value += 1
t2, c2 = cluster.getTransaction()
c2.root()['x'].value += 2
t2.commit()
with ConnectionFilter() as f:
f.delayAskFetchTransactions()
s1.resetNode()
s1.start()
self.tic()
t1.commit()
if __name__ == "__main__":
unittest.main()