Commit b7a5bc99 authored by Julien Muchembled's avatar Julien Muchembled

Delayed connection acception when the storage node is ready

Now that we do inequality comparisons between timestamps, the master must
use a monotonic clock, to avoid issues when the clock is turned back.
Before, the probability that time.time() returned again the same value was
probably negligible.
parent 0e133ebb
...@@ -20,11 +20,9 @@ ...@@ -20,11 +20,9 @@
could become UP_TO_DATE with appropriate backup_tid, so that the cluster could become UP_TO_DATE with appropriate backup_tid, so that the cluster
stays operational. (FEATURE) stays operational. (FEATURE)
- Finish renaming UUID into NID everywhere (CODE) - Finish renaming UUID into NID everywhere (CODE)
- Implements delayed connection acceptation. - Delayed connection acceptation even when a storage node is not ready ?
Currently, any node that connects too early to another that is busy for Currently, any node that connects too early to another that is busy for
some reasons is immediately rejected with the 'not ready' error code. This some reasons is immediately rejected with the 'not ready' error code.
should be replaced by a queue in the listening node that keep a pool a
nodes that will be accepted late, when the conditions will be satisfied.
This is mainly the case for : This is mainly the case for :
- Client rejected before the cluster is operational - Client rejected before the cluster is operational
- Empty storages rejected during recovery process - Empty storages rejected during recovery process
......
...@@ -225,6 +225,7 @@ class Application(ThreadedApplication): ...@@ -225,6 +225,7 @@ class Application(ThreadedApplication):
self.ignore_invalidations = True self.ignore_invalidations = True
# Get network connection to primary master # Get network connection to primary master
while 1: while 1:
self.nm.reset()
if self.primary_master_node is not None: if self.primary_master_node is not None:
# If I know a primary master node, pinpoint it. # If I know a primary master node, pinpoint it.
self.trying_master_node = self.primary_master_node self.trying_master_node = self.primary_master_node
......
...@@ -182,9 +182,9 @@ class PrimaryNotificationsHandler(MTEventHandler): ...@@ -182,9 +182,9 @@ class PrimaryNotificationsHandler(MTEventHandler):
if self.app.pt.filled(): if self.app.pt.filled():
self.app.pt.update(ptid, cell_list, self.app.nm) self.app.pt.update(ptid, cell_list, self.app.nm)
def notifyNodeInformation(self, conn, node_list): def notifyNodeInformation(self, conn, timestamp, node_list):
super(PrimaryNotificationsHandler, self).notifyNodeInformation( super(PrimaryNotificationsHandler, self).notifyNodeInformation(
conn, node_list) conn, timestamp, node_list)
# XXX: 'update' automatically closes DOWN nodes. Do we really want # XXX: 'update' automatically closes DOWN nodes. Do we really want
# to do the same thing for nodes in other non-running states ? # to do the same thing for nodes in other non-running states ?
getByUUID = self.app.nm.getByUUID getByUUID = self.app.nm.getByUUID
......
...@@ -38,6 +38,7 @@ class BootstrapManager(EventHandler): ...@@ -38,6 +38,7 @@ class BootstrapManager(EventHandler):
self.num_replicas = None self.num_replicas = None
self.num_partitions = None self.num_partitions = None
self.current = None self.current = None
app.nm.reset()
uuid = property(lambda self: self.app.uuid) uuid = property(lambda self: self.app.uuid)
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys import sys
from collections import deque
from . import logging from . import logging
from .connection import ConnectionClosed from .connection import ConnectionClosed
from .protocol import ( from .protocol import (
...@@ -166,9 +167,9 @@ class EventHandler(object): ...@@ -166,9 +167,9 @@ class EventHandler(object):
return return
conn.close() conn.close()
def notifyNodeInformation(self, conn, node_list): def notifyNodeInformation(self, conn, *args):
app = self.app app = self.app
app.nm.update(app, node_list) app.nm.update(app, *args)
def ping(self, conn): def ping(self, conn):
conn.answer(Packets.Pong()) conn.answer(Packets.Pong())
...@@ -265,3 +266,33 @@ class AnswerBaseHandler(EventHandler): ...@@ -265,3 +266,33 @@ class AnswerBaseHandler(EventHandler):
def connectionClosed(self, conn): def connectionClosed(self, conn):
raise ConnectionClosed raise ConnectionClosed
class EventQueue(object):
def __init__(self):
self._event_queue = deque()
def queueEvent(self, some_callable, conn=None, args=()):
msg_id = None if conn is None else conn.getPeerId()
self._event_queue.append((some_callable, msg_id, conn, args))
def executeQueuedEvents(self):
p = self._event_queue.popleft
for _ in xrange(len(self._event_queue)):
some_callable, msg_id, conn, args = p()
if conn is None:
some_callable(*args)
elif not conn.isClosed():
orig_msg_id = conn.getPeerId()
try:
conn.setPeerId(msg_id)
some_callable(conn, *args)
finally:
conn.setPeerId(orig_msg_id)
def logQueuedEvents(self):
if self._event_queue:
logging.info(" Pending events:")
for event, msg_id, conn, args in self._event_queue:
logging.info(' %r: %r %r', event.__name__, msg_id, conn)
...@@ -19,8 +19,9 @@ from os.path import exists, getsize ...@@ -19,8 +19,9 @@ from os.path import exists, getsize
import json import json
from . import attributeTracker, logging from . import attributeTracker, logging
from .handler import EventQueue
from .protocol import formatNodeList, uuid_str, \ from .protocol import formatNodeList, uuid_str, \
NodeTypes, NodeStates, ProtocolError NodeTypes, NodeStates, NotReadyError, ProtocolError
class Node(object): class Node(object):
...@@ -232,7 +233,7 @@ class MasterDB(object): ...@@ -232,7 +233,7 @@ class MasterDB(object):
def __iter__(self): def __iter__(self):
return iter(self._set) return iter(self._set)
class NodeManager(object): class NodeManager(EventQueue):
"""This class manages node status.""" """This class manages node status."""
_master_db = None _master_db = None
...@@ -255,9 +256,14 @@ class NodeManager(object): ...@@ -255,9 +256,14 @@ class NodeManager(object):
self._master_db = db = MasterDB(master_db) self._master_db = db = MasterDB(master_db)
for addr in db: for addr in db:
self.createMaster(address=addr) self.createMaster(address=addr)
self.reset()
close = __init__ close = __init__
def reset(self):
EventQueue.__init__(self)
self._timestamp = 0
def add(self, node): def add(self, node):
if node in self._node_set: if node in self._node_set:
logging.warning('adding a known node %r, ignoring', node) logging.warning('adding a known node %r, ignoring', node)
...@@ -350,10 +356,23 @@ class NodeManager(object): ...@@ -350,10 +356,23 @@ class NodeManager(object):
return self._address_dict.get(address, None) return self._address_dict.get(address, None)
def getByUUID(self, uuid, *id_timestamp): def getByUUID(self, uuid, *id_timestamp):
""" Return the node that match with a given UUID """ """Return the node that matches with a given UUID
If an id timestamp is passed, None is returned if identification must
be delayed. This is because we rely only on the notifications from the
master to recognize nodes (otherwise, we could get id conflicts) and
such notifications may be late in some cases, even when the master
expects us to not reject the connection.
"""
node = self._uuid_dict.get(uuid) node = self._uuid_dict.get(uuid)
if not id_timestamp or node and (node.id_timestamp,) == id_timestamp: if id_timestamp:
return node id_timestamp, = id_timestamp
if not node or node.id_timestamp != id_timestamp:
if self._timestamp < id_timestamp:
return
# The peer got disconnected from the master.
raise NotReadyError('unknown by master')
return node
def _createNode(self, klass, address=None, uuid=None, **kw): def _createNode(self, klass, address=None, uuid=None, **kw):
by_address = self.getByAddress(address) by_address = self.getByAddress(address)
...@@ -389,7 +408,9 @@ class NodeManager(object): ...@@ -389,7 +408,9 @@ class NodeManager(object):
def createFromNodeType(self, node_type, **kw): def createFromNodeType(self, node_type, **kw):
return self._createNode(NODE_TYPE_MAPPING[node_type], **kw) return self._createNode(NODE_TYPE_MAPPING[node_type], **kw)
def update(self, app, node_list): def update(self, app, timestamp, node_list):
assert self._timestamp < timestamp, (self._timestamp, timestamp)
self._timestamp = timestamp
node_set = self._node_set.copy() if app.id_timestamp is None else None node_set = self._node_set.copy() if app.id_timestamp is None else None
for node_type, addr, uuid, state, id_timestamp in node_list: for node_type, addr, uuid, state, id_timestamp in node_list:
# This should be done here (although klass might not be used in this # This should be done here (although klass might not be used in this
...@@ -443,12 +464,14 @@ class NodeManager(object): ...@@ -443,12 +464,14 @@ class NodeManager(object):
for node in node_set - self._node_set: for node in node_set - self._node_set:
self.remove(node) self.remove(node)
self.log() self.log()
self.executeQueuedEvents()
def log(self): def log(self):
logging.info('Node manager : %u nodes', len(self._node_set)) logging.info('Node manager : %u nodes', len(self._node_set))
if self._node_set: if self._node_set:
logging.info('\n'.join(formatNodeList( logging.info('\n'.join(formatNodeList(
map(Node.asTuple, self._node_set), ' * '))) map(Node.asTuple, self._node_set), ' * ')))
self.logQueuedEvents()
@apply @apply
def NODE_TYPE_MAPPING(): def NODE_TYPE_MAPPING():
......
...@@ -1148,6 +1148,7 @@ class NotifyNodeInformation(Packet): ...@@ -1148,6 +1148,7 @@ class NotifyNodeInformation(Packet):
Notify information about one or more nodes. PM -> Any. Notify information about one or more nodes. PM -> Any.
""" """
_fmt = PStruct('notify_node_informations', _fmt = PStruct('notify_node_informations',
PFloat('id_timestamp'),
PFNodeList, PFNodeList,
) )
...@@ -1748,3 +1749,8 @@ def formatNodeList(node_list, prefix='', _sort_key=itemgetter(2)): ...@@ -1748,3 +1749,8 @@ def formatNodeList(node_list, prefix='', _sort_key=itemgetter(2)):
for i in xrange(len(node_list[0]) - 1)) for i in xrange(len(node_list[0]) - 1))
return map((prefix + t + '%s').__mod__, node_list) return map((prefix + t + '%s').__mod__, node_list)
return () return ()
NotifyNodeInformation._neolog = staticmethod(lambda timestamp, node_list:
((timestamp,), formatNodeList(node_list, ' ! ')))
Error._neolog = staticmethod(lambda *args: ((), ("%s (%s)" % args,)))
...@@ -29,6 +29,16 @@ from neo.lib.exception import ElectionFailure, PrimaryFailure, StoppedOperation ...@@ -29,6 +29,16 @@ from neo.lib.exception import ElectionFailure, PrimaryFailure, StoppedOperation
class StateChangedException(Exception): pass class StateChangedException(Exception): pass
_previous_time = 0
def monotonic_time():
global _previous_time
now = time()
if _previous_time < now:
_previous_time = now
else:
_previous_time = now = _previous_time + 1e-3
return now
from .backup_app import BackupApplication from .backup_app import BackupApplication
from .handlers import election, identification, secondary from .handlers import election, identification, secondary
from .handlers import administration, client, storage from .handlers import administration, client, storage
...@@ -240,11 +250,12 @@ class Application(BaseApplication): ...@@ -240,11 +250,12 @@ class Application(BaseApplication):
continue continue
node_dict[NodeTypes.MASTER].append(node_info) node_dict[NodeTypes.MASTER].append(node_info)
now = monotonic_time()
# send at most one non-empty notification packet per node # send at most one non-empty notification packet per node
for node in self.nm.getIdentifiedList(): for node in self.nm.getIdentifiedList():
node_list = node_dict.get(node.getType()) node_list = node_dict.get(node.getType())
if node_list and node.isRunning() and node is not exclude: if node_list and node.isRunning() and node is not exclude:
node.notify(Packets.NotifyNodeInformation(node_list)) node.notify(Packets.NotifyNodeInformation(now, node_list))
def broadcastPartitionChanges(self, cell_list): def broadcastPartitionChanges(self, cell_list):
"""Broadcast a Notify Partition Changes packet.""" """Broadcast a Notify Partition Changes packet."""
...@@ -398,6 +409,7 @@ class Application(BaseApplication): ...@@ -398,6 +409,7 @@ class Application(BaseApplication):
conn.close() conn.close()
# Reconnect to primary master node. # Reconnect to primary master node.
self.nm.reset()
primary_handler = secondary.PrimaryHandler(self) primary_handler = secondary.PrimaryHandler(self)
ClientConnection(self, primary_handler, self.primary_master_node) ClientConnection(self, primary_handler, self.primary_master_node)
...@@ -491,11 +503,12 @@ class Application(BaseApplication): ...@@ -491,11 +503,12 @@ class Application(BaseApplication):
logging.info("asking remaining nodes to shutdown") logging.info("asking remaining nodes to shutdown")
handler = EventHandler(self) handler = EventHandler(self)
now = monotonic_time()
for node in self.nm.getConnectedList(): for node in self.nm.getConnectedList():
conn = node.getConnection() conn = node.getConnection()
if node.isStorage(): if node.isStorage():
conn.setHandler(handler) conn.setHandler(handler)
conn.notify(Packets.NotifyNodeInformation((( conn.notify(Packets.NotifyNodeInformation(now, ((
node.getType(), node.getAddress(), node.getUUID(), node.getType(), node.getAddress(), node.getUUID(),
NodeStates.TEMPORARILY_DOWN, None),))) NodeStates.TEMPORARILY_DOWN, None),)))
conn.abort() conn.abort()
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..app import monotonic_time
from neo.lib import logging from neo.lib import logging
from neo.lib.exception import StoppedOperation from neo.lib.exception import StoppedOperation
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
...@@ -88,7 +89,7 @@ class MasterHandler(EventHandler): ...@@ -88,7 +89,7 @@ class MasterHandler(EventHandler):
node_list.extend(n.asTuple() for n in nm.getMasterList()) node_list.extend(n.asTuple() for n in nm.getMasterList())
node_list.extend(n.asTuple() for n in nm.getClientList()) node_list.extend(n.asTuple() for n in nm.getClientList())
node_list.extend(n.asTuple() for n in nm.getStorageList()) node_list.extend(n.asTuple() for n in nm.getStorageList())
conn.notify(Packets.NotifyNodeInformation(node_list)) conn.notify(Packets.NotifyNodeInformation(monotonic_time(), node_list))
def askPartitionTable(self, conn): def askPartitionTable(self, conn):
pt = self.app.pt pt = self.app.pt
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
import random import random
from . import MasterHandler from . import MasterHandler
from ..app import StateChangedException from ..app import monotonic_time, StateChangedException
from neo.lib import logging from neo.lib import logging
from neo.lib.exception import StoppedOperation from neo.lib.exception import StoppedOperation
from neo.lib.pt import PartitionTableException from neo.lib.pt import PartitionTableException
...@@ -103,7 +103,8 @@ class AdministrationHandler(MasterHandler): ...@@ -103,7 +103,8 @@ class AdministrationHandler(MasterHandler):
node.setState(state) node.setState(state)
if node.isConnected(): if node.isConnected():
# notify itself so it can shutdown # notify itself so it can shutdown
node.notify(Packets.NotifyNodeInformation([node.asTuple()])) node.notify(Packets.NotifyNodeInformation(
monotonic_time(), [node.asTuple()]))
# close to avoid handle the closure as a connection lost # close to avoid handle the closure as a connection lost
node.getConnection().abort() node.getConnection().abort()
if keep: if keep:
...@@ -121,7 +122,8 @@ class AdministrationHandler(MasterHandler): ...@@ -121,7 +122,8 @@ class AdministrationHandler(MasterHandler):
# ignores non-running nodes # ignores non-running nodes
assert not node.isRunning() assert not node.isRunning()
if node.isConnected(): if node.isConnected():
node.notify(Packets.NotifyNodeInformation([node.asTuple()])) node.notify(Packets.NotifyNodeInformation(
monotonic_time(), [node.asTuple()]))
app.broadcastNodesInformation([node]) app.broadcastNodesInformation([node])
def addPendingNodes(self, conn, uuid_list): def addPendingNodes(self, conn, uuid_list):
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib.protocol import NodeStates, Packets, ProtocolError, MAX_TID, Errors from neo.lib.protocol import NodeStates, Packets, ProtocolError, MAX_TID, Errors
from ..app import monotonic_time
from . import MasterHandler from . import MasterHandler
class ClientServiceHandler(MasterHandler): class ClientServiceHandler(MasterHandler):
...@@ -36,7 +37,7 @@ class ClientServiceHandler(MasterHandler): ...@@ -36,7 +37,7 @@ class ClientServiceHandler(MasterHandler):
node_list = [nm.getByUUID(conn.getUUID()).asTuple()] # for id_timestamp node_list = [nm.getByUUID(conn.getUUID()).asTuple()] # for id_timestamp
node_list.extend(n.asTuple() for n in nm.getMasterList()) node_list.extend(n.asTuple() for n in nm.getMasterList())
node_list.extend(n.asTuple() for n in nm.getStorageList()) node_list.extend(n.asTuple() for n in nm.getStorageList())
conn.notify(Packets.NotifyNodeInformation(node_list)) conn.notify(Packets.NotifyNodeInformation(monotonic_time(), node_list))
def askBeginTransaction(self, conn, tid): def askBeginTransaction(self, conn, tid):
""" """
......
...@@ -56,7 +56,7 @@ class BaseElectionHandler(EventHandler): ...@@ -56,7 +56,7 @@ class BaseElectionHandler(EventHandler):
class ClientElectionHandler(BaseElectionHandler): class ClientElectionHandler(BaseElectionHandler):
def notifyNodeInformation(self, conn, node_list): def notifyNodeInformation(self, conn, timestamp, node_list):
# XXX: For the moment, do nothing because # XXX: For the moment, do nothing because
# we'll close this connection and reconnect. # we'll close this connection and reconnect.
pass pass
......
...@@ -14,10 +14,10 @@ ...@@ -14,10 +14,10 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from time import time
from neo.lib import logging from neo.lib import logging
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, \ from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, \
NotReadyError, ProtocolError, uuid_str NotReadyError, ProtocolError, uuid_str
from ..app import monotonic_time
from . import MasterHandler from . import MasterHandler
class IdentificationHandler(MasterHandler): class IdentificationHandler(MasterHandler):
...@@ -92,7 +92,7 @@ class IdentificationHandler(MasterHandler): ...@@ -92,7 +92,7 @@ class IdentificationHandler(MasterHandler):
uuid=uuid, address=address) uuid=uuid, address=address)
else: else:
node.setUUID(uuid) node.setUUID(uuid)
node.id_timestamp = time() node.id_timestamp = monotonic_time()
node.setState(state) node.setState(state)
node.setConnection(conn) node.setConnection(conn)
conn.setHandler(handler) conn.setHandler(handler)
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys import sys
from ..app import monotonic_time
from . import MasterHandler from . import MasterHandler
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.exception import ElectionFailure, PrimaryFailure from neo.lib.exception import ElectionFailure, PrimaryFailure
...@@ -38,7 +39,7 @@ class SecondaryMasterHandler(MasterHandler): ...@@ -38,7 +39,7 @@ class SecondaryMasterHandler(MasterHandler):
def _notifyNodeInformation(self, conn): def _notifyNodeInformation(self, conn):
node_list = [n.asTuple() for n in self.app.nm.getMasterList()] node_list = [n.asTuple() for n in self.app.nm.getMasterList()]
conn.notify(Packets.NotifyNodeInformation(node_list)) conn.notify(Packets.NotifyNodeInformation(monotonic_time(), node_list))
class PrimaryHandler(EventHandler): class PrimaryHandler(EventHandler):
""" Handler used by secondaries to handle primary master""" """ Handler used by secondaries to handle primary master"""
...@@ -72,8 +73,9 @@ class PrimaryHandler(EventHandler): ...@@ -72,8 +73,9 @@ class PrimaryHandler(EventHandler):
def notifyClusterInformation(self, conn, state): def notifyClusterInformation(self, conn, state):
self.app.cluster_state = state self.app.cluster_state = state
def notifyNodeInformation(self, conn, node_list): def notifyNodeInformation(self, conn, timestamp, node_list):
super(PrimaryHandler, self).notifyNodeInformation(conn, node_list) super(PrimaryHandler, self).notifyNodeInformation(
conn, timestamp, node_list)
for node_type, _, uuid, state, _ in node_list: for node_type, _, uuid, state, _ in node_list:
assert node_type == NodeTypes.MASTER, node_type assert node_type == NodeTypes.MASTER, node_type
if uuid == self.app.uuid and state == NodeStates.UNKNOWN: if uuid == self.app.uuid and state == NodeStates.UNKNOWN:
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
from neo.lib import logging from neo.lib import logging
from neo.lib.protocol import Packets, ProtocolError, ClusterStates, NodeStates from neo.lib.protocol import Packets, ProtocolError, ClusterStates, NodeStates
from .app import monotonic_time
from .handlers import MasterHandler from .handlers import MasterHandler
...@@ -170,8 +171,9 @@ class RecoveryManager(MasterHandler): ...@@ -170,8 +171,9 @@ class RecoveryManager(MasterHandler):
new_nodes = app.pt.load(ptid, row_list, app.nm) new_nodes = app.pt.load(ptid, row_list, app.nm)
except IndexError: except IndexError:
raise ProtocolError('Invalid offset') raise ProtocolError('Invalid offset')
self._notifyAdmins(Packets.NotifyNodeInformation(new_nodes), self._notifyAdmins(
Packets.SendPartitionTable(ptid, row_list)) Packets.NotifyNodeInformation(monotonic_time(), new_nodes),
Packets.SendPartitionTable(ptid, row_list))
self.ask_pt = () self.ask_pt = ()
uuid = conn.getUUID() uuid = conn.getUUID()
app.backup_tid = self.backup_tid_dict[uuid] app.backup_tid = self.backup_tid_dict[uuid]
......
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
import bz2, gzip, errno, optparse, os, signal, sqlite3, sys, time import bz2, gzip, errno, optparse, os, signal, sqlite3, sys, time
from bisect import insort from bisect import insort
from logging import getLevelName from logging import getLevelName
from functools import partial
comp_dict = dict(bz2=bz2.BZ2File, gz=gzip.GzipFile) comp_dict = dict(bz2=bz2.BZ2File, gz=gzip.GzipFile)
...@@ -94,11 +93,6 @@ class Log(object): ...@@ -94,11 +93,6 @@ class Log(object):
exec bz2.decompress(text) in g exec bz2.decompress(text) in g
for x in 'uuid_str', 'Packets', 'PacketMalformedError': for x in 'uuid_str', 'Packets', 'PacketMalformedError':
setattr(self, x, g[x]) setattr(self, x, g[x])
try:
self.notifyNodeInformation = partial(g['formatNodeList'],
prefix=' ! ')
except KeyError:
self.notifyNodeInformation = None
try: try:
self._next_protocol, = q("SELECT date FROM protocol WHERE date>?", self._next_protocol, = q("SELECT date FROM protocol WHERE date>?",
(date,)).next() (date,)).next()
...@@ -131,8 +125,8 @@ class Log(object): ...@@ -131,8 +125,8 @@ class Log(object):
body = None body = None
msg = ['#0x%04x %-30s %s' % (msg_id, msg, peer)] msg = ['#0x%04x %-30s %s' % (msg_id, msg, peer)]
if body is not None: if body is not None:
logger = getattr(self, p.handler_method_name, None) log = getattr(p, '_neolog', None)
if logger or self._decode_all: if log or self._decode_all:
p = p() p = p()
p._id = msg_id p._id = msg_id
p._body = body p._body = body
...@@ -141,15 +135,13 @@ class Log(object): ...@@ -141,15 +135,13 @@ class Log(object):
except self.PacketMalformedError: except self.PacketMalformedError:
msg.append("Can't decode packet") msg.append("Can't decode packet")
else: else:
if logger: if log:
msg += logger(*args) args, extra = log(*args)
elif args: msg += extra
msg = '%s \t| %r' % (msg[0], args), if args and self._decode_all:
msg[0] += ' \t| ' + repr(args)
return date, name, 'PACKET', msg return date, name, 'PACKET', msg
def error(self, code, message):
return "%s (%s)" % (code, message),
def emit_many(log_list): def emit_many(log_list):
log_list = [(log, iter(log).next) for log in log_list] log_list = [(log, iter(log).next) for log in log_list]
......
...@@ -68,7 +68,6 @@ class Application(BaseApplication): ...@@ -68,7 +68,6 @@ class Application(BaseApplication):
self.master_node = None self.master_node = None
# operation related data # operation related data
self.event_queue = None
self.operational = False self.operational = False
# ready is True when operational and got all informations # ready is True when operational and got all informations
...@@ -93,7 +92,6 @@ class Application(BaseApplication): ...@@ -93,7 +92,6 @@ class Application(BaseApplication):
def log(self): def log(self):
self.em.log() self.em.log()
self.logQueuedEvents()
self.nm.log() self.nm.log()
self.tm.log() self.tm.log()
if self.pt is not None: if self.pt is not None:
...@@ -186,8 +184,6 @@ class Application(BaseApplication): ...@@ -186,8 +184,6 @@ class Application(BaseApplication):
for conn in self.em.getConnectionList(): for conn in self.em.getConnectionList():
if conn not in (self.listening_conn, self.master_conn): if conn not in (self.listening_conn, self.master_conn):
conn.close() conn.close()
# create/clear event queue
self.event_queue = deque()
try: try:
self.initialize() self.initialize()
self.doOperation() self.doOperation()
...@@ -305,31 +301,6 @@ class Application(BaseApplication): ...@@ -305,31 +301,6 @@ class Application(BaseApplication):
if not node.isHidden(): if not node.isHidden():
break break
def queueEvent(self, some_callable, conn=None, args=()):
msg_id = None if conn is None else conn.getPeerId()
self.event_queue.append((some_callable, msg_id, conn, args))
def executeQueuedEvents(self):
p = self.event_queue.popleft
for _ in xrange(len(self.event_queue)):
some_callable, msg_id, conn, args = p()
if conn is None:
some_callable(*args)
elif not conn.isClosed():
orig_msg_id = conn.getPeerId()
try:
conn.setPeerId(msg_id)
some_callable(conn, *args)
finally:
conn.setPeerId(orig_msg_id)
def logQueuedEvents(self):
if self.event_queue is None:
return
logging.info("Pending events:")
for event, msg_id, conn, args in self.event_queue:
logging.info(' %r: %r %r', event.__name__, msg_id, conn)
def newTask(self, iterator): def newTask(self, iterator):
try: try:
iterator.next() iterator.next()
......
...@@ -109,7 +109,7 @@ class Checker(object): ...@@ -109,7 +109,7 @@ class Checker(object):
self.source = source self.source = source
def start(): def start():
if app.tm.isLockedTid(max_tid): if app.tm.isLockedTid(max_tid):
app.queueEvent(start) app.tm.queueEvent(start)
return return
args = partition, CHECK_COUNT, min_tid, max_tid args = partition, CHECK_COUNT, min_tid, max_tid
p = Packets.AskCheckTIDRange(*args) p = Packets.AskCheckTIDRange(*args)
......
...@@ -36,10 +36,11 @@ class BaseMasterHandler(EventHandler): ...@@ -36,10 +36,11 @@ class BaseMasterHandler(EventHandler):
def notifyClusterInformation(self, conn, state): def notifyClusterInformation(self, conn, state):
self.app.changeClusterState(state) self.app.changeClusterState(state)
def notifyNodeInformation(self, conn, node_list): def notifyNodeInformation(self, conn, timestamp, node_list):
"""Store information on nodes, only if this is sent by a primary """Store information on nodes, only if this is sent by a primary
master node.""" master node."""
super(BaseMasterHandler, self).notifyNodeInformation(conn, node_list) super(BaseMasterHandler, self).notifyNodeInformation(
conn, timestamp, node_list)
for node_type, _, uuid, state, _ in node_list: for node_type, _, uuid, state, _ in node_list:
if uuid == self.app.uuid: if uuid == self.app.uuid:
# This is me, do what the master tell me # This is me, do what the master tell me
......
...@@ -41,7 +41,7 @@ class ClientOperationHandler(EventHandler): ...@@ -41,7 +41,7 @@ class ClientOperationHandler(EventHandler):
app = self.app app = self.app
if app.tm.loadLocked(oid): if app.tm.loadLocked(oid):
# Delay the response. # Delay the response.
app.queueEvent(self.askObject, conn, (oid, serial, tid)) app.tm.queueEvent(self.askObject, conn, (oid, serial, tid))
return return
o = app.dm.getObject(oid, serial, tid) o = app.dm.getObject(oid, serial, tid)
try: try:
...@@ -79,7 +79,7 @@ class ClientOperationHandler(EventHandler): ...@@ -79,7 +79,7 @@ class ClientOperationHandler(EventHandler):
conn.answer(Packets.AnswerStoreObject(err.tid)) conn.answer(Packets.AnswerStoreObject(err.tid))
except DelayedError: except DelayedError:
# locked by a previous transaction, retry later # locked by a previous transaction, retry later
self.app.queueEvent(self._askStoreObject, conn, (oid, serial, self.app.tm.queueEvent(self._askStoreObject, conn, (oid, serial,
compression, checksum, data, data_serial, ttid, request_time)) compression, checksum, data, data_serial, ttid, request_time))
except NotRegisteredError: except NotRegisteredError:
# transaction was aborted, cancel this event # transaction was aborted, cancel this event
...@@ -156,7 +156,7 @@ class ClientOperationHandler(EventHandler): ...@@ -156,7 +156,7 @@ class ClientOperationHandler(EventHandler):
app = self.app app = self.app
if app.tm.loadLocked(oid): if app.tm.loadLocked(oid):
# Delay the response. # Delay the response.
app.queueEvent(self.askObjectHistory, conn, (oid, first, last)) app.tm.queueEvent(self.askObjectHistory, conn, (oid, first, last))
return return
history_list = app.dm.getObjectHistory(oid, first, last - first) history_list = app.dm.getObjectHistory(oid, first, last - first)
if history_list is None: if history_list is None:
...@@ -177,7 +177,7 @@ class ClientOperationHandler(EventHandler): ...@@ -177,7 +177,7 @@ class ClientOperationHandler(EventHandler):
conn.answer(Packets.AnswerCheckCurrentSerial(err.tid)) conn.answer(Packets.AnswerCheckCurrentSerial(err.tid))
except DelayedError: except DelayedError:
# locked by a previous transaction, retry later # locked by a previous transaction, retry later
self.app.queueEvent(self._askCheckCurrentSerial, conn, self.app.tm.queueEvent(self._askCheckCurrentSerial, conn,
(ttid, serial, oid, request_time)) (ttid, serial, oid, request_time))
except NotRegisteredError: except NotRegisteredError:
# transaction was aborted, cancel this event # transaction was aborted, cancel this event
......
...@@ -44,11 +44,9 @@ class IdentificationHandler(EventHandler): ...@@ -44,11 +44,9 @@ class IdentificationHandler(EventHandler):
raise ProtocolError("uuid conflict or loopback connection") raise ProtocolError("uuid conflict or loopback connection")
node = app.nm.getByUUID(uuid, id_timestamp) node = app.nm.getByUUID(uuid, id_timestamp)
if node is None: if node is None:
# Do never create node automatically, or we could get id app.nm.queueEvent(self.requestIdentification, conn,
# conflicts. We must only rely on the notifications from the (node_type, uuid, address, name, id_timestamp))
# master to recognize nodes. So this is not always an error: return
# maybe there are incoming notifications.
raise NotReadyError('unknown node: retry later')
if node.isBroken(): if node.isBroken():
raise BrokenNodeDisallowedError raise BrokenNodeDisallowedError
# choose the handler according to the node type # choose the handler according to the node type
......
...@@ -147,7 +147,7 @@ class StorageOperationHandler(EventHandler): ...@@ -147,7 +147,7 @@ class StorageOperationHandler(EventHandler):
def askCheckTIDRange(self, conn, *args): def askCheckTIDRange(self, conn, *args):
app = self.app app = self.app
if app.tm.isLockedTid(args[3]): # max_tid if app.tm.isLockedTid(args[3]): # max_tid
app.queueEvent(self.askCheckTIDRange, conn, args) app.tm.queueEvent(self.askCheckTIDRange, conn, args)
return return
msg_id = conn.getPeerId() msg_id = conn.getPeerId()
conn = weakref.proxy(conn) conn = weakref.proxy(conn)
...@@ -187,7 +187,7 @@ class StorageOperationHandler(EventHandler): ...@@ -187,7 +187,7 @@ class StorageOperationHandler(EventHandler):
# NotifyTransactionFinished(M->S) + AskFetchTransactions(S->S) # NotifyTransactionFinished(M->S) + AskFetchTransactions(S->S)
# is faster than # is faster than
# NotifyUnlockInformation(M->S) # NotifyUnlockInformation(M->S)
app.queueEvent(self.askFetchTransactions, conn, app.tm.queueEvent(self.askFetchTransactions, conn,
(partition, length, min_tid, max_tid, tid_list)) (partition, length, min_tid, max_tid, tid_list))
return return
msg_id = conn.getPeerId() msg_id = conn.getPeerId()
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
from time import time from time import time
from neo.lib import logging from neo.lib import logging
from neo.lib.handler import EventQueue
from neo.lib.util import dump from neo.lib.util import dump
from neo.lib.protocol import ProtocolError, uuid_str, ZERO_TID from neo.lib.protocol import ProtocolError, uuid_str, ZERO_TID
...@@ -75,12 +76,13 @@ class Transaction(object): ...@@ -75,12 +76,13 @@ class Transaction(object):
self.store_dict[oid] = oid, data_id, value_serial self.store_dict[oid] = oid, data_id, value_serial
class TransactionManager(object): class TransactionManager(EventQueue):
""" """
Manage pending transaction and locks Manage pending transaction and locks
""" """
def __init__(self, app): def __init__(self, app):
EventQueue.__init__(self)
self._app = app self._app = app
self._transaction_dict = {} self._transaction_dict = {}
self._store_lock_dict = {} self._store_lock_dict = {}
...@@ -109,6 +111,7 @@ class TransactionManager(object): ...@@ -109,6 +111,7 @@ class TransactionManager(object):
""" """
Reset the transaction manager Reset the transaction manager
""" """
EventQueue.__init__(self)
self._transaction_dict.clear() self._transaction_dict.clear()
self._store_lock_dict.clear() self._store_lock_dict.clear()
self._load_lock_dict.clear() self._load_lock_dict.clear()
...@@ -290,7 +293,7 @@ class TransactionManager(object): ...@@ -290,7 +293,7 @@ class TransactionManager(object):
# remove the transaction # remove the transaction
del self._transaction_dict[ttid] del self._transaction_dict[ttid]
# some locks were released, some pending locks may now succeed # some locks were released, some pending locks may now succeed
self._app.executeQueuedEvents() self.executeQueuedEvents()
def abortFor(self, uuid): def abortFor(self, uuid):
""" """
...@@ -319,6 +322,7 @@ class TransactionManager(object): ...@@ -319,6 +322,7 @@ class TransactionManager(object):
logging.info(' Write locks:') logging.info(' Write locks:')
for oid, ttid in self._store_lock_dict.iteritems(): for oid, ttid in self._store_lock_dict.iteritems():
logging.info(' %r by %r', dump(oid), dump(ttid)) logging.info(' %r by %r', dump(oid), dump(ttid))
self.logQueuedEvents()
def updateObjectDataForPack(self, oid, orig_serial, new_serial, data_id): def updateObjectDataForPack(self, oid, orig_serial, new_serial, data_id):
lock_tid = self.getLockingTID(oid) lock_tid = self.getLockingTID(oid)
......
...@@ -19,8 +19,8 @@ from ..mock import Mock ...@@ -19,8 +19,8 @@ from ..mock import Mock
from .. import NeoUnitTestBase from .. import NeoUnitTestBase
from neo.lib.util import p64 from neo.lib.util import p64
from neo.lib.protocol import NodeTypes, NodeStates, Packets from neo.lib.protocol import NodeTypes, NodeStates, Packets
from neo.master.handlers.client import ClientServiceHandler
from neo.master.app import Application from neo.master.app import Application
from neo.master.handlers.client import ClientServiceHandler
class MasterClientHandlerTests(NeoUnitTestBase): class MasterClientHandlerTests(NeoUnitTestBase):
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
import unittest import unittest
from ..mock import Mock from ..mock import Mock
from collections import deque
from .. import NeoUnitTestBase from .. import NeoUnitTestBase
from neo.storage.app import Application from neo.storage.app import Application
from neo.storage.handlers.master import MasterOperationHandler from neo.storage.handlers.master import MasterOperationHandler
...@@ -31,10 +30,6 @@ class StorageMasterHandlerTests(NeoUnitTestBase): ...@@ -31,10 +30,6 @@ class StorageMasterHandlerTests(NeoUnitTestBase):
# create an application object # create an application object
config = self.getStorageConfiguration(master_number=1) config = self.getStorageConfiguration(master_number=1)
self.app = Application(config) self.app = Application(config)
self.app.transaction_dict = {}
self.app.store_lock_dict = {}
self.app.load_lock_dict = {}
self.app.event_queue = deque()
# handler # handler
self.operation = MasterOperationHandler(self.app) self.operation = MasterOperationHandler(self.app)
# set pmn # set pmn
......
...@@ -164,7 +164,7 @@ class NodeManagerTests(NeoUnitTestBase): ...@@ -164,7 +164,7 @@ class NodeManagerTests(NeoUnitTestBase):
NodeStates.UNKNOWN, None), NodeStates.UNKNOWN, None),
) )
# update manager content # update manager content
manager.update(Mock(), node_list) manager.update(Mock(), time(), node_list)
# - the client gets down # - the client gets down
self.checkClients([]) self.checkClients([])
# - master change it's address # - master change it's address
......
...@@ -39,6 +39,7 @@ from neo.client.exception import NEOPrimaryMasterLost, NEOStorageError ...@@ -39,6 +39,7 @@ from neo.client.exception import NEOPrimaryMasterLost, NEOStorageError
from neo.client.pool import CELL_CONNECTED, CELL_GOOD from neo.client.pool import CELL_CONNECTED, CELL_GOOD
from neo.master.handlers.client import ClientServiceHandler from neo.master.handlers.client import ClientServiceHandler
from neo.storage.handlers.client import ClientOperationHandler from neo.storage.handlers.client import ClientOperationHandler
from neo.storage.handlers.identification import IdentificationHandler
from neo.storage.handlers.initialization import InitializationHandler from neo.storage.handlers.initialization import InitializationHandler
class PCounter(Persistent): class PCounter(Persistent):
...@@ -1093,18 +1094,28 @@ class Test(NEOThreadedTest): ...@@ -1093,18 +1094,28 @@ class Test(NEOThreadedTest):
@with_cluster() @with_cluster()
def testRecycledClientUUID(self, cluster): def testRecycledClientUUID(self, cluster):
def notReady(orig, *args): l = threading.Semaphore(0)
m2s.discard(delayNotifyInformation) idle = []
return orig(*args) def requestIdentification(orig, *args):
if 1: orig(*args)
cluster.getTransaction() idle.append(cluster.storage.em.isIdle())
with cluster.master.filterConnection(cluster.storage) as m2s: l.release()
delayNotifyInformation = m2s.delayNotifyNodeInformation() cluster.db
cluster.client.master_conn.close() with cluster.master.filterConnection(cluster.storage) as m2s:
with cluster.newClient() as client, Patch( delayNotifyInformation = m2s.delayNotifyNodeInformation()
client.storage_bootstrap_handler, notReady=notReady): cluster.client.master_conn.close()
x = client.load(ZERO_TID) with cluster.newClient() as client:
self.assertNotIn(delayNotifyInformation, m2s) with Patch(IdentificationHandler,
requestIdentification=requestIdentification):
load = self.newThread(client.load, ZERO_TID)
l.acquire()
m2s.remove(delayNotifyInformation) # 2 packets pending
# Identification of the second client is retried
# after each processed notification:
l.acquire() # first client down
l.acquire() # new client up
load.join()
self.assertEqual(idle, [1, 1, 0])
@with_cluster(start_cluster=0, storage_count=3, autostart=3) @with_cluster(start_cluster=0, storage_count=3, autostart=3)
def testAutostart(self, cluster): def testAutostart(self, cluster):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment