Commit 74c69d54 authored by Julien Muchembled's avatar Julien Muchembled

Fix conflict handling after a successful store to a node being disconnected...

Fix conflict handling after a successful store to a node being disconnected for having missed a transaction
parent d3780906
...@@ -400,10 +400,10 @@ class Application(ThreadedApplication): ...@@ -400,10 +400,10 @@ class Application(ThreadedApplication):
checksum = makeChecksum(compressed_data) checksum = makeChecksum(compressed_data)
txn_context.data_size += size txn_context.data_size += size
# Store object in tmp cache # Store object in tmp cache
txn_context.data_dict[oid] = data
packet = Packets.AskStoreObject(oid, serial, compression, packet = Packets.AskStoreObject(oid, serial, compression,
checksum, compressed_data, data_serial, ttid) checksum, compressed_data, data_serial, ttid)
txn_context.write(self, packet, oid, oid=oid, serial=serial) txn_context.data_dict[oid] = data, txn_context.write(
self, packet, oid, oid=oid, serial=serial)
while txn_context.data_size >= self._cache._max_size: while txn_context.data_size >= self._cache._max_size:
self._waitAnyTransactionMessage(txn_context) self._waitAnyTransactionMessage(txn_context)
...@@ -430,7 +430,7 @@ class Application(ThreadedApplication): ...@@ -430,7 +430,7 @@ class Application(ThreadedApplication):
# class of the object. It doesn't matter if 'data' is None # class of the object. It doesn't matter if 'data' is None
# because the transaction is too big. # because the transaction is too big.
try: try:
data = data_dict[oid] data = data_dict[oid][0]
except KeyError: except KeyError:
# succesfully stored on another storage node # succesfully stored on another storage node
data = txn_context.cache_dict[oid] data = txn_context.cache_dict[oid]
...@@ -447,7 +447,7 @@ class Application(ThreadedApplication): ...@@ -447,7 +447,7 @@ class Application(ThreadedApplication):
dump(oid), dump(serial)) dump(oid), dump(serial))
raise NotImplementedError raise NotImplementedError
else: else:
data = data_dict.pop(oid) data = data_dict.pop(oid)[0]
if data is CHECKED_SERIAL: if data is CHECKED_SERIAL:
raise ReadConflictError(oid=oid, serials=(conflict_serial, raise ReadConflictError(oid=oid, serials=(conflict_serial,
serial)) serial))
...@@ -872,9 +872,10 @@ class Application(ThreadedApplication): ...@@ -872,9 +872,10 @@ class Application(ThreadedApplication):
ttid = txn_context.ttid ttid = txn_context.ttid
# ZODB.Connection performs calls 'checkCurrentSerialInTransaction' # ZODB.Connection performs calls 'checkCurrentSerialInTransaction'
# after stores, and skips oids that have been successfully stored. # after stores, and skips oids that have been successfully stored.
assert oid not in txn_context.cache_dict, (oid, txn_context) assert oid not in txn_context.cache_dict, oid
txn_context.data_dict.setdefault(oid, CHECKED_SERIAL) assert oid not in txn_context.data_dict, oid
packet = Packets.AskCheckCurrentSerial(ttid, serial, oid) packet = Packets.AskCheckCurrentSerial(ttid, serial, oid)
txn_context.write(self, packet, oid, 0, oid=oid, serial=serial) txn_context.data_dict[oid] = CHECKED_SERIAL, txn_context.write(
self, packet, oid, 0, oid=oid, serial=serial)
self._waitAnyTransactionMessage(txn_context, False) self._waitAnyTransactionMessage(txn_context, False)
...@@ -63,12 +63,8 @@ class StorageAnswersHandler(AnswerBaseHandler): ...@@ -63,12 +63,8 @@ class StorageAnswersHandler(AnswerBaseHandler):
self.app.setHandlerData(args) self.app.setHandlerData(args)
def answerStoreObject(self, conn, conflict, oid, serial): def answerStoreObject(self, conn, conflict, oid, serial):
if not conflict:
# Ignore if not locked on storage side. We only had to receive
# this answer, so that this storage is not marked as failed.
return
txn_context = self.app.getHandlerData() txn_context = self.app.getHandlerData()
if conflict != serial: if conflict:
# Conflicts can not be resolved now because 'conn' is locked. # Conflicts can not be resolved now because 'conn' is locked.
# We must postpone the resolution (by queuing the conflict in # We must postpone the resolution (by queuing the conflict in
# 'conflict_dict') to avoid any deadlock with another thread that # 'conflict_dict') to avoid any deadlock with another thread that
...@@ -86,22 +82,7 @@ class StorageAnswersHandler(AnswerBaseHandler): ...@@ -86,22 +82,7 @@ class StorageAnswersHandler(AnswerBaseHandler):
return return
txn_context.conflict_dict[oid] = serial, conflict txn_context.conflict_dict[oid] = serial, conflict
else: else:
try: txn_context.written(self.app, conn.getUUID(), oid)
data = txn_context.data_dict.pop(oid)
except KeyError: # replica, or multiple undo
return
if type(data) is str:
size = len(data)
txn_context.data_size -= size
size += txn_context.cache_size
if size < self.app._cache._max_size:
txn_context.cache_size = size
else:
# Do not cache data past cache max size, as it
# would just flush it on tpc_finish. This also
# prevents memory errors for big transactions.
data = None
txn_context.cache_dict[oid] = data
answerCheckCurrentSerial = answerStoreObject answerCheckCurrentSerial = answerStoreObject
...@@ -113,7 +94,7 @@ class StorageAnswersHandler(AnswerBaseHandler): ...@@ -113,7 +94,7 @@ class StorageAnswersHandler(AnswerBaseHandler):
def connectionClosed(self, conn): def connectionClosed(self, conn):
txn_context = self.app.getHandlerData() txn_context = self.app.getHandlerData()
if type(txn_context) is Transaction: if type(txn_context) is Transaction:
txn_context.involved_nodes[conn.getUUID()] = 2 txn_context.nodeLost(self.app, conn.getUUID())
super(StorageAnswersHandler, self).connectionClosed(conn) super(StorageAnswersHandler, self).connectionClosed(conn)
def answerTIDsFrom(self, conn, tid_list): def answerTIDsFrom(self, conn, tid_list):
......
...@@ -32,9 +32,9 @@ class Transaction(object): ...@@ -32,9 +32,9 @@ class Transaction(object):
self.queue = SimpleQueue() self.queue = SimpleQueue()
self.txn = txn self.txn = txn
# data being stored # data being stored
self.data_dict = {} self.data_dict = {} # {oid: (value, [node_id])}
# data stored: this will go to the cache on tpc_finish # data stored: this will go to the cache on tpc_finish
self.cache_dict = {} self.cache_dict = {} # {oid: value}
# conflicts to resolve # conflicts to resolve
self.conflict_dict = {} # {oid: (base_serial, serial)} self.conflict_dict = {} # {oid: (base_serial, serial)}
# resolved conflicts # resolved conflicts
...@@ -72,6 +72,51 @@ class Transaction(object): ...@@ -72,6 +72,51 @@ class Transaction(object):
raise NEOStorageError( raise NEOStorageError(
'no storage available for write to partition %s' % object_id) 'no storage available for write to partition %s' % object_id)
def written(self, app, uuid, oid):
# When a node that is being disconnected by the master because it was
# not part of the transaction that caused a conflict, we may receive a
# positive answer (not to be confused with lockless stores) before the
# conflict. Because we have no way to identify such case, we must keep
# the data in self.data_dict until all nodes have answered so we remain
# able to resolve conflicts.
try:
data, uuid_list = self.data_dict[oid]
uuid_list.remove(uuid)
except KeyError:
# 1. store to S1 and S2
# 2. S2 reports a conflict
# 3. store to S1 and S2 # conflict resolution
# 4. S1 does not report a conflict (lockless)
# 5. S2 answers before S1 for the second store
return
except ValueError:
# The most common case for this exception is because nodeLost()
# tries all oids blindly. Other possible cases:
# - like above (KeyError), but with S2 answering last
# - answer to resolved conflict before the first answer from a
# node that was being disconnected by the master
return
if uuid_list:
return
del self.data_dict[oid]
if type(data) is str:
size = len(data)
self.data_size -= size
size += self.cache_size
if size < app._cache._max_size:
self.cache_size = size
else:
# Do not cache data past cache max size, as it
# would just flush it on tpc_finish. This also
# prevents memory errors for big transactions.
data = None
self.cache_dict[oid] = data
def nodeLost(self, app, uuid):
self.involved_nodes[uuid] = 2
for oid in list(self.data_dict):
self.written(app, uuid, oid)
class TransactionContainer(dict): class TransactionContainer(dict):
# IDEA: Drop this container and use the new set_data/data API on # IDEA: Drop this container and use the new set_data/data API on
......
...@@ -955,8 +955,6 @@ class StoreObject(Packet): ...@@ -955,8 +955,6 @@ class StoreObject(Packet):
transaction ID, and data. C -> S. transaction ID, and data. C -> S.
As for IStorage, 'serial' is ZERO_TID for new objects. As for IStorage, 'serial' is ZERO_TID for new objects.
Answered 'conflict' value means: Answered 'conflict' value means:
- None: lockless
- serial: ok
- MAX_TID: deadlock - MAX_TID: deadlock
- else: conflict - else: conflict
""" """
......
...@@ -69,7 +69,7 @@ class ClientOperationHandler(EventHandler): ...@@ -69,7 +69,7 @@ class ClientOperationHandler(EventHandler):
def _askStoreObject(self, conn, oid, serial, compression, checksum, data, def _askStoreObject(self, conn, oid, serial, compression, checksum, data,
data_serial, ttid, request_time): data_serial, ttid, request_time):
try: try:
locked = self.app.tm.storeObject(ttid, serial, oid, compression, self.app.tm.storeObject(ttid, serial, oid, compression,
checksum, data, data_serial) checksum, data, data_serial)
except ConflictError, err: except ConflictError, err:
# resolvable or not # resolvable or not
...@@ -90,7 +90,7 @@ class ClientOperationHandler(EventHandler): ...@@ -90,7 +90,7 @@ class ClientOperationHandler(EventHandler):
duration = time.time() - request_time duration = time.time() - request_time
if duration > SLOW_STORE: if duration > SLOW_STORE:
logging.info('StoreObject delay: %.02fs', duration) logging.info('StoreObject delay: %.02fs', duration)
conn.answer(Packets.AnswerStoreObject(locked)) conn.answer(Packets.AnswerStoreObject(None))
def askStoreObject(self, conn, oid, serial, def askStoreObject(self, conn, oid, serial,
compression, checksum, data, data_serial, ttid): compression, checksum, data, data_serial, ttid):
...@@ -168,7 +168,7 @@ class ClientOperationHandler(EventHandler): ...@@ -168,7 +168,7 @@ class ClientOperationHandler(EventHandler):
def _askCheckCurrentSerial(self, conn, ttid, serial, oid, request_time): def _askCheckCurrentSerial(self, conn, ttid, serial, oid, request_time):
try: try:
locked = self.app.tm.checkCurrentSerial(ttid, serial, oid) self.app.tm.checkCurrentSerial(ttid, serial, oid)
except ConflictError, err: except ConflictError, err:
# resolvable or not # resolvable or not
conn.answer(Packets.AnswerCheckCurrentSerial(err.tid)) conn.answer(Packets.AnswerCheckCurrentSerial(err.tid))
...@@ -188,7 +188,7 @@ class ClientOperationHandler(EventHandler): ...@@ -188,7 +188,7 @@ class ClientOperationHandler(EventHandler):
duration = time.time() - request_time duration = time.time() - request_time
if duration > SLOW_STORE: if duration > SLOW_STORE:
logging.info('CheckCurrentSerial delay: %.02fs', duration) logging.info('CheckCurrentSerial delay: %.02fs', duration)
conn.answer(Packets.AnswerCheckCurrentSerial(locked)) conn.answer(Packets.AnswerCheckCurrentSerial(None))
# like ClientOperationHandler but read-only & only for tid <= backup_tid # like ClientOperationHandler but read-only & only for tid <= backup_tid
......
...@@ -266,37 +266,38 @@ class TransactionManager(EventQueue): ...@@ -266,37 +266,38 @@ class TransactionManager(EventQueue):
# undo target. # undo target.
previous_serial = transaction.store_dict[oid][2] previous_serial = transaction.store_dict[oid][2]
if previous_serial is None: if previous_serial is None:
# XXX: use some special serial when previous store was not # The only valid case is when the previous undo resulted in a
# an undo ? Maybe it should just not happen. # resolved conflict.
# Otherwise, this should not happen. For example, when being
# disconnected by the master because we missed a transaction,
# a conflict may happen after a first store to us, but the
# resolution waits for invalidations from the master (to then
# load the saved data), which are sent after the notification
# we are down, and the client would stop writing to us.
logging.info('Transaction %s storing %s more than once', logging.info('Transaction %s storing %s more than once',
dump(ttid), dump(oid)) dump(ttid), dump(oid))
return
else: else:
previous_serial = None
# XXX: Consider locking before reporting a conflict:
# - That would speed up the case of cascading conflict resolution
# by avoiding incremental resolution, assuming that the time to
# resolve a conflict is often constant: "C+A vs. B -> C+A+B"
# rarely costs more than "C+A vs. C+B -> C+A+B".
# - That would slow down of cascading unresolvable conflicts but
# if that happens, the application should be reviewed.
if previous_serial is None:
previous_serial = self._app.dm.getLastObjectTID(oid) previous_serial = self._app.dm.getLastObjectTID(oid)
# Locking before reporting a conflict would speed up the case of
# cascading conflict resolution by avoiding incremental resolution,
# assuming that the time to resolve a conflict is often constant:
# "C+A vs. B -> C+A+B" rarely costs more than "C+A vs. C+B -> C+A+B".
# However, this would be against the optimistic principle of ZODB.
if previous_serial is not None and previous_serial != serial: if previous_serial is not None and previous_serial != serial:
logging.info('Resolvable conflict on %r:%r', logging.info('Resolvable conflict on %r:%r',
dump(oid), dump(ttid)) dump(oid), dump(ttid))
raise ConflictError(previous_serial) raise ConflictError(previous_serial)
logging.debug('Transaction %s storing %s', dump(ttid), dump(oid)) logging.debug('Transaction %s storing %s', dump(ttid), dump(oid))
self._store_lock_dict[oid] = ttid self._store_lock_dict[oid] = ttid
return serial
def checkCurrentSerial(self, ttid, serial, oid): def checkCurrentSerial(self, ttid, serial, oid):
try: try:
transaction = self._transaction_dict[ttid] transaction = self._transaction_dict[ttid]
except KeyError: except KeyError:
raise NotRegisteredError raise NotRegisteredError
locked = self.lockObject(ttid, serial, oid) self.lockObject(ttid, serial, oid)
transaction.check(oid) transaction.check(oid)
return locked
def storeObject(self, ttid, serial, oid, compression, checksum, data, def storeObject(self, ttid, serial, oid, compression, checksum, data,
value_serial): value_serial):
...@@ -307,14 +308,13 @@ class TransactionManager(EventQueue): ...@@ -307,14 +308,13 @@ class TransactionManager(EventQueue):
transaction = self._transaction_dict[ttid] transaction = self._transaction_dict[ttid]
except KeyError: except KeyError:
raise NotRegisteredError raise NotRegisteredError
locked = self.lockObject(ttid, serial, oid) self.lockObject(ttid, serial, oid)
# store object # store object
if data is None: if data is None:
data_id = None data_id = None
else: else:
data_id = self._app.dm.holdData(checksum, data, compression) data_id = self._app.dm.holdData(checksum, data, compression)
transaction.store(oid, data_id, value_serial) transaction.store(oid, data_id, value_serial)
return locked
def abort(self, ttid, even_if_locked=False): def abort(self, ttid, even_if_locked=False):
""" """
......
...@@ -541,7 +541,8 @@ class ConnectionFilter(object): ...@@ -541,7 +541,8 @@ class ConnectionFilter(object):
def remove(self, *filters): def remove(self, *filters):
with self.lock: with self.lock:
for filter in filters: for filter in filters:
del self.filter_dict[filter] for p in self.filter_dict.pop(filter):
p.revert()
self._retry() self._retry()
def discard(self, *filters): def discard(self, *filters):
...@@ -997,6 +998,11 @@ class NEOThreadedTest(NeoTestBase): ...@@ -997,6 +998,11 @@ class NEOThreadedTest(NeoTestBase):
self.assertEqual(stats, '|'.join(pt._formatRows(sorted( self.assertEqual(stats, '|'.join(pt._formatRows(sorted(
pt.count_dict, key=lambda x: index(x.getUUID()))))) pt.count_dict, key=lambda x: index(x.getUUID())))))
@staticmethod
def noConnection(jar, storage):
return Patch(jar.db().storage.app.cp, getConnForNode=lambda orig, node:
None if node.getUUID() == storage.uuid else orig(node))
def predictable_random(seed=None): def predictable_random(seed=None):
# Because we have 2 running threads when client works, we can't # Because we have 2 running threads when client works, we can't
......
...@@ -30,13 +30,15 @@ from neo.storage.transactions import TransactionManager, \ ...@@ -30,13 +30,15 @@ from neo.storage.transactions import TransactionManager, \
DelayedError, ConflictError DelayedError, ConflictError
from neo.lib.connection import ServerConnection, MTClientConnection from neo.lib.connection import ServerConnection, MTClientConnection
from neo.lib.exception import DatabaseFailure, StoppedOperation from neo.lib.exception import DatabaseFailure, StoppedOperation
from neo.lib import logging
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, Packets, \ from neo.lib.protocol import CellStates, ClusterStates, NodeStates, Packets, \
ZERO_OID, ZERO_TID uuid_str, ZERO_OID, ZERO_TID
from .. import expectedFailure, Patch, TransactionalResource from .. import expectedFailure, Patch, TransactionalResource
from . import ConnectionFilter, LockLock, NEOThreadedTest, with_cluster from . import ConnectionFilter, LockLock, NEOThreadedTest, with_cluster
from neo.lib.util import add64, makeChecksum, p64, u64 from neo.lib.util import add64, makeChecksum, p64, u64
from neo.client.exception import NEOPrimaryMasterLost, NEOStorageError from neo.client.exception import NEOPrimaryMasterLost, NEOStorageError
from neo.client.pool import CELL_CONNECTED, CELL_GOOD from neo.client.pool import CELL_CONNECTED, CELL_GOOD
from neo.client.transactions import Transaction
from neo.master.handlers.client import ClientServiceHandler from neo.master.handlers.client import ClientServiceHandler
from neo.storage.handlers.client import ClientOperationHandler from neo.storage.handlers.client import ClientOperationHandler
from neo.storage.handlers.identification import IdentificationHandler from neo.storage.handlers.identification import IdentificationHandler
...@@ -1352,7 +1354,7 @@ class Test(NEOThreadedTest): ...@@ -1352,7 +1354,7 @@ class Test(NEOThreadedTest):
another node. another node.
""" """
def answerStoreObject(orig, conn, conflict, oid, serial): def answerStoreObject(orig, conn, conflict, oid, serial):
if conflict == serial: if not conflict:
p.revert() p.revert()
ll() ll()
orig(conn, conflict, oid, serial) orig(conn, conflict, oid, serial)
...@@ -1480,10 +1482,6 @@ class Test(NEOThreadedTest): ...@@ -1480,10 +1482,6 @@ class Test(NEOThreadedTest):
""" """
def delayAbort(conn, packet): def delayAbort(conn, packet):
return isinstance(packet, Packets.AbortTransaction) return isinstance(packet, Packets.AbortTransaction)
def noConnection(jar, storage):
return Patch(jar.db().storage.app.cp,
getConnForNode=lambda orig, node:
None if node.getUUID() == storage.uuid else orig(node))
def c1_vote(txn): def c1_vote(txn):
def vote(orig, *args): def vote(orig, *args):
result = orig(*args) result = orig(*args)
...@@ -1504,7 +1502,7 @@ class Test(NEOThreadedTest): ...@@ -1504,7 +1502,7 @@ class Test(NEOThreadedTest):
t1, c1 = cluster.getTransaction() t1, c1 = cluster.getTransaction()
with cluster.newClient(1) as db: with cluster.newClient(1) as db:
t2, c2 = cluster.getTransaction(db) t2, c2 = cluster.getTransaction(db)
with noConnection(c1, s2), noConnection(c2, s1): with self.noConnection(c1, s2), self.noConnection(c2, s1):
cluster.client.cp.connection_dict[s2.uuid].close() cluster.client.cp.connection_dict[s2.uuid].close()
self.tic() self.tic()
for c1_aborts in 0, 1: for c1_aborts in 0, 1:
...@@ -1555,5 +1553,59 @@ class Test(NEOThreadedTest): ...@@ -1555,5 +1553,59 @@ class Test(NEOThreadedTest):
self.tic() self.tic()
self.assertPartitionTable(cluster, pt) self.assertPartitionTable(cluster, pt)
@with_cluster(replicas=1)
def testPartialConflict(self, cluster):
"""
This scenario proves that the client must keep the data of a modified
oid until it is successfully stored to all storages. Indeed, if a
concurrent transaction fails to commit to all storage nodes, we must
handle inconsistent results from replicas.
C1 S1 S2 C2
no connection between S1 and C2
store ---> locked <------ commit
`--------------> conflict
"""
def begin1(*_):
t2.commit()
f.add(delayAnswerStoreObject, Patch(Transaction, written=written))
def delayAnswerStoreObject(conn, packet):
return (isinstance(packet, Packets.AnswerStoreObject)
and getattr(conn.getHandler(), 'app', None) is s)
def written(orig, *args):
orig(*args)
f.remove(delayAnswerStoreObject)
def sync(orig):
mc1.remove(delayMaster)
orig()
s1 = cluster.storage_list[0]
t1, c1 = cluster.getTransaction()
c1.root()['x'] = x = PCounterWithResolution()
t1.commit()
with cluster.newClient(1) as db:
t2, c2 = cluster.getTransaction(db)
with self.noConnection(c2, s1):
for s in cluster.storage_list:
logging.info("late answer from %s", uuid_str(s.uuid))
x.value += 1
c2.root()['x'].value += 2
TransactionalResource(t1, 1, tpc_begin=begin1)
s1m, = s1.getConnectionList(cluster.master)
try:
s1.em.removeReader(s1m)
with ConnectionFilter() as f, \
cluster.master.filterConnection(
cluster.client) as mc1:
f.delayAskFetchTransactions()
delayMaster = mc1.delayNotifyNodeInformation(
Patch(cluster.client, sync=sync))
t1.commit()
self.assertPartitionTable(cluster, 'OU')
finally:
s1.em.addReader(s1m)
self.tic()
self.assertPartitionTable(cluster, 'UU')
self.assertEqual(x.value, 6)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment