Commit 4c61df28 authored by Vincent Pelletier's avatar Vincent Pelletier

Big rework on client handlers:

- Split completely expected packet handlers from asynchronous event handlers
- Remove NEOStorageConnectionFailure exception and replace it by a more general "ConnectionClosed" exception, purely internal to client app.
- Modify waitMessage to redirect to a handler based on remote peer type
- Fix multiple code paths taken when master doesn't accept connections (or closes them) while trying to conenct to it.
Also:
- remove unused imports found by pyflakes


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@773 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent a0f2936c
......@@ -15,9 +15,8 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import logging
import os
from thread import get_ident
from cPickle import dumps, loads
from cPickle import dumps
from zlib import compress, decompress
from Queue import Queue, Empty
from random import shuffle
......@@ -27,15 +26,15 @@ from neo.client.mq import MQ
from neo.node import NodeManager, MasterNode, StorageNode
from neo.connection import MTClientConnection
from neo import protocol
from neo.protocol import Packet, INVALID_UUID, INVALID_TID, INVALID_PARTITION, \
INVALID_PTID, CLIENT_NODE_TYPE, UP_TO_DATE_STATE, INVALID_SERIAL, \
from neo.protocol import INVALID_UUID, INVALID_TID, INVALID_PARTITION, \
INVALID_PTID, CLIENT_NODE_TYPE, INVALID_SERIAL, \
DOWN_STATE, HIDDEN_STATE
from neo.client.handlers.master import PrimaryBootstrapHandler, \
PrimaryNotificationsHandler, PrimaryAnswersHandler
from neo.client.handlers.storage import StorageBootstrapHandler, \
StorageAnswersHandler
StorageAnswersHandler, StorageEventHandler
from neo.client.exception import NEOStorageError, NEOStorageConflictError, \
NEOStorageNotFoundError, NEOStorageConnectionFailure
NEOStorageNotFoundError
from neo.exception import NeoException
from neo.util import makeChecksum, dump
from neo.connector import getConnectorHandler
......@@ -46,6 +45,8 @@ from neo.locking import RLock, Lock
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
class ConnectionClosed(Exception): pass
class ConnectionPool(object):
"""This class manages a pool of connections to storage nodes."""
......@@ -72,8 +73,7 @@ class ConnectionPool(object):
while True:
logging.info('trying to connect to %s - %s', node, node.getState())
app.setNodeReady()
handler = StorageBootstrapHandler(app, app.dispatcher)
conn = MTClientConnection(app.em, handler, addr,
conn = MTClientConnection(app.em, app.storage_event_handler, addr,
connector_handler=app.connector_handler)
conn.lock()
try:
......@@ -90,14 +90,13 @@ class ConnectionPool(object):
conn.unlock()
try:
app._waitMessage(conn, msg_id, handler=handler)
except NEOStorageError:
app._waitMessage(conn, msg_id, handler=app.storage_bootstrap_handler)
except ConnectionClosed:
logging.error('Connection to storage node %s failed', node)
return None
if app.isNodeReady():
logging.info('connected to storage node %s', node)
conn.setHandler(self.app.storage_handler)
return conn
else:
# Connection failed, notify primary master node
......@@ -255,9 +254,11 @@ class Application(object):
self.mq_cache = MQ()
self.new_oid_list = []
self.ptid = INVALID_PTID
self.storage_handler = StorageAnswersHandler(self, self.dispatcher)
self.primary_handler = PrimaryAnswersHandler(self, self.dispatcher)
self.primary_bootstrap_handler = PrimaryBootstrapHandler(self, self.dispatcher)
self.storage_event_handler = StorageEventHandler(self, self.dispatcher)
self.storage_bootstrap_handler = StorageBootstrapHandler(self)
self.storage_handler = StorageAnswersHandler(self)
self.primary_handler = PrimaryAnswersHandler(self)
self.primary_bootstrap_handler = PrimaryBootstrapHandler(self)
self.notifications_handler = PrimaryNotificationsHandler(self, self.dispatcher)
# Internal attribute distinct between thread
self.local_var = ThreadContext()
......@@ -303,8 +304,6 @@ class Application(object):
def _waitMessage(self, target_conn = None, msg_id = None, handler=None):
"""Wait for a message returned by the dispatcher in queues."""
local_queue = self.local_var.queue
if handler is None:
handler = self.notifications_handler
while 1:
if msg_id is None:
try:
......@@ -315,11 +314,26 @@ class Application(object):
conn, packet = local_queue.get()
# check fake packet
if packet is None:
self.notifyDeadNode(conn)
if conn.getUUID() == target_conn.getUUID():
raise NEOStorageConnectionFailure('connection closed')
raise ConnectionClosed
else:
continue
# Guess the handler to use based on the type of node on the
# connection
if handler is None:
node = self.nm.getNodeByServer(conn.getAddress())
if node is None:
raise ValueError, 'Expecting an answer from a node ' \
'which type is not known... Is this right ?'
else:
node_type = node.getType()
if node_type == protocol.STORAGE_NODE_TYPE:
handler = self.storage_handler
elif node_type == protocol.MASTER_NODE_TYPE:
handler = self.primary_handler
else:
raise ValueError, 'Unknown node type: %r' % (
node_type, )
handler.dispatch(conn, packet)
if target_conn is conn and msg_id == packet.getId() \
and packet.getType() & 0x8000:
......@@ -398,11 +412,19 @@ class Application(object):
# Query for primary master node
conn.lock()
try:
if conn.getConnector() is None:
# This happens, if a connection could not be established.
logging.error('Connection to master node %s failed',
self.trying_master_node)
continue
msg_id = conn.ask(protocol.askPrimaryMaster())
self.dispatcher.register(conn, msg_id, self.local_var.queue)
finally:
conn.unlock()
self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
try:
self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
except ConnectionClosed:
continue
# If we reached the primary master node, mark as connected
connected = self.primary_master_node is not None \
and self.primary_master_node is self.trying_master_node
......@@ -412,13 +434,22 @@ class Application(object):
while conn.getUUID() is None:
conn.lock()
try:
if conn.getConnector() is None:
logging.error('Connection to master node %s lost',
self.trying_master_node)
self.primary_master_node = None
break
p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE,
self.uuid, '0.0.0.0', 0, self.name)
msg_id = conn.ask(p)
self.dispatcher.register(conn, msg_id, self.local_var.queue)
finally:
conn.unlock()
self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
try:
self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
except ConnectionClosed:
self.primary_master_node = None
break
if conn.getUUID() is None:
# Node identification was refused by master.
# Sleep a bit an retry.
......@@ -513,7 +544,7 @@ class Application(object):
try:
self._askStorage(conn, protocol.askObject(oid, serial, tid))
except NEOStorageConnectionFailure:
except ConnectionClosed:
continue
if self.local_var.asked_object == -1:
......@@ -649,7 +680,7 @@ class Application(object):
checksum, compressed_data, self.local_var.tid)
try:
self._askStorage(conn, p)
except NEOStorageConnectionFailure:
except ConnectionClosed:
continue
# Check we don't get any conflict
......@@ -698,7 +729,7 @@ class Application(object):
user, desc, ext, oid_list)
try:
self._askStorage(conn, p)
except NEOStorageConnectionFailure:
except ConnectionClosed:
continue
if not self.isTransactionVoted():
......@@ -787,7 +818,7 @@ class Application(object):
self.local_var.txn_info = 0
try:
self._askStorage(conn, protocol.askTransactionInformation(transaction_id))
except NEOStorageConnectionFailure:
except ConnectionClosed:
continue
if self.local_var.txn_info == -1:
......@@ -863,7 +894,7 @@ class Application(object):
while len(self.local_var.node_tids) != len(storage_node_list):
try:
self._waitMessage(handler=self.storage_handler)
except NEOStorageConnectionFailure:
except ConnectionClosed:
continue
# Reorder tids
......@@ -887,7 +918,7 @@ class Application(object):
self.local_var.txn_info = 0
try:
self._askStorage(conn, protocol.askTransactionInformation(tid))
except NEOStorageConnectionFailure:
except ConnectionClosed:
continue
if isinstance(self.local_var.txn_info, dict):
break
......@@ -923,7 +954,7 @@ class Application(object):
self.local_var.history = None
try:
self._askStorage(conn, protocol.askObjectHistory(oid, 0, length))
except NEOStorageConnectionFailure:
except ConnectionClosed:
continue
if self.local_var.history == -1:
......@@ -956,7 +987,7 @@ class Application(object):
self.local_var.txn_info = None
try:
self._askStorage(conn, protocol.askTransactionInformation(serial))
except NEOStorageConnectionFailure:
except ConnectionClosed:
continue
if self.local_var.txn_info == -1:
......
......@@ -19,4 +19,3 @@ from ZODB import POSException
class NEOStorageError(POSException.StorageError): pass
class NEOStorageConflictError(NEOStorageError): pass
class NEOStorageNotFoundError(NEOStorageError): pass
class NEOStorageConnectionFailure(NEOStorageError): pass
......@@ -43,3 +43,52 @@ class BaseHandler(EventHandler):
else:
queue.put((conn, packet))
def _notifyQueues(self, conn):
"""
Put fake packets to task queues so that threads waiting for an
answer get notified of the disconnection.
"""
queue_set = set()
conn_id = id(conn)
for key in self.dispatcher.message_table.keys():
if conn_id == key[0]:
queue = self.dispatcher.message_table.pop(key)
queue_set.add(queue)
for queue in queue_set:
queue.put((conn, None))
def connectionClosed(self, conn):
super(BaseHandler, self).connectionClosed(conn)
self._notifyQueues(conn)
def timeoutExpired(self, conn):
super(BaseHandler, self).timeoutExpired(conn)
conn.lock()
try:
conn.close()
finally:
conn.release()
self._notifyQueues(conn)
def connectionFailed(self, conn):
super(BaseHandler, self).connectionFailed(conn)
self._notifyQueues(conn)
def unexpectedInAnswerHandler(*args, **kw):
raise Exception('Unexpected event in an answer handler')
class AnswerBaseHandler(EventHandler):
def __init__(self, app):
self.app = app
super(AnswerBaseHandler, self).__init__()
connectionStarted = unexpectedInAnswerHandler
connectionCompleted = unexpectedInAnswerHandler
connectionFailed = unexpectedInAnswerHandler
connectionAccepted = unexpectedInAnswerHandler
timeoutExpired = unexpectedInAnswerHandler
connectionClosed = unexpectedInAnswerHandler
packetReceived = unexpectedInAnswerHandler
peerBroken = unexpectedInAnswerHandler
......@@ -17,8 +17,7 @@
import logging
from neo.client.handlers.handler import BaseHandler
from neo import protocol
from neo.client.handlers.handler import BaseHandler, AnswerBaseHandler
from neo.protocol import MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
INVALID_UUID, RUNNING_STATE, TEMPORARILY_DOWN_STATE
from neo.node import MasterNode, StorageNode
......@@ -26,72 +25,9 @@ from neo.pt import MTPartitionTable as PartitionTable
from neo.util import dump
from neo import decorators
class PrimaryBaseHandler(BaseHandler):
def _closePrimaryMasterConnection(self, conn):
"""
This method is not part of EvenHandler API.
"""
app = self.app
if app.master_conn is not None:
assert conn is app.master_conn
app.master_conn.lock()
try:
app.master_conn.close()
finally:
app.master_conn.release()
app.master_conn = None
app.primary_master_node = None
class PrimaryBootstrapHandler(BaseHandler):
class PrimaryBootstrapHandler(AnswerBaseHandler):
""" Bootstrap handler used when looking for the primary master """
def connectionCompleted(self, conn):
app = self.app
if app.trying_master_node is None:
# Should not happen.
raise RuntimeError('connection completed while not trying to connect')
super(PrimaryBootstrapHandler, self).connectionCompleted(conn)
def connectionFailed(self, conn):
app = self.app
if app.trying_master_node is None:
# Should not happen.
raise RuntimeError('connection failed while not trying to connect')
if app.trying_master_node is app.primary_master_node:
# Tried to connect to a primary master node and failed.
# So this would effectively mean that it is dead.
app.primary_master_node = None
app.trying_master_node = None
super(PrimaryBootstrapHandler, self).connectionFailed(conn)
def timeoutExpired(self, conn):
app = self.app
if app.trying_master_node is app.primary_master_node:
# If a primary master node timeouts, I should not rely on it.
app.primary_master_node = None
app.trying_master_node = None
super(PrimaryBootstrapHandler, self).timeoutExpired(conn)
def connectionClosed(self, conn):
app = self.app
if app.trying_master_node is app.primary_master_node:
# If a primary master node closes, I should not rely on it.
app.primary_master_node = None
app.trying_master_node = None
super(PrimaryBootstrapHandler, self).connectionClosed(conn)
def peerBroken(self, conn):
app = self.app
if app.trying_master_node is app.primary_master_node:
# If a primary master node gets broken, I should not rely
# on it.
app.primary_master_node = None
app.trying_master_node = None
super(PrimaryBootstrapHandler, self).peerBroken(conn)
def handleNotReady(self, conn, packet, message):
app = self.app
app.trying_master_node = None
......@@ -185,21 +121,35 @@ class PrimaryBootstrapHandler(BaseHandler):
def handleAnswerNodeInformation(self, conn, packet, node_list):
pass
class PrimaryNotificationsHandler(PrimaryBaseHandler):
class PrimaryNotificationsHandler(BaseHandler):
""" Handler that process the notifications from the primary master """
def connectionClosed(self, conn):
logging.critical("connection to primary master node closed")
# Close connection
self._closePrimaryMasterConnection(conn)
BaseHandler.connectionClosed(self, conn)
app = self.app
if app.master_conn is not None:
assert conn is app.master_conn
logging.critical("connection to primary master node closed")
conn.lock()
try:
app.master_conn.close()
finally:
conn.release()
app.master_conn = None
app.primary_master_node = None
super(PrimaryNotificationsHandler, self).connectionClosed(conn)
def timeoutExpired(self, conn):
logging.critical("connection timeout to primary master node expired")
app = self.app
if app.master_conn is not None:
assert conn is app.master_conn
logging.critical("connection timeout to primary master node expired")
BaseHandler.timeoutExpired(self, conn)
def peerBroken(self, conn):
logging.critical("primary master node is broken")
app = self.app
if app.master_conn is not None:
assert conn is app.master_conn
logging.critical("primary master node is broken")
BaseHandler.peerBroken(self, conn)
def handleStopOperation(self, conn, packet):
......@@ -345,23 +295,9 @@ class PrimaryNotificationsHandler(PrimaryBaseHandler):
for queue in queue_set:
queue.put((conn, None))
class PrimaryAnswersHandler(PrimaryBaseHandler):
class PrimaryAnswersHandler(AnswerBaseHandler):
""" Handle that process expected packets from the primary master """
def connectionClosed(self, conn):
logging.critical("connection to primary master node closed")
# Close connection
self._closePrimaryMasterConnection(conn)
super(PrimaryAnswersHandler, self).connectionClosed(conn)
def timeoutExpired(self, conn):
logging.critical("connection timeout to primary master node expired")
super(PrimaryAnswersHandler, self).timeoutExpired(conn)
def peerBroken(self, conn):
logging.critical("primary master node is broken")
super(PrimaryAnswersHandler, self).peerBroken(conn)
def handleAnswerNewTID(self, conn, packet, tid):
app = self.app
app.setTID(tid)
......
......@@ -17,11 +17,11 @@
import logging
from neo.client.handlers.handler import BaseHandler
from neo.client.handlers.handler import BaseHandler, AnswerBaseHandler
from neo.protocol import STORAGE_NODE_TYPE
from ZODB.TimeStamp import TimeStamp
class StorageBaseHandler(BaseHandler):
class StorageEventHandler(BaseHandler):
def _dealWithStorageFailure(self, conn, node):
app = self.app
......@@ -45,26 +45,26 @@ class StorageBaseHandler(BaseHandler):
node = self.app.nm.getNodeByServer(conn.getAddress())
logging.info("connection to storage node %s closed", node.getServer())
self._dealWithStorageFailure(conn, node)
super(StorageBaseHandler, self).connectionClosed(conn)
super(StorageEventHandler, self).connectionClosed(conn)
def timeoutExpired(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress())
self._dealWithStorageFailure(conn, node)
super(StorageBaseHandler, self).timeoutExpired(conn)
super(StorageEventHandler, self).timeoutExpired(conn)
def peerBroken(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress())
self._dealWithStorageFailure(conn, node)
super(StorageBaseHandler, self).peerBroken(conn)
class StorageBootstrapHandler(StorageBaseHandler):
""" Handler used when connecting to a storage node """
super(StorageEventHandler, self).peerBroken(conn)
def connectionFailed(self, conn):
# Connection to a storage node failed
node = self.app.nm.getNodeByServer(conn.getAddress())
self._dealWithStorageFailure(conn, node)
super(StorageBootstrapHandler, self).connectionFailed(conn)
super(StorageEventHandler, self).connectionFailed(conn)
class StorageBootstrapHandler(AnswerBaseHandler):
""" Handler used when connecting to a storage node """
def handleNotReady(self, conn, packet, message):
app = self.app
......@@ -76,11 +76,7 @@ class StorageBootstrapHandler(StorageBaseHandler):
node = app.nm.getNodeByServer(conn.getAddress())
# It can be eiter a master node or a storage node
if node_type != STORAGE_NODE_TYPE:
conn.lock()
try:
conn.close()
finally:
conn.release()
conn.close()
return
if conn.getAddress() != (ip_address, port):
# The server address is different! Then why was
......@@ -88,17 +84,13 @@ class StorageBootstrapHandler(StorageBaseHandler):
logging.error('%s:%d is waiting for %s:%d',
conn.getAddress()[0], conn.getAddress()[1], ip_address, port)
app.nm.remove(node)
conn.lock()
try:
conn.close()
finally:
conn.release()
conn.close()
return
conn.setUUID(uuid)
node.setUUID(uuid)
class StorageAnswersHandler(StorageBaseHandler):
class StorageAnswersHandler(AnswerBaseHandler):
""" Handle all messages related to ZODB operations """
def handleAnswerObject(self, conn, packet, oid, start_serial, end_serial,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment