Commit 661c955c authored by Vincent Pelletier's avatar Vincent Pelletier

Solve store deadlocks.

Store deadlocks might happen when the order in which storage nodes receive
object data (and hence, grant write lock for those objects) is not
consistent among clients. So when a store timeouts, it might be because
another transaction got the store lock on this storage and object, while
it might wait for locks we got on other storage nodes or objects.
In doubt, report a conflict to abort current transaction in hope for the
other transaction to succeed.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2109 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent fc514154
......@@ -34,7 +34,7 @@ from neo.protocol import NodeTypes, Packets, INVALID_PARTITION
from neo.event import EventManager
from neo.util import makeChecksum as real_makeChecksum, dump
from neo.locking import Lock
from neo.connection import MTClientConnection
from neo.connection import MTClientConnection, OnTimeout
from neo.node import NodeManager
from neo.connector import getConnectorHandler
from neo.client.exception import NEOStorageError
......@@ -581,6 +581,7 @@ class Application(object):
checksum = makeChecksum(compressed_data)
p = Packets.AskStoreObject(oid, serial, compression,
checksum, compressed_data, self.local_var.tid)
on_timeout = OnTimeout(self.onStoreTimeout, oid)
# Store object in tmp cache
self.local_var.data_dict[oid] = data
# Store data on each node
......@@ -592,13 +593,21 @@ class Application(object):
if conn is None:
continue
try:
conn.ask(p)
conn.ask(p, on_timeout=on_timeout)
except ConnectionClosed:
continue
self._waitAnyMessage(False)
return None
def onStoreTimeout(self, conn, msg_id, oid):
# Ask the storage if someone locks the object.
# Shorten timeout to react earlier to an unresponding storage.
conn.ask(Packets.AskHasLock(oid), timeout=5)
# Stop expecting the timed-out store request.
self.dispatcher.forget(conn, msg_id)
return True
@profiler_decorator
def _handleConflicts(self, tryToResolveConflict):
result = []
......
......@@ -16,10 +16,11 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from ZODB.TimeStamp import TimeStamp
from ZODB.POSException import ConflictError
from neo import logging
from neo.client.handlers import BaseHandler, AnswerBaseHandler
from neo.protocol import NodeTypes, ProtocolError
from neo.protocol import NodeTypes, ProtocolError, LockState
from neo.util import dump
from neo.client.exception import NEOStorageError
......@@ -132,3 +133,20 @@ class StorageAnswersHandler(AnswerBaseHandler):
for oid in oid_list:
data_dict[oid] = ''
def answerHasLock(self, conn, oid, status):
if status == LockState.GRANTED_TO_OTHER:
# Object is locked by another transaction, and we have waited until
# timeout. To avoid a deadlock, abort current transaction (we might
# be locking objects the other transaction is waiting for).
raise ConflictError, 'Lock wait timeout for oid %s on %r' % (
dump(oid), conn.getNode())
elif status == LockState.GRANTED:
logging.info('Store of oid %s was successful, but after timeout.',
dump(oid))
# XXX: Not sure what to do in this case yet, for now do nothing.
else:
# Nobody has the lock, although we asked storage to lock. This
# means there is a software bug somewhere.
# XXX: Not sure what to do in this case yet
raise NotImplementedError
......@@ -332,6 +332,12 @@ class EventHandler(object):
def answerUndoTransaction(self, conn, oid_list, error_oid_list, conflict_oid_list):
raise UnexpectedPacketError
def askHasLock(self, tid, oid):
raise UnexpectedPacketError
def answerHasLock(self, oid, status):
raise UnexpectedPacketError
# Error packet handlers.
def error(self, conn, code, message):
......@@ -437,6 +443,8 @@ class EventHandler(object):
d[Packets.NotifyReplicationDone] = self.notifyReplicationDone
d[Packets.AskUndoTransaction] = self.askUndoTransaction
d[Packets.AnswerUndoTransaction] = self.answerUndoTransaction
d[Packets.AskHasLock] = self.askHasLock
d[Packets.AnswerHasLock] = self.answerHasLock
return d
......
......@@ -77,6 +77,12 @@ class CellStates(Enum):
DISCARDED = Enum.Item(4)
CellStates = CellStates()
class LockState(Enum):
NOT_LOCKED = Enum.Item(1)
GRANTED = Enum.Item(2)
GRANTED_TO_OTHER = Enum.Item(3)
LockState = LockState()
# used for logging
node_state_prefix_dict = {
NodeStates.RUNNING: 'R',
......@@ -159,6 +165,13 @@ def _decodeErrorCode(original_error_code):
original_error_code)
return error_code
def _decodeLockState(original_lock_state):
lock_state = LockState.get(original_lock_state)
if lock_state is None:
raise PacketMalformedError('invalid lock state %d' % (
original_lock_state, ))
return lock_state
def _decodeAddress(address):
if address == '\0' * 6:
return None
......@@ -1547,6 +1560,30 @@ class AnswerUndoTransaction(Packet):
append(read(OID_LEN))
return (oid_list, error_oid_list, conflict_oid_list)
class AskHasLock(Packet):
"""
Ask a storage is oid is locked by another transaction.
C -> S
"""
def _encode(self, tid, oid):
return _encodeTID(tid) + _encodeTID(oid)
def _decode(self, body):
return (_decodeTID(body[:8]), _decodeTID(body[8:]))
class AnswerHasLock(Packet):
"""
Answer whether a transaction holds the write lock for requested object.
"""
_header_format = '!8sH'
def _encode(self, oid, state):
return pack(self._header_format, oid, state)
def _decode(self, body):
oid, state = unpack(self._header_format, body)
return (oid, _decodeLockState(state))
class Error(Packet):
"""
Error is a special type of message, because this can be sent against
......@@ -1770,6 +1807,10 @@ class PacketRegistry(dict):
0x0033,
AskUndoTransaction,
AnswerUndoTransaction)
AskHasLock, AnswerHasLock = register(
0x0034,
AskHasLock,
AnswerHasLock)
# build a "singleton"
Packets = PacketRegistry()
......
......@@ -16,7 +16,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo import protocol
from neo.protocol import Packets
from neo.protocol import Packets, LockState
from neo.storage.handlers import BaseClientAndStorageOperationHandler
from neo.storage.transactions import ConflictError, DelayedError
import time
......@@ -124,3 +124,13 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
conn.answer(Packets.AnswerUndoTransaction(oid_list, error_oid_list,
conflict_oid_list))
def askHasLock(self, conn, tid, oid):
locking_tid = self.app.tm.getLockingTID(oid)
if locking_tid is None:
state = LockState.NOT_LOCKED
elif locking_tid is tid:
state = LockState.GRANTED
else:
state = LockState.GRANTED_TO_OTHER
conn.answer(Packets.AnswerHasLock(oid, state))
......@@ -18,10 +18,11 @@
import unittest
from mock import Mock
from neo.tests import NeoTestBase
from neo.protocol import NodeTypes
from neo.protocol import NodeTypes, LockState
from neo.client.handlers.storage import StorageBootstrapHandler, \
StorageAnswersHandler
from neo.client.exception import NEOStorageError
from ZODB.POSException import ConflictError
MARKER = []
......@@ -257,6 +258,17 @@ class StorageAnswerHandlerTests(NeoTestBase):
self.assertEqual(undo_error_oid_list, [oid_2])
self.assertEqual(data_dict, {oid_1: ''})
def test_answerHasLock(self):
uuid = self.getNewUUID()
conn = self.getFakeConnection(uuid=uuid)
oid = self.getOID(0)
self.assertRaises(ConflictError, self.handler.answerHasLock, conn, oid,
LockState.GRANTED_TO_OTHER)
# XXX: Just check that this doesn't raise for the moment.
self.handler.answerHasLock(conn, oid, LockState.GRANTED)
# TODO: Test LockState.NOT_LOCKED case when implemented.
if __name__ == '__main__':
unittest.main()
......@@ -24,7 +24,7 @@ from neo.storage.transactions import ConflictError, DelayedError
from neo.storage.handlers.client import ClientOperationHandler
from neo.protocol import INVALID_PARTITION
from neo.protocol import INVALID_TID, INVALID_OID, INVALID_SERIAL
from neo.protocol import Packets
from neo.protocol import Packets, LockState
class StorageClientHandlerTests(NeoTestBase):
......@@ -277,5 +277,24 @@ class StorageClientHandlerTests(NeoTestBase):
self.assertEqual(oid_list_2, [oid_2])
self.assertEqual(oid_list_3, [oid_3])
def test_askHasLock(self):
tid_1 = self.getNextTID()
tid_2 = self.getNextTID()
oid = self.getNextTID()
def getLockingTID(oid):
return locking_tid
self.app.tm.getLockingTID = getLockingTID
for locking_tid, status in (
(None, LockState.NOT_LOCKED),
(tid_1, LockState.GRANTED),
(tid_2, LockState.GRANTED_TO_OTHER),
):
conn = self._getConnection()
self.operation.askHasLock(conn, tid_1, oid)
p_oid, p_status = self.checkAnswerPacket(conn,
Packets.AnswerHasLock, decode=True)
self.assertEqual(oid, p_oid)
self.assertEqual(status, p_status)
if __name__ == "__main__":
unittest.main()
......@@ -17,7 +17,7 @@
import unittest
from neo.protocol import NodeTypes, NodeStates, CellStates, ClusterStates
from neo.protocol import ErrorCodes, Packets, Errors
from neo.protocol import ErrorCodes, Packets, Errors, LockState
from neo.protocol import INVALID_TID
from neo.tests import NeoTestBase
......@@ -589,6 +589,17 @@ class ProtocolTests(NeoTestBase):
p = Packets.AnswerPartitionList(ptid, row_list)
self.assertEqual(p.decode(), (ptid, row_list))
def test_AskHasLock(self):
tid = self.getNextTID()
oid = self.getNextTID()
p = Packets.AskHasLock(tid, oid)
self.assertEqual(p.decode(), (tid, oid))
def test_AnswerHasLock(self):
oid = self.getNextTID()
for lock_state in LockState.itervalues():
p = Packets.AnswerHasLock(oid, lock_state)
self.assertEqual(p.decode(), (oid, lock_state))
if __name__ == '__main__':
unittest.main()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment