Commit dd74d662 authored by Julien Muchembled's avatar Julien Muchembled

Recover from failures during tpc_finish when the transaction got successfully committed

parent 7ee7ff4e
...@@ -92,9 +92,6 @@ ...@@ -92,9 +92,6 @@
be split in chunks and processed in "background" on storage nodes. be split in chunks and processed in "background" on storage nodes.
Packing throttling should probably be at the lowest possible priority Packing throttling should probably be at the lowest possible priority
(below interactive use and below replication). (below interactive use and below replication).
- tpc_finish failures propagation to master (FUNCTIONALITY)
When asked to lock transaction data, if something goes wrong the master
node must be informed.
- Verify data checksum on reception (FUNCTIONALITY) - Verify data checksum on reception (FUNCTIONALITY)
In current implementation, client generates a checksum before storing, In current implementation, client generates a checksum before storing,
which is only checked upon load. This doesn't prevent from storing which is only checked upon load. This doesn't prevent from storing
...@@ -122,9 +119,6 @@ ...@@ -122,9 +119,6 @@
and truncate the DB. and truncate the DB.
- Optimize operational status check by recording which rows are ready - Optimize operational status check by recording which rows are ready
instead of parsing the whole partition table. (SPEED) instead of parsing the whole partition table. (SPEED)
- tpc_finish failures propagation to client (FUNCTIONALITY)
When a storage node notifies a problem during lock/unlock phase, an error
must be propagated to client.
Client Client
- Merge Application into Storage (SPEED) - Merge Application into Storage (SPEED)
......
...@@ -29,7 +29,7 @@ from persistent.TimeStamp import TimeStamp ...@@ -29,7 +29,7 @@ from persistent.TimeStamp import TimeStamp
from neo.lib import logging from neo.lib import logging
from neo.lib.protocol import NodeTypes, Packets, \ from neo.lib.protocol import NodeTypes, Packets, \
INVALID_PARTITION, ZERO_HASH, ZERO_TID INVALID_PARTITION, MAX_TID, ZERO_HASH, ZERO_TID
from neo.lib.event import EventManager from neo.lib.event import EventManager
from neo.lib.util import makeChecksum, dump from neo.lib.util import makeChecksum, dump
from neo.lib.locking import Empty, Lock, SimpleQueue from neo.lib.locking import Empty, Lock, SimpleQueue
...@@ -690,16 +690,6 @@ class Application(ThreadedApplication): ...@@ -690,16 +690,6 @@ class Application(ThreadedApplication):
a final place so that the new transaction is readable, but this is a final place so that the new transaction is readable, but this is
something that can always be replayed (during the verification phase) something that can always be replayed (during the verification phase)
if any failure happens. if any failure happens.
TODO: We should recover from master failures when the transaction got
successfully committed. More precisely, we should not raise:
- if any failure happens after all storage nodes have processed
successfully the LockInformation packets from the master;
- and if we can reconnect to the cluster to check that the ttid
got successfuly committed, which is possible because storage
nodes remember the ttid of all transactions.
See neo.threaded.test.Test.testStorageFailureDuringTpcFinish
This bug exists in ZEO.
""" """
txn_container = self._txn_container txn_container = self._txn_container
if 'voted' not in txn_container.get(transaction): if 'voted' not in txn_container.get(transaction):
...@@ -714,14 +704,43 @@ class Application(ThreadedApplication): ...@@ -714,14 +704,43 @@ class Application(ThreadedApplication):
if data is CHECKED_SERIAL] if data is CHECKED_SERIAL]
for oid in checked_list: for oid in checked_list:
del cache_dict[oid] del cache_dict[oid]
tid = self._askPrimary(Packets.AskFinishTransaction( ttid = txn_context['ttid']
txn_context['ttid'], cache_dict, checked_list), p = Packets.AskFinishTransaction(ttid, cache_dict, checked_list)
cache_dict=cache_dict, callback=f) try:
assert tid tid = self._askPrimary(p, cache_dict=cache_dict, callback=f)
assert tid
except ConnectionClosed:
tid = self._getFinalTID(ttid)
if not tid:
raise
return tid return tid
finally: finally:
self._load_lock_release() self._load_lock_release()
def _getFinalTID(self, ttid):
try:
p = Packets.AskFinalTID(ttid)
while 1:
try:
tid = self._askPrimary(p)
break
except ConnectionClosed:
pass
if tid == MAX_TID:
while 1:
for _, conn in self.cp.iterateForObject(
ttid, readable=True):
try:
return self._askStorage(conn, p)
except ConnectionClosed:
pass
self._getMasterConnection()
elif tid:
return tid
except Exception:
logging.exception("Failed to get final tid for TXN %s",
dump(ttid))
def undo(self, undone_tid, txn, tryToResolveConflict): def undo(self, undone_tid, txn, tryToResolveConflict):
txn_context = self._txn_container.get(txn) txn_context = self._txn_container.get(txn)
txn_info, txn_ext = self._getTransactionInformation(undone_tid) txn_info, txn_ext = self._getTransactionInformation(undone_tid)
......
...@@ -202,3 +202,6 @@ class PrimaryAnswersHandler(AnswerBaseHandler): ...@@ -202,3 +202,6 @@ class PrimaryAnswersHandler(AnswerBaseHandler):
def answerLastTransaction(self, conn, ltid): def answerLastTransaction(self, conn, ltid):
pass pass
def answerFinalTID(self, conn, tid):
self.app.setHandlerData(tid)
...@@ -156,6 +156,9 @@ class StorageAnswersHandler(AnswerBaseHandler): ...@@ -156,6 +156,9 @@ class StorageAnswersHandler(AnswerBaseHandler):
undo_object_tid_dict): undo_object_tid_dict):
undo_object_tid_dict.update(object_tid_dict) undo_object_tid_dict.update(object_tid_dict)
def answerFinalTID(self, conn, tid):
self.app.setHandlerData(tid)
def answerHasLock(self, conn, oid, status): def answerHasLock(self, conn, oid, status):
store_msg_id = self.app.getHandlerData()['timeout_dict'].pop(oid) store_msg_id = self.app.getHandlerData()['timeout_dict'].pop(oid)
if status == LockState.GRANTED_TO_OTHER: if status == LockState.GRANTED_TO_OTHER:
......
...@@ -20,7 +20,7 @@ import traceback ...@@ -20,7 +20,7 @@ import traceback
from cStringIO import StringIO from cStringIO import StringIO
from struct import Struct from struct import Struct
PROTOCOL_VERSION = 5 PROTOCOL_VERSION = 6
# Size restrictions. # Size restrictions.
MIN_PACKET_SIZE = 10 MIN_PACKET_SIZE = 10
...@@ -820,7 +820,7 @@ class LockedTransactions(Packet): ...@@ -820,7 +820,7 @@ class LockedTransactions(Packet):
class FinalTID(Packet): class FinalTID(Packet):
""" """
Return final tid if ttid has been committed. * -> S. Return final tid if ttid has been committed. * -> S. C -> PM.
""" """
_fmt = PStruct('final_tid', _fmt = PStruct('final_tid',
PTID('ttid'), PTID('ttid'),
......
...@@ -519,11 +519,11 @@ class Application(BaseApplication): ...@@ -519,11 +519,11 @@ class Application(BaseApplication):
tid = txn.getTID() tid = txn.getTID()
transaction_node = txn.getNode() transaction_node = txn.getNode()
invalidate_objects = Packets.InvalidateObjects(tid, txn.getOIDList()) invalidate_objects = Packets.InvalidateObjects(tid, txn.getOIDList())
transaction_finished = Packets.AnswerTransactionFinished(ttid, tid)
for client_node in self.nm.getClientList(only_identified=True): for client_node in self.nm.getClientList(only_identified=True):
c = client_node.getConnection() c = client_node.getConnection()
if client_node is transaction_node: if client_node is transaction_node:
c.answer(transaction_finished, msg_id=txn.getMessageId()) c.answer(Packets.AnswerTransactionFinished(ttid, tid),
msg_id=txn.getMessageId())
else: else:
c.notify(invalidate_objects) c.notify(invalidate_objects)
...@@ -533,12 +533,16 @@ class Application(BaseApplication): ...@@ -533,12 +533,16 @@ class Application(BaseApplication):
for storage_uuid in txn.getUUIDList(): for storage_uuid in txn.getUUIDList():
getByUUID(storage_uuid).getConnection().notify(notify_unlock) getByUUID(storage_uuid).getConnection().notify(notify_unlock)
# Notify storage that have replications blocked by this transaction # Notify storage that have replications blocked by this transaction,
# and clients that try to recover from a failure during tpc_finish.
notify_finished = Packets.NotifyTransactionFinished(ttid, tid) notify_finished = Packets.NotifyTransactionFinished(ttid, tid)
for storage_uuid in txn.getNotificationUUIDList(): for uuid in txn.getNotificationUUIDList():
node = getByUUID(storage_uuid) node = getByUUID(uuid)
if node is not None and node.isConnected(): if node.isClient():
node.getConnection().notify(notify_finished) # There should be only 1 client interested.
node.answer(Packets.AnswerFinalTID(tid))
else:
node.notify(notify_finished)
assert self.last_transaction < tid, (self.last_transaction, tid) assert self.last_transaction < tid, (self.last_transaction, tid)
self.setLastTransaction(tid) self.setLastTransaction(tid)
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib.protocol import NodeStates, Packets, ProtocolError from neo.lib.protocol import NodeStates, Packets, ProtocolError, MAX_TID
from . import MasterHandler from . import MasterHandler
class ClientServiceHandler(MasterHandler): class ClientServiceHandler(MasterHandler):
...@@ -87,6 +87,21 @@ class ClientServiceHandler(MasterHandler): ...@@ -87,6 +87,21 @@ class ClientServiceHandler(MasterHandler):
for node in identified_node_list: for node in identified_node_list:
node.ask(p, timeout=60) node.ask(p, timeout=60)
def askFinalTID(self, conn, ttid):
tm = self.app.tm
if tm.getLastTID() < ttid:
# Invalid ttid, or aborted transaction.
tid = None
elif ttid in tm:
# Transaction is being finished.
# We'll answer when it is unlocked.
tm[ttid].registerForNotification(conn.getUUID())
return
else:
# Transaction committed ? Tell client to ask storages.
tid = MAX_TID
conn.answer(Packets.AnswerFinalTID(tid))
def askPack(self, conn, tid): def askPack(self, conn, tid):
app = self.app app = self.app
if app.packing is None: if app.packing is None:
...@@ -100,5 +115,6 @@ class ClientServiceHandler(MasterHandler): ...@@ -100,5 +115,6 @@ class ClientServiceHandler(MasterHandler):
conn.answer(Packets.AnswerPack(False)) conn.answer(Packets.AnswerPack(False))
def abortTransaction(self, conn, tid): def abortTransaction(self, conn, tid):
# BUG: The replicator may wait this transaction to be finished.
self.app.tm.abort(tid, conn.getUUID()) self.app.tm.abort(tid, conn.getUUID())
...@@ -102,13 +102,13 @@ class Transaction(object): ...@@ -102,13 +102,13 @@ class Transaction(object):
def registerForNotification(self, uuid): def registerForNotification(self, uuid):
""" """
Register a storage node that requires a notification at commit Register a node that requires a notification at commit
""" """
self._notification_set.add(uuid) self._notification_set.add(uuid)
def getNotificationUUIDList(self): def getNotificationUUIDList(self):
""" """
Returns the list of storage waiting for the transaction to be Returns the list of nodes waiting for the transaction to be
finished finished
""" """
return list(self._notification_set) return list(self._notification_set)
...@@ -128,6 +128,7 @@ class Transaction(object): ...@@ -128,6 +128,7 @@ class Transaction(object):
for it. for it.
Does nothing if the node was not part of the transaction. Does nothing if the node was not part of the transaction.
""" """
self._notification_set.discard(uuid)
# XXX: We might lose information that a storage successfully locked # XXX: We might lose information that a storage successfully locked
# data but was later found to be disconnected. This loss has no impact # data but was later found to be disconnected. This loss has no impact
# on current code, but it might be disturbing to reader or future code. # on current code, but it might be disturbing to reader or future code.
...@@ -143,6 +144,8 @@ class Transaction(object): ...@@ -143,6 +144,8 @@ class Transaction(object):
self._node = None # orphan self._node = None # orphan
else: else:
return True # abort return True # abort
else:
self._notification_set.discard(uuid)
return False return False
def lock(self, uuid): def lock(self, uuid):
......
...@@ -145,6 +145,9 @@ class ClientOperationHandler(EventHandler): ...@@ -145,6 +145,9 @@ class ClientOperationHandler(EventHandler):
tid_list = app.dm.getTIDList(first, last - first, partition_list) tid_list = app.dm.getTIDList(first, last - first, partition_list)
conn.answer(Packets.AnswerTIDs(tid_list)) conn.answer(Packets.AnswerTIDs(tid_list))
def askFinalTID(self, conn, ttid):
conn.answer(Packets.AnswerFinalTID(self.app.tm.getFinalTID(ttid)))
def askObjectUndoSerial(self, conn, ttid, ltid, undone_tid, oid_list): def askObjectUndoSerial(self, conn, ttid, ltid, undone_tid, oid_list):
app = self.app app = self.app
findUndoTID = app.dm.findUndoTID findUndoTID = app.dm.findUndoTID
......
...@@ -207,6 +207,12 @@ class TransactionManager(object): ...@@ -207,6 +207,12 @@ class TransactionManager(object):
self._app.em.setTimeout(time() + 1, dm.deferCommit()) self._app.em.setTimeout(time() + 1, dm.deferCommit())
self.abort(ttid, even_if_locked=True) self.abort(ttid, even_if_locked=True)
def getFinalTID(self, ttid):
try:
return self._transaction_dict[ttid].getTID()
except KeyError:
return self._app.dm.getFinalTID(ttid)
def getLockingTID(self, oid): def getLockingTID(self, oid):
return self._store_lock_dict.get(oid) return self._store_lock_dict.get(oid)
......
...@@ -32,7 +32,7 @@ import neo.client.app, neo.neoctl.app ...@@ -32,7 +32,7 @@ import neo.client.app, neo.neoctl.app
from neo.client import Storage from neo.client import Storage
from neo.lib import logging from neo.lib import logging
from neo.lib.connection import BaseConnection, \ from neo.lib.connection import BaseConnection, \
ClientConnection, Connection, ListeningConnection ClientConnection, Connection, ConnectionClosed, ListeningConnection
from neo.lib.connector import SocketConnector, ConnectorException from neo.lib.connector import SocketConnector, ConnectorException
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.locking import SimpleQueue from neo.lib.locking import SimpleQueue
...@@ -910,6 +910,10 @@ class NEOThreadedTest(NeoTestBase): ...@@ -910,6 +910,10 @@ class NEOThreadedTest(NeoTestBase):
del self.__exc_info del self.__exc_info
raise etype, value, tb raise etype, value, tb
def commitWithStorageFailure(self, client, txn):
with Patch(client, _getFinalTID=lambda *_: None):
self.assertRaises(ConnectionClosed, txn.commit)
def predictable_random(seed=None): def predictable_random(seed=None):
# Because we have 2 running threads when client works, we can't # Because we have 2 running threads when client works, we can't
......
...@@ -25,15 +25,16 @@ from persistent import Persistent, GHOST ...@@ -25,15 +25,16 @@ from persistent import Persistent, GHOST
from ZODB import DB, POSException from ZODB import DB, POSException
from neo.storage.transactions import TransactionManager, \ from neo.storage.transactions import TransactionManager, \
DelayedError, ConflictError DelayedError, ConflictError
from neo.lib.connection import ConnectionClosed, MTClientConnection from neo.lib.connection import MTClientConnection
from neo.lib.exception import DatabaseFailure, StoppedOperation from neo.lib.exception import DatabaseFailure, StoppedOperation
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, Packets, \ from neo.lib.protocol import CellStates, ClusterStates, NodeStates, Packets, \
ZERO_TID ZERO_TID
from .. import expectedFailure, _ExpectedFailure, _UnexpectedSuccess, Patch from .. import expectedFailure, Patch
from . import LockLock, NEOCluster, NEOThreadedTest from . import LockLock, NEOCluster, NEOThreadedTest
from neo.lib.util import add64, makeChecksum, p64, u64 from neo.lib.util import add64, makeChecksum, p64, u64
from neo.client.exception import NEOStorageError from neo.client.exception import NEOStorageError
from neo.client.pool import CELL_CONNECTED, CELL_GOOD from neo.client.pool import CELL_CONNECTED, CELL_GOOD
from neo.master.handlers.client import ClientServiceHandler
from neo.storage.handlers.initialization import InitializationHandler from neo.storage.handlers.initialization import InitializationHandler
class PCounter(Persistent): class PCounter(Persistent):
...@@ -498,7 +499,7 @@ class Test(NEOThreadedTest): ...@@ -498,7 +499,7 @@ class Test(NEOThreadedTest):
r[0] = PCounter() r[0] = PCounter()
tids = [r._p_serial] tids = [r._p_serial]
with onLockTransaction(s0), onLockTransaction(s1): with onLockTransaction(s0), onLockTransaction(s1):
self.assertRaises(ConnectionClosed, t.commit) t.commit()
self.assertEqual(r._p_state, GHOST) self.assertEqual(r._p_state, GHOST)
self.tic() self.tic()
t.begin() t.begin()
...@@ -510,7 +511,7 @@ class Test(NEOThreadedTest): ...@@ -510,7 +511,7 @@ class Test(NEOThreadedTest):
c.readCurrent(x) c.readCurrent(x)
with cluster.moduloTID(1): with cluster.moduloTID(1):
with onLockTransaction(s0), onLockTransaction(s1): with onLockTransaction(s0), onLockTransaction(s1):
self.assertRaises(ConnectionClosed, t.commit) t.commit()
self.tic() self.tic()
t.begin() t.begin()
# The following line checks that s1 moved the transaction # The following line checks that s1 moved the transaction
...@@ -531,7 +532,7 @@ class Test(NEOThreadedTest): ...@@ -531,7 +532,7 @@ class Test(NEOThreadedTest):
y.value = 2 y.value = 2
di0 = s0.getDataLockInfo() di0 = s0.getDataLockInfo()
with onLockTransaction(s1, die=True): with onLockTransaction(s1, die=True):
self.assertRaises(ConnectionClosed, t.commit) self.commitWithStorageFailure(cluster.client, t)
finally: finally:
cluster.stop() cluster.stop()
cluster.reset() cluster.reset()
...@@ -571,7 +572,7 @@ class Test(NEOThreadedTest): ...@@ -571,7 +572,7 @@ class Test(NEOThreadedTest):
c.root()[0] = None c.root()[0] = None
s0, s1 = cluster.storage_list s0, s1 = cluster.storage_list
with onLockTransaction(s0, False), onLockTransaction(s1, True): with onLockTransaction(s0, False), onLockTransaction(s1, True):
self.assertRaises(ConnectionClosed, t.commit) self.commitWithStorageFailure(cluster.client, t)
s0.resetNode() s0.resetNode()
s0.start() s0.start()
t.begin() t.begin()
...@@ -635,7 +636,7 @@ class Test(NEOThreadedTest): ...@@ -635,7 +636,7 @@ class Test(NEOThreadedTest):
storage.dm.setConfiguration("version", None) storage.dm.setConfiguration("version", None)
c.root()._p_changed = 1 c.root()._p_changed = 1
with Patch(storage.tm, lock=lambda *_: sys.exit()): with Patch(storage.tm, lock=lambda *_: sys.exit()):
self.assertRaises(ConnectionClosed, t.commit) self.commitWithStorageFailure(cluster.client, t)
self.assertRaises(DatabaseFailure, storage.resetNode) self.assertRaises(DatabaseFailure, storage.resetNode)
finally: finally:
cluster.stop() cluster.stop()
...@@ -1008,23 +1009,64 @@ class Test(NEOThreadedTest): ...@@ -1008,23 +1009,64 @@ class Test(NEOThreadedTest):
c.root()['x'] = PCounter() c.root()['x'] = PCounter()
with cluster.master.filterConnection(cluster.client) as m2c: with cluster.master.filterConnection(cluster.client) as m2c:
m2c.add(answerTransactionFinished) m2c.add(answerTransactionFinished)
# XXX: This is an expected failure. A ttid column was added to # After a storage failure during tpc_finish, the client
# 'trans' table to permit recovery, by checking that the # reconnects and checks that the transaction was really
# transaction was really committed. # committed.
try: t.commit()
t.commit()
raise _UnexpectedSuccess
except ConnectionClosed, e:
e = type(e), None, None
# Also check that the master reset the last oid to a correct value. # Also check that the master reset the last oid to a correct value.
self.assertTrue(cluster.client.new_oid_list)
t.begin() t.begin()
self.assertEqual(1, u64(c.root()['x']._p_oid)) self.assertEqual(1, u64(c.root()['x']._p_oid))
self.assertFalse(cluster.client.new_oid_list) self.assertFalse(cluster.client.new_oid_list)
self.assertEqual(2, u64(cluster.client.new_oid())) self.assertEqual(2, u64(cluster.client.new_oid()))
finally: finally:
cluster.stop() cluster.stop()
raise _ExpectedFailure(e)
def testClientFailureDuringTpcFinish(self):
def delayAskLockInformation(conn, packet):
if isinstance(packet, Packets.AskLockInformation):
cluster.client.master_conn.close()
return True
def askFinalTID(orig, *args):
m2s.remove(delayAskLockInformation)
orig(*args)
def _getFinalTID(orig, ttid):
m2s.remove(delayAskLockInformation)
self.tic()
return orig(ttid)
cluster = NEOCluster()
try:
cluster.start()
t, c = cluster.getTransaction()
r = c.root()
r['x'] = PCounter()
tid0 = r._p_serial
with cluster.master.filterConnection(cluster.storage) as m2s:
m2s.add(delayAskLockInformation,
Patch(ClientServiceHandler, askFinalTID=askFinalTID))
t.commit() # the final TID is returned by the master
t.begin()
r['x'].value += 1
tid1 = r._p_serial
self.assertTrue(tid0 < tid1)
with cluster.master.filterConnection(cluster.storage) as m2s:
m2s.add(delayAskLockInformation,
Patch(cluster.client, _getFinalTID=_getFinalTID))
t.commit() # the final TID is returned by the storage backend
t.begin()
r['x'].value += 1
tid2 = r['x']._p_serial
self.assertTrue(tid1 < tid2)
with cluster.master.filterConnection(cluster.storage) as m2s:
m2s.add(delayAskLockInformation,
Patch(cluster.client, _getFinalTID=_getFinalTID))
m2s.add(lambda conn, packet:
isinstance(packet, Packets.NotifyUnlockInformation))
t.commit() # the final TID is returned by the storage (tm)
t.begin()
self.assertEqual(r['x'].value, 2)
self.assertTrue(tid2 < r['x']._p_serial)
finally:
cluster.stop()
def testEmptyTransaction(self): def testEmptyTransaction(self):
cluster = NEOCluster() cluster = NEOCluster()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment