Commit bf03a305 authored by Julien Muchembled's avatar Julien Muchembled

Drop initial implementation of deadlock resolution

It was disabled long time ago and NEO has evolved in such a way that the new
implementation will be completely different.
parent c184ab48
......@@ -417,8 +417,7 @@ class Application(ThreadedApplication):
logging.debug('storing oid %s serial %s', dump(oid), dump(serial))
self._store(self._txn_container.get(transaction), oid, serial, data)
def _store(self, txn_context, oid, serial, data, data_serial=None,
unlock=False):
def _store(self, txn_context, oid, serial, data, data_serial=None):
ttid = txn_context['ttid']
if data is None:
# This is some undo: either a no-data object (undoing object
......@@ -451,7 +450,7 @@ class Application(ThreadedApplication):
involved_nodes = txn_context['involved_nodes']
add_involved_nodes = involved_nodes.add
packet = Packets.AskStoreObject(oid, serial, compression,
checksum, compressed_data, data_serial, ttid, unlock)
checksum, compressed_data, data_serial, ttid)
for node, conn in self.cp.iterateForObject(oid):
try:
conn.ask(packet, queue=queue, oid=oid, serial=serial)
......@@ -499,27 +498,9 @@ class Application(ThreadedApplication):
# To recover, we must ask storages to release locks we
# hold (to let possibly-competing transactions acquire
# them), and requeue our already-sent store requests.
# XXX: currently, brute-force is implemented: we send
# object data again.
# WARNING: not maintained code
logging.info('Deadlock avoidance triggered on %r:%r',
dump(oid), dump(serial))
for store_oid, store_data in data_dict.iteritems():
store_serial = object_serial_dict[store_oid]
if store_data is CHECKED_SERIAL:
self._checkCurrentSerialInTransaction(txn_context,
store_oid, store_serial)
else:
if store_data is None:
# Some undo
logging.warning('Deadlock avoidance cannot reliably'
' work with undo, this must be implemented.')
conflict_serial = ZERO_TID
break
self._store(txn_context, store_oid, store_serial,
store_data, unlock=True)
else:
continue
raise NotImplementedError
else:
data = data_dict.pop(oid)
if data is CHECKED_SERIAL:
......
......@@ -156,7 +156,3 @@ class StorageAnswersHandler(AnswerBaseHandler):
def answerFinalTID(self, conn, tid):
self.app.setHandlerData(tid)
def alreadyPendingError(self, conn, message):
pass
......@@ -207,9 +207,6 @@ class EventHandler(object):
def brokenNodeDisallowedError(self, conn, message):
raise RuntimeError, 'broken node disallowed error: %s' % (message,)
def alreadyPendingError(self, conn, message):
logging.error('already pending error: %s', message)
def ack(self, conn, message):
logging.debug("no error message: %s", message)
......
......@@ -71,7 +71,6 @@ def ErrorCodes():
OID_DOES_NOT_EXIST
PROTOCOL_ERROR
BROKEN_NODE
ALREADY_PENDING
REPLICATION_ERROR
CHECKING_ERROR
BACKEND_NOT_IMPLEMENTED
......@@ -954,7 +953,6 @@ class StoreObject(Packet):
PString('data'),
PTID('data_serial'),
PTID('tid'),
PBoolean('unlock'),
)
_answer = PStruct('answer_store_object',
......
......@@ -28,7 +28,6 @@ from neo.lib.util import dump
from neo.lib.bootstrap import BootstrapManager
from .checker import Checker
from .database import buildDatabaseManager
from .exception import AlreadyPendingError
from .handlers import identification, initialization
from .handlers import master, hidden
from .replicator import Replicator
......@@ -70,7 +69,6 @@ class Application(BaseApplication):
# operation related data
self.event_queue = None
self.event_queue_dict = None
self.operational = False
# ready is True when operational and got all informations
......@@ -190,7 +188,6 @@ class Application(BaseApplication):
conn.close()
# create/clear event queue
self.event_queue = deque()
self.event_queue_dict = {}
try:
self.initialize()
self.doOperation()
......@@ -308,28 +305,14 @@ class Application(BaseApplication):
if not node.isHidden():
break
def queueEvent(self, some_callable, conn=None, args=(), key=None,
raise_on_duplicate=True):
event_queue_dict = self.event_queue_dict
n = event_queue_dict.get(key)
if n and raise_on_duplicate:
raise AlreadyPendingError()
def queueEvent(self, some_callable, conn=None, args=()):
msg_id = None if conn is None else conn.getPeerId()
self.event_queue.append((key, some_callable, msg_id, conn, args))
if key is not None:
event_queue_dict[key] = n + 1 if n else 1
self.event_queue.append((some_callable, msg_id, conn, args))
def executeQueuedEvents(self):
p = self.event_queue.popleft
event_queue_dict = self.event_queue_dict
for _ in xrange(len(self.event_queue)):
key, some_callable, msg_id, conn, args = p()
if key is not None:
n = event_queue_dict[key] - 1
if n:
event_queue_dict[key] = n
else:
del event_queue_dict[key]
some_callable, msg_id, conn, args = p()
if conn is None:
some_callable(*args)
elif not conn.isClosed():
......@@ -344,9 +327,8 @@ class Application(BaseApplication):
if self.event_queue is None:
return
logging.info("Pending events:")
for key, event, _msg_id, _conn, args in self.event_queue:
logging.info(' %r:%r: %r:%r %r %r', key, event.__name__,
_msg_id, _conn, args)
for event, msg_id, conn, args in self.event_queue:
logging.info(' %r: %r %r', event.__name__, msg_id, conn)
def newTask(self, iterator):
try:
......
#
# Copyright (C) 2010-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
class AlreadyPendingError(Exception):
pass
......@@ -20,7 +20,6 @@ from neo.lib.util import dump, makeChecksum, add64
from neo.lib.protocol import Packets, Errors, ProtocolError, \
ZERO_HASH, INVALID_PARTITION
from ..transactions import ConflictError, DelayedError, NotRegisteredError
from ..exception import AlreadyPendingError
import time
# Log stores taking (incl. lock delays) more than this many seconds.
......@@ -71,25 +70,17 @@ class ClientOperationHandler(EventHandler):
conn.answer(Packets.AnswerVoteTransaction())
def _askStoreObject(self, conn, oid, serial, compression, checksum, data,
data_serial, ttid, unlock, request_time):
data_serial, ttid, request_time):
try:
self.app.tm.storeObject(ttid, serial, oid, compression,
checksum, data, data_serial, unlock)
checksum, data, data_serial)
except ConflictError, err:
# resolvable or not
conn.answer(Packets.AnswerStoreObject(err.tid))
except DelayedError:
# locked by a previous transaction, retry later
# If we are unlocking, we want queueEvent to raise
# AlreadyPendingError, to avoid making client wait for an unneeded
# response.
try:
self.app.queueEvent(self._askStoreObject, conn, (oid, serial,
compression, checksum, data, data_serial, ttid,
unlock, request_time), key=(oid, ttid),
raise_on_duplicate=unlock)
except AlreadyPendingError:
conn.answer(Errors.AlreadyPending(dump(oid)))
compression, checksum, data, data_serial, ttid, request_time))
except NotRegisteredError:
# transaction was aborted, cancel this event
logging.info('Forget store of %s:%s by %s delayed by %s',
......@@ -105,7 +96,7 @@ class ClientOperationHandler(EventHandler):
conn.answer(Packets.AnswerStoreObject(None))
def askStoreObject(self, conn, oid, serial,
compression, checksum, data, data_serial, ttid, unlock):
compression, checksum, data, data_serial, ttid):
if 1 < compression:
raise ProtocolError('invalid compression value')
# register the transaction
......@@ -117,7 +108,7 @@ class ClientOperationHandler(EventHandler):
else:
checksum = data = None
self._askStoreObject(conn, oid, serial, compression, checksum, data,
data_serial, ttid, unlock, time.time())
data_serial, ttid, time.time())
def askTIDsFrom(self, conn, min_tid, max_tid, length, partition):
conn.answer(Packets.AnswerTIDsFrom(self.app.dm.getReplicationTIDList(
......@@ -186,11 +177,8 @@ class ClientOperationHandler(EventHandler):
conn.answer(Packets.AnswerCheckCurrentSerial(err.tid))
except DelayedError:
# locked by a previous transaction, retry later
try:
self.app.queueEvent(self._askCheckCurrentSerial, conn, (ttid,
serial, oid, request_time), key=(oid, ttid))
except AlreadyPendingError:
conn.answer(Errors.AlreadyPending(dump(oid)))
self.app.queueEvent(self._askCheckCurrentSerial, conn,
(ttid, serial, oid, request_time))
except NotRegisteredError:
# transaction was aborted, cancel this event
logging.info('Forget serial check of %s:%s by %s delayed by %s',
......
......@@ -74,12 +74,6 @@ class Transaction(object):
assert oid not in self.checked_set, dump(oid)
self.store_dict[oid] = oid, data_id, value_serial
def cancel(self, oid):
try:
return self.store_dict.pop(oid)[1]
except KeyError:
self.checked_set.remove(oid)
class TransactionManager(object):
"""
......@@ -178,7 +172,7 @@ class TransactionManager(object):
def getLockingTID(self, oid):
return self._store_lock_dict.get(oid)
def lockObject(self, ttid, serial, oid, unlock=False):
def lockObject(self, ttid, serial, oid):
"""
Take a write lock on given object, checking that "serial" is
current.
......@@ -188,19 +182,6 @@ class TransactionManager(object):
"""
# check if the object if locked
locking_tid = self._store_lock_dict.get(oid)
if locking_tid == ttid and unlock:
logging.info('Deadlock resolution on %r:%r', dump(oid), dump(ttid))
# A duplicate store means client is resolving a deadlock, so
# drop the lock it held on this object, and drop object data for
# consistency.
del self._store_lock_dict[oid]
data_id = self._transaction_dict[ttid].cancel(oid)
if data_id:
self._app.dm.pruneData((data_id,))
# Give a chance to pending events to take that lock now.
self._app.executeQueuedEvents()
# Attemp to acquire lock again.
locking_tid = self._store_lock_dict.get(oid)
if locking_tid is None:
previous_serial = None
elif locking_tid == ttid:
......@@ -250,11 +231,11 @@ class TransactionManager(object):
transaction = self._transaction_dict[ttid]
except KeyError:
raise NotRegisteredError
self.lockObject(ttid, serial, oid, unlock=True)
self.lockObject(ttid, serial, oid)
transaction.check(oid)
def storeObject(self, ttid, serial, oid, compression, checksum, data,
value_serial, unlock=False):
value_serial):
"""
Store an object received from client node
"""
......@@ -262,7 +243,7 @@ class TransactionManager(object):
transaction = self._transaction_dict[ttid]
except KeyError:
raise NotRegisteredError
self.lockObject(ttid, serial, oid, unlock=unlock)
self.lockObject(ttid, serial, oid)
# store object
if data is None:
data_id = None
......
......@@ -19,9 +19,7 @@ from ..mock import Mock
from .. import NeoUnitTestBase
from neo.storage.app import Application
from neo.lib.protocol import CellStates
from collections import deque
from neo.lib.pt import PartitionTable
from neo.storage.exception import AlreadyPendingError
class StorageAppTests(NeoUnitTestBase):
......@@ -31,8 +29,6 @@ class StorageAppTests(NeoUnitTestBase):
# create an application object
config = self.getStorageConfiguration(master_number=1)
self.app = Application(config)
self.app.event_queue = deque()
self.app.event_queue_dict = {}
def _tearDown(self, success):
self.app.close()
......@@ -121,26 +117,6 @@ class StorageAppTests(NeoUnitTestBase):
self.assertTrue(cell_list[0].getUUID() in (master_uuid, storage_uuid))
self.assertTrue(cell_list[1].getUUID() in (master_uuid, storage_uuid))
def test_02_queueEvent(self):
self.assertEqual(len(self.app.event_queue), 0)
msg_id = 1325136
event = Mock({'__repr__': 'event'})
conn = Mock({'__repr__': 'conn', 'getPeerId': msg_id})
key = 'foo'
self.app.queueEvent(event, conn, ("test", ), key=key)
self.assertEqual(len(self.app.event_queue), 1)
_key, _event, _msg_id, _conn, args = self.app.event_queue[0]
self.assertEqual(key, _key)
self.assertEqual(msg_id, _msg_id)
self.assertEqual(len(args), 1)
self.assertEqual(args[0], "test")
self.assertRaises(AlreadyPendingError, self.app.queueEvent, event,
conn, ("test2", ), key=key)
self.assertEqual(len(self.app.event_queue), 1)
self.app.queueEvent(event, conn, ("test3", ), key=key,
raise_on_duplicate=False)
self.assertEqual(len(self.app.event_queue), 2)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment