Commit c184ab48 authored by Julien Muchembled

Remove HasLock mechanism

It's dead code, because 1 year after it was introduced, something else was
implemented to detect deadlocks immediately.

Anyway, it would be an unacceptable way to detect them.
parent 787586e6
...@@ -19,7 +19,6 @@ from zlib import compress, decompress ...@@ -19,7 +19,6 @@ from zlib import compress, decompress
from random import shuffle from random import shuffle
import heapq import heapq
import time import time
from functools import partial
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
from ZODB.POSException import ReadConflictError from ZODB.POSException import ReadConflictError
...@@ -443,11 +442,6 @@ class Application(ThreadedApplication): ...@@ -443,11 +442,6 @@ class Application(ThreadedApplication):
compressed_data = data compressed_data = data
checksum = makeChecksum(compressed_data) checksum = makeChecksum(compressed_data)
txn_context['data_size'] += size txn_context['data_size'] += size
on_timeout = partial(
self.onStoreTimeout,
txn_context=txn_context,
oid=oid,
)
# Store object in tmp cache # Store object in tmp cache
txn_context['data_dict'][oid] = data txn_context['data_dict'][oid] = data
# Store data on each node # Store data on each node
...@@ -460,8 +454,7 @@ class Application(ThreadedApplication): ...@@ -460,8 +454,7 @@ class Application(ThreadedApplication):
checksum, compressed_data, data_serial, ttid, unlock) checksum, compressed_data, data_serial, ttid, unlock)
for node, conn in self.cp.iterateForObject(oid): for node, conn in self.cp.iterateForObject(oid):
try: try:
conn.ask(packet, on_timeout=on_timeout, queue=queue, conn.ask(packet, queue=queue, oid=oid, serial=serial)
oid=oid, serial=serial)
add_involved_nodes(node) add_involved_nodes(node)
except ConnectionClosed: except ConnectionClosed:
continue continue
...@@ -472,16 +465,6 @@ class Application(ThreadedApplication): ...@@ -472,16 +465,6 @@ class Application(ThreadedApplication):
self._waitAnyTransactionMessage(txn_context) self._waitAnyTransactionMessage(txn_context)
self._waitAnyTransactionMessage(txn_context, False) self._waitAnyTransactionMessage(txn_context, False)
def onStoreTimeout(self, conn, msg_id, txn_context, oid):
# NOTE: this method is called from poll thread, don't use
# thread-specific value !
txn_context.setdefault('timeout_dict', {})[oid] = msg_id
# Ask the storage if someone locks the object.
# By sending a message with a smaller timeout,
# the connection will be kept open.
conn.ask(Packets.AskHasLock(txn_context['ttid'], oid),
timeout=5, queue=txn_context['queue'])
def _handleConflicts(self, txn_context, tryToResolveConflict): def _handleConflicts(self, txn_context, tryToResolveConflict):
result = [] result = []
append = result.append append = result.append
......
...@@ -15,10 +15,9 @@ ...@@ -15,10 +15,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from ZODB.TimeStamp import TimeStamp from ZODB.TimeStamp import TimeStamp
from ZODB.POSException import ConflictError
from neo.lib import logging from neo.lib import logging
from neo.lib.protocol import LockState, ZERO_TID from neo.lib.protocol import ZERO_TID
from neo.lib.util import dump from neo.lib.util import dump
from neo.lib.exception import NodeNotReady from neo.lib.exception import NodeNotReady
from neo.lib.handler import MTEventHandler from neo.lib.handler import MTEventHandler
...@@ -158,33 +157,6 @@ class StorageAnswersHandler(AnswerBaseHandler): ...@@ -158,33 +157,6 @@ class StorageAnswersHandler(AnswerBaseHandler):
def answerFinalTID(self, conn, tid): def answerFinalTID(self, conn, tid):
self.app.setHandlerData(tid) self.app.setHandlerData(tid)
def answerHasLock(self, conn, oid, status):
store_msg_id = self.app.getHandlerData()['timeout_dict'].pop(oid)
if status == LockState.GRANTED_TO_OTHER:
# Stop expecting the timed-out store request.
self.app.dispatcher.forget(conn, store_msg_id)
# Object is locked by another transaction, and we have waited until
# timeout. To avoid a deadlock, abort current transaction (we might
# be locking objects the other transaction is waiting for).
raise ConflictError, 'Lock wait timeout for oid %s on %r' % (
dump(oid), conn)
# HasLock design required that storage is multi-threaded so that
# it can answer to AskHasLock while processing store requests.
# This means that the 2 cases (granted to us or nobody) are legitimate,
# either because it gave us the lock but is/was slow to store our data,
# or because the storage took a lot of time processing a previous
# store (and did not even considered our lock request).
# XXX: But storage nodes are still mono-threaded, so they should
# only answer with GRANTED_TO_OTHER (if they reply!), except
# maybe in very rare cases of race condition. Only log for now.
# This also means that most of the time, if the storage is slow
# to process some store requests, HasLock will timeout in turn
# and the connector will be closed.
# Anyway, it's not clear that HasLock requests are useful.
# Are store requests potentially long to process ? If not,
# we should simply raise a ConflictError on store timeout.
logging.info('Store of oid %s delayed (storage overload ?)', dump(oid))
def alreadyPendingError(self, conn, message): def alreadyPendingError(self, conn, message):
pass pass
...@@ -146,12 +146,6 @@ def CellStates(): ...@@ -146,12 +146,6 @@ def CellStates():
# readable nor writable. # readable nor writable.
CORRUPTED CORRUPTED
@Enum
def LockState():
NOT_LOCKED
GRANTED
GRANTED_TO_OTHER
# used for logging # used for logging
node_state_prefix_dict = { node_state_prefix_dict = {
NodeStates.RUNNING: 'R', NodeStates.RUNNING: 'R',
...@@ -1241,22 +1235,6 @@ class ObjectUndoSerial(Packet): ...@@ -1241,22 +1235,6 @@ class ObjectUndoSerial(Packet):
), ),
) )
class HasLock(Packet):
"""
Ask a storage is oid is locked by another transaction.
C -> S
Answer whether a transaction holds the write lock for requested object.
"""
_fmt = PStruct('has_load_lock',
PTID('tid'),
POID('oid'),
)
_answer = PStruct('answer_has_lock',
POID('oid'),
PEnum('lock_state', LockState),
)
class CheckCurrentSerial(Packet): class CheckCurrentSerial(Packet):
""" """
Verifies if given serial is current for object oid in the database, and Verifies if given serial is current for object oid in the database, and
...@@ -1703,8 +1681,6 @@ class Packets(dict): ...@@ -1703,8 +1681,6 @@ class Packets(dict):
ClusterState) ClusterState)
AskObjectUndoSerial, AnswerObjectUndoSerial = register( AskObjectUndoSerial, AnswerObjectUndoSerial = register(
ObjectUndoSerial) ObjectUndoSerial)
AskHasLock, AnswerHasLock = register(
HasLock)
AskTIDsFrom, AnswerTIDsFrom = register( AskTIDsFrom, AnswerTIDsFrom = register(
TIDListFrom) TIDListFrom)
AskPack, AnswerPack = register( AskPack, AnswerPack = register(
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
from neo.lib import logging from neo.lib import logging
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.util import dump, makeChecksum, add64 from neo.lib.util import dump, makeChecksum, add64
from neo.lib.protocol import Packets, LockState, Errors, ProtocolError, \ from neo.lib.protocol import Packets, Errors, ProtocolError, \
ZERO_HASH, INVALID_PARTITION ZERO_HASH, INVALID_PARTITION
from ..transactions import ConflictError, DelayedError, NotRegisteredError from ..transactions import ConflictError, DelayedError, NotRegisteredError
from ..exception import AlreadyPendingError from ..exception import AlreadyPendingError
...@@ -159,17 +159,6 @@ class ClientOperationHandler(EventHandler): ...@@ -159,17 +159,6 @@ class ClientOperationHandler(EventHandler):
p = Packets.AnswerObjectUndoSerial(object_tid_dict) p = Packets.AnswerObjectUndoSerial(object_tid_dict)
conn.answer(p) conn.answer(p)
def askHasLock(self, conn, ttid, oid):
locking_tid = self.app.tm.getLockingTID(oid)
logging.info('%r check lock of %r:%r', conn, dump(ttid), dump(oid))
if locking_tid is None:
state = LockState.NOT_LOCKED
elif locking_tid is ttid:
state = LockState.GRANTED
else:
state = LockState.GRANTED_TO_OTHER
conn.answer(Packets.AnswerHasLock(oid, state))
def askObjectHistory(self, conn, oid, first, last): def askObjectHistory(self, conn, oid, first, last):
if first >= last: if first >= last:
raise ProtocolError('invalid offsets') raise ProtocolError('invalid offsets')
......
...@@ -254,33 +254,6 @@ class ClientTests(NEOFunctionalTest): ...@@ -254,33 +254,6 @@ class ClientTests(NEOFunctionalTest):
self.__checkTree(neo_conn.root()['trees']) self.__checkTree(neo_conn.root()['trees'])
self.assertEqual(dump, self.__dump(neo_db.storage)) self.assertEqual(dump, self.__dump(neo_db.storage))
def testLockTimeout(self):
""" Hold a lock on an object to block a second transaction """
def test():
self.neo = NEOCluster(['test_neo1'], replicas=0,
temp_dir=self.getTempDirectory())
self.neo.start()
# BUG: The following 2 lines creates 2 app, i.e. 2 TCP connections
# to the storage, so there may be a race condition at network
# level and 'st2.store' may be effective before 'st1.store'.
db1, conn1 = self.neo.getZODBConnection()
db2, conn2 = self.neo.getZODBConnection()
st1, st2 = conn1._storage, conn2._storage
t1, t2 = transaction.Transaction(), transaction.Transaction()
t1.user = t2.user = u'user'
t1.description = t2.description = u'desc'
oid = st1.new_oid()
rev = '\0' * 8
data = zodb_pickle(PObject())
st2.tpc_begin(t2)
st1.tpc_begin(t1)
st1.store(oid, rev, data, '', t1)
# this store will be delayed
st2.store(oid, rev, data, '', t2)
# the vote will timeout as t1 never release the lock
self.assertRaises(ConflictError, st2.tpc_vote, t2)
self.runWithTimeout(40, test)
def testIPv6Client(self): def testIPv6Client(self):
""" Test the connectivity of an IPv6 connection for neo client """ """ Test the connectivity of an IPv6 connection for neo client """
......
...@@ -20,7 +20,7 @@ from .. import NeoUnitTestBase ...@@ -20,7 +20,7 @@ from .. import NeoUnitTestBase
from neo.storage.app import Application from neo.storage.app import Application
from neo.storage.handlers.client import ClientOperationHandler from neo.storage.handlers.client import ClientOperationHandler
from neo.lib.util import p64 from neo.lib.util import p64
from neo.lib.protocol import INVALID_TID, Packets, LockState from neo.lib.protocol import INVALID_TID, Packets
class StorageClientHandlerTests(NeoUnitTestBase): class StorageClientHandlerTests(NeoUnitTestBase):
...@@ -100,24 +100,5 @@ class StorageClientHandlerTests(NeoUnitTestBase): ...@@ -100,24 +100,5 @@ class StorageClientHandlerTests(NeoUnitTestBase):
self.operation.askObjectUndoSerial(conn, tid, ltid, undone_tid, oid_list) self.operation.askObjectUndoSerial(conn, tid, ltid, undone_tid, oid_list)
self.checkErrorPacket(conn) self.checkErrorPacket(conn)
def test_askHasLock(self):
tid_1 = self.getNextTID()
tid_2 = self.getNextTID()
oid = self.getNextTID()
def getLockingTID(oid):
return locking_tid
self.app.tm.getLockingTID = getLockingTID
for locking_tid, status in (
(None, LockState.NOT_LOCKED),
(tid_1, LockState.GRANTED),
(tid_2, LockState.GRANTED_TO_OTHER),
):
conn = self._getConnection()
self.operation.askHasLock(conn, tid_1, oid)
p_oid, p_status = self.checkAnswerPacket(conn,
Packets.AnswerHasLock).decode()
self.assertEqual(oid, p_oid)
self.assertEqual(status, p_status)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment