Commit cc8d0a7c authored by Julien Muchembled

Fixes/improvements to EventQueue

- Make sure that errors while processing a delayed packet are reported to the
  connection that sent this packet.
- Provide a mechanism to process events for the same connection in
  chronological order.
parent 3e6adac3
......@@ -25,6 +25,10 @@ from .protocol import (
from .util import cached_property
# Raised by a handler method to signal that the event it is processing cannot
# be handled yet. EventHandler.dispatch catches it and hands the call
# (method, conn, args) to the handler's event queue via queueEvent(), and
# EventQueue.executeQueuedEvents catches it to keep the event queued.
class DelayEvent(Exception):
pass
class EventHandler(object):
"""This class handles events."""
......@@ -66,6 +70,9 @@ class EventHandler(object):
raise UnexpectedPacketError('no handler found')
args = packet.decode() or ()
method(conn, *args, **kw)
except DelayEvent:
assert not kw, kw
self.getEventQueue().queueEvent(method, conn, args)
except UnexpectedPacketError, e:
if not conn.isClosed():
self.__unexpectedPacket(conn, packet, *e.args)
......@@ -268,31 +275,75 @@ class AnswerBaseHandler(EventHandler):
raise ConnectionClosed
# A queued, connection-bound event: it remembers the handler callable, the
# connection and the decoded arguments of a call that raised DelayEvent, and
# replays that call when the instance itself is called.
# The instance is used both as the handler (it inherits dispatch() from
# EventHandler) and as the "packet" given to dispatch(): it provides decode()
# and getId() for that purpose.
class _DelayedConnectionEvent(EventHandler):
# NOTE(review): presumably the attribute name under which dispatch() looks up
# the method to call on the handler, so that it finds self._func - the lookup
# code is not visible here, confirm.
handler_method_name = '_func'
# NOTE(review): presumably bypasses a customized EventHandler.__new__ so that
# the constructor can take (func, conn, args) - confirm against EventHandler.
__new__ = object.__new__
def __init__(self, func, conn, args):
self._args = args
self._conn = conn
self._func = func
# Peer id of the connection at the time the event is queued.
self._msg_id = conn.getPeerId()
# Replay the delayed call, unless the connection has been closed meanwhile.
def __call__(self):
conn = self._conn
if not conn.isClosed():
# Save the connection's current peer id and restore it whatever happens
# during dispatch.
# NOTE(review): presumably dispatch() sets the peer id from getId()
# while processing - not visible here, confirm.
msg_id = conn.getPeerId()
try:
self.dispatch(conn, self)
finally:
conn.setPeerId(msg_id)
# Shown by EventQueue.logQueuedEvents: function name, message id in hex, and
# the connection.
def __repr__(self):
return '<%s: 0x%x %s>' % (self._func.__name__, self._msg_id, self._conn)
# Packet-like API: the arguments were already decoded when the event was queued.
def decode(self):
return self._args
# dispatch() calls getEventQueue() from inside its 'except DelayEvent' clause.
# NOTE(review): the bare raise presumably re-raises that DelayEvent (Python 2
# semantics) so that it reaches EventQueue.executeQueuedEvents instead of
# queueing the event a second time - confirm.
def getEventQueue(self):
raise
# Packet-like API: id of the message that originally triggered the event.
def getId(self):
return self._msg_id
# Queue of deferred callables, kept in a deque and retried by
# executeQueuedEvents().
# NOTE(review): this listing comes from a diff whose +/- markers and
# indentation were lost, so replaced and replacing lines both appear below.
# The variants that store (some_callable, msg_id, conn, args) tuples look like
# the removed code, and those relying on _DelayedConnectionEvent like the added
# code - confirm against the actual commit.
class EventQueue(object):
def __init__(self):
self._event_queue = deque()
# -1 when executeQueuedEvents is not running; it is incremented on each
# entry, so a non-zero value after the increment denotes a nested call.
self._executing_event = -1
# Tuple-based variant (see the review note on the class).
def queueEvent(self, some_callable, conn=None, args=()):
msg_id = None if conn is None else conn.getPeerId()
self._event_queue.append((some_callable, msg_id, conn, args))
# Queue 'func' as is when there is no connection, else wrap the call in a
# _DelayedConnectionEvent; either way the queue only holds callables taking
# no argument.
def queueEvent(self, func, conn=None, args=()):
self._event_queue.append(func if conn is None else
_DelayedConnectionEvent(func, conn, args))
def executeQueuedEvents(self):
# Tuple-based variant (see the review note on the class): pops each entry
# once and calls it with the peer id temporarily set to the queued msg_id.
p = self._event_queue.popleft
for _ in xrange(len(self._event_queue)):
some_callable, msg_id, conn, args = p()
if conn is None:
some_callable(*args)
elif not conn.isClosed():
orig_msg_id = conn.getPeerId()
try:
conn.setPeerId(msg_id)
some_callable(conn, *args)
finally:
conn.setPeerId(orig_msg_id)
# Not reentrant. When processing a queued event, calling this method
# only tells the caller to retry all events from the beginning, because
# events for the same connection must be processed in chronological
# order.
self._executing_event += 1
if self._executing_event:
return
queue = self._event_queue
# n: number of events still to try in the current pass.
n = len(queue)
while n:
try:
queue[0]()
except DelayEvent:
# Still not processable: move it to the back and keep it queued.
queue.rotate(-1)
else:
del queue[0]
n -= 1
# Non-zero here means this method was called again while the event above
# was running: clear the flag and start a new pass over the whole queue.
# NOTE(review): rotate(-n) presumably moves the n untried events behind
# the ones delayed during this pass, to restore the original order -
# confirm.
if self._executing_event:
self._executing_event = 0
queue.rotate(-n)
n = len(queue)
self._executing_event = -1
# Log every pending event, if any.
def logQueuedEvents(self):
if self._event_queue:
logging.info(" Pending events:")
# Tuple-based variant (see the review note on the class).
for event, msg_id, conn, args in self._event_queue:
logging.info(' %r: %r %r', event.__name__, msg_id, conn)
# Variant that logs the repr of each queued callable.
for event in self._event_queue:
logging.info(' %r', event)
......@@ -19,7 +19,7 @@ from os.path import exists, getsize
import json
from . import attributeTracker, logging
from .handler import EventQueue
from .handler import DelayEvent, EventQueue
from .protocol import formatNodeList, uuid_str, \
NodeTypes, NodeStates, NotReadyError, ProtocolError
......@@ -358,10 +358,10 @@ class NodeManager(EventQueue):
def getByUUID(self, uuid, *id_timestamp):
"""Return the node that matches with a given UUID
If an id timestamp is passed, None is returned if identification must
be delayed. This is because we rely only on the notifications from the
master to recognize nodes (otherwise, we could get id conflicts) and
such notifications may be late in some cases, even when the master
If an id timestamp is passed, DelayEvent is raised if identification
must be delayed. This is because we rely only on the notifications from
the master to recognize nodes (otherwise, we could get id conflicts)
and such notifications may be late in some cases, even when the master
expects us to not reject the connection.
"""
node = self._uuid_dict.get(uuid)
......@@ -369,7 +369,7 @@ class NodeManager(EventQueue):
id_timestamp, = id_timestamp
if not node or node.id_timestamp != id_timestamp:
if self._timestamp < id_timestamp:
return
raise DelayEvent
# The peer got disconnected from the master.
raise NotReadyError('unknown by master')
return node
......
......@@ -51,13 +51,14 @@ class ClientServiceHandler(MasterHandler):
def askNewOIDs(self, conn, num_oids):
conn.answer(Packets.AnswerNewOIDs(self.app.tm.getNextOIDList(num_oids)))
def getEventQueue(self):
# for failedVote
return self.app.tm
def failedVote(self, conn, *args):
app = self.app
ok = app.tm.vote(app, *args)
if ok is None:
app.tm.queueEvent(self.failedVote, conn, args)
else:
conn.answer((Errors.Ack if ok else Errors.IncompleteTransaction)())
conn.answer((Errors.Ack if app.tm.vote(app, *args) else
Errors.IncompleteTransaction)())
def askFinishTransaction(self, conn, ttid, oid_list, checked_list):
app = self.app
......
......@@ -18,13 +18,10 @@ from collections import deque
from time import time
from struct import pack, unpack
from neo.lib import logging
from neo.lib.handler import EventQueue
from neo.lib.handler import DelayEvent, EventQueue
from neo.lib.protocol import ProtocolError, uuid_str, ZERO_OID, ZERO_TID
from neo.lib.util import dump, u64, addTID, tidFromTime
class DelayedError(Exception):
pass
class Transaction(object):
"""
A pending transaction
......@@ -330,7 +327,7 @@ class TransactionManager(EventQueue):
# we won't be able to finish this one, because that would make
# the cluster non-operational. Let's tell the caller to retry
# later.
return
raise DelayEvent
# Allow the client to finish the transaction,
# even if it will disconnect storage nodes.
txn._failed = failed
......
......@@ -15,11 +15,11 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.handler import EventHandler
from neo.lib.handler import DelayEvent, EventHandler
from neo.lib.util import dump, makeChecksum, add64
from neo.lib.protocol import Packets, Errors, ProtocolError, \
ZERO_HASH, INVALID_PARTITION
from ..transactions import ConflictError, DelayedError, NotRegisteredError
from ..transactions import ConflictError, NotRegisteredError
import time
# Log stores taking (incl. lock delays) more than this many seconds.
......@@ -37,12 +37,14 @@ class ClientOperationHandler(EventHandler):
t[4], t[0])
conn.answer(p)
def getEventQueue(self):
# for read rpc
return self.app.tm
def askObject(self, conn, oid, serial, tid):
app = self.app
if app.tm.loadLocked(oid):
# Delay the response.
app.tm.queueEvent(self.askObject, conn, (oid, serial, tid))
return
raise DelayEvent
o = app.dm.getObject(oid, serial, tid)
try:
serial, next_serial, compression, checksum, data, data_serial = o
......@@ -74,10 +76,6 @@ class ClientOperationHandler(EventHandler):
except ConflictError, err:
# resolvable or not
conn.answer(Packets.AnswerStoreObject(err.tid))
except DelayedError:
# locked by a previous transaction, retry later
self.app.tm.queueEvent(self._askStoreObject, conn, (oid, serial,
compression, checksum, data, data_serial, ttid, request_time))
except NotRegisteredError:
# transaction was aborted, cancel this event
logging.info('Forget store of %s:%s by %s delayed by %s',
......@@ -86,7 +84,7 @@ class ClientOperationHandler(EventHandler):
# send an answer as the client side is waiting for it
conn.answer(Packets.AnswerStoreObject(None))
else:
if SLOW_STORE is not None:
if request_time and SLOW_STORE is not None:
duration = time.time() - request_time
if duration > SLOW_STORE:
logging.info('StoreObject delay: %.02fs', duration)
......@@ -104,8 +102,13 @@ class ClientOperationHandler(EventHandler):
assert data_serial is None
else:
checksum = data = None
self._askStoreObject(conn, oid, serial, compression, checksum, data,
data_serial, ttid, time.time())
try:
self._askStoreObject(conn, oid, serial, compression,
checksum, data, data_serial, ttid, None)
except DelayEvent:
# locked by a previous transaction, retry later
self.app.tm.queueEvent(self._askStoreObject, conn, (oid, serial,
compression, checksum, data, data_serial, ttid, time.time()))
def askTIDsFrom(self, conn, min_tid, max_tid, length, partition):
conn.answer(Packets.AnswerTIDsFrom(self.app.dm.getReplicationTIDList(
......@@ -152,9 +155,7 @@ class ClientOperationHandler(EventHandler):
raise ProtocolError('invalid offsets')
app = self.app
if app.tm.loadLocked(oid):
# Delay the response.
app.tm.queueEvent(self.askObjectHistory, conn, (oid, first, last))
return
raise DelayEvent
history_list = app.dm.getObjectHistory(oid, first, last - first)
if history_list is None:
p = Errors.OidNotFound(dump(oid))
......@@ -164,7 +165,12 @@ class ClientOperationHandler(EventHandler):
def askCheckCurrentSerial(self, conn, ttid, oid, serial):
self.app.tm.register(conn, ttid)
self._askCheckCurrentSerial(conn, ttid, oid, serial, time.time())
try:
self._askCheckCurrentSerial(conn, ttid, oid, serial, None)
except DelayEvent:
# locked by a previous transaction, retry later
self.app.tm.queueEvent(self._askCheckCurrentSerial,
conn, (ttid, oid, serial, time.time()))
def _askCheckCurrentSerial(self, conn, ttid, oid, serial, request_time):
try:
......@@ -172,10 +178,6 @@ class ClientOperationHandler(EventHandler):
except ConflictError, err:
# resolvable or not
conn.answer(Packets.AnswerCheckCurrentSerial(err.tid))
except DelayedError:
# locked by a previous transaction, retry later
self.app.tm.queueEvent(self._askCheckCurrentSerial, conn,
(ttid, oid, serial, request_time))
except NotRegisteredError:
# transaction was aborted, cancel this event
logging.info('Forget serial check of %s:%s by %s delayed by %s',
......@@ -184,7 +186,7 @@ class ClientOperationHandler(EventHandler):
# send an answer as the client side is waiting for it
conn.answer(Packets.AnswerCheckCurrentSerial(None))
else:
if SLOW_STORE is not None:
if request_time and SLOW_STORE is not None:
duration = time.time() - request_time
if duration > SLOW_STORE:
logging.info('CheckCurrentSerial delay: %.02fs', duration)
......
......@@ -27,6 +27,10 @@ class IdentificationHandler(EventHandler):
def connectionLost(self, conn, new_state):
logging.warning('A connection was lost during identification')
def getEventQueue(self):
# for requestIdentification
return self.app.nm
def requestIdentification(self, conn, node_type, uuid, address, name,
id_timestamp):
self.checkClusterName(name)
......@@ -43,10 +47,6 @@ class IdentificationHandler(EventHandler):
if uuid == app.uuid:
raise ProtocolError("uuid conflict or loopback connection")
node = app.nm.getByUUID(uuid, id_timestamp)
if node is None:
app.nm.queueEvent(self.requestIdentification, conn,
(node_type, uuid, address, name, id_timestamp))
return
if node.isBroken():
raise BrokenNodeDisallowedError
# choose the handler according to the node type
......
......@@ -17,7 +17,7 @@
import weakref
from functools import wraps
from neo.lib.connection import ConnectionClosed
from neo.lib.handler import EventHandler
from neo.lib.handler import DelayEvent, EventHandler
from neo.lib.protocol import Errors, NodeStates, Packets, ProtocolError, \
ZERO_HASH
......@@ -143,12 +143,14 @@ class StorageOperationHandler(EventHandler):
# Server (all methods must set connection as server so that it isn't closed
# if client tasks are finished)
def getEventQueue(self):
return self.app.tm
@checkFeedingConnection(check=True)
def askCheckTIDRange(self, conn, *args):
app = self.app
if app.tm.isLockedTid(args[3]): # max_tid
app.tm.queueEvent(self.askCheckTIDRange, conn, args)
return
raise DelayEvent
msg_id = conn.getPeerId()
conn = weakref.proxy(conn)
def check():
......@@ -187,9 +189,7 @@ class StorageOperationHandler(EventHandler):
# NotifyTransactionFinished(M->S) + AskFetchTransactions(S->S)
# is faster than
# NotifyUnlockInformation(M->S)
app.tm.queueEvent(self.askFetchTransactions, conn,
(partition, length, min_tid, max_tid, tid_list))
return
raise DelayEvent
msg_id = conn.getPeerId()
conn = weakref.proxy(conn)
peer_tid_set = set(tid_list)
......
......@@ -16,7 +16,7 @@
from time import time
from neo.lib import logging
from neo.lib.handler import EventQueue
from neo.lib.handler import DelayEvent, EventQueue
from neo.lib.util import dump
from neo.lib.protocol import Packets, ProtocolError, uuid_str, MAX_TID
......@@ -31,11 +31,6 @@ class ConflictError(Exception):
self.tid = tid
class DelayedError(Exception):
"""
Raised when an object is locked by a previous transaction
"""
class NotRegisteredError(Exception):
"""
Raised when a ttid is not registered
......@@ -219,7 +214,7 @@ class TransactionManager(EventQueue):
Take a write lock on given object, checking that "serial" is
current.
Raises:
DelayedError
DelayEvent
ConflictError
"""
partition = self.getPartition(oid)
......@@ -242,7 +237,7 @@ class TransactionManager(EventQueue):
# before we processed UnlockInformation from the master.
logging.info('Store delayed for %r:%r by %r', dump(oid),
dump(ttid), dump(locked))
raise DelayedError
raise DelayEvent
transaction = self._transaction_dict[ttid]
if partition in self._replicated and (
oid in transaction.store_dict or oid in transaction.checked_set):
......
......@@ -26,10 +26,10 @@ from persistent import Persistent, GHOST
from transaction.interfaces import TransientError
from ZODB import DB, POSException
from ZODB.DB import TransactionalUndo
from neo.storage.transactions import TransactionManager, \
DelayedError, ConflictError
from neo.storage.transactions import TransactionManager, ConflictError
from neo.lib.connection import ServerConnection, MTClientConnection
from neo.lib.exception import DatabaseFailure, StoppedOperation
from neo.lib.handler import DelayEvent
from neo.lib import logging
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, Packets, \
uuid_str, ZERO_OID, ZERO_TID
......@@ -257,7 +257,7 @@ class Test(NEOThreadedTest):
ob._p_changed = 1
t.commit()
self.assertNotIn(delayUnlockInformation, m2s)
self.assertEqual(except_list, [DelayedError])
self.assertEqual(except_list, [DelayEvent])
@with_cluster(storage_count=2, replicas=1)
def _testDeadlockAvoidance(self, cluster, scenario):
......@@ -323,7 +323,7 @@ class Test(NEOThreadedTest):
# 2: C1 commits
# 3: C2 resolves conflict
self.assertEqual(self._testDeadlockAvoidance([2, 4]),
[DelayedError, DelayedError, ConflictError, ConflictError])
[DelayEvent, DelayEvent, ConflictError, ConflictError])
@expectedFailure(POSException.ConflictError)
def testDeadlockAvoidance(self):
......@@ -334,7 +334,7 @@ class Test(NEOThreadedTest):
# 3: C2 commits
# 4: C1 resolves conflict
self.assertEqual(self._testDeadlockAvoidance([1, 3]),
[DelayedError, ConflictError, "???" ])
[DelayEvent, ConflictError, "???" ])
@with_cluster()
def testConflictResolutionTriggered2(self, cluster):
......@@ -419,9 +419,11 @@ class Test(NEOThreadedTest):
l.acquire()
idle = []
def askObject(orig, *args):
orig(*args)
idle.append(cluster.storage.em.isIdle())
l.release()
try:
orig(*args)
finally:
idle.append(cluster.storage.em.isIdle())
l.release()
if 1:
t, c = cluster.getTransaction()
r = c.root()
......@@ -1099,9 +1101,11 @@ class Test(NEOThreadedTest):
l = threading.Semaphore(0)
idle = []
def requestIdentification(orig, *args):
orig(*args)
idle.append(cluster.storage.em.isIdle())
l.release()
try:
orig(*args)
finally:
idle.append(cluster.storage.em.isIdle())
l.release()
cluster.db
with cluster.master.filterConnection(cluster.storage) as m2s:
delayNotifyInformation = m2s.delayNotifyNodeInformation()
......@@ -1484,9 +1488,10 @@ class Test(NEOThreadedTest):
return isinstance(packet, Packets.AbortTransaction)
def c1_vote(txn):
def vote(orig, *args):
result = orig(*args)
ll()
return result
try:
return orig(*args)
finally:
ll()
with LockLock() as ll, Patch(cluster.master.tm, vote=vote):
commit2.start()
ll()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment