Commit a4e7ac97 authored by Vincent Pelletier's avatar Vincent Pelletier

Split client handlers into 3 parts:

- generic base handler
- client connection handlers
- master connection handlers


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@765 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent b6034e79
......@@ -30,9 +30,10 @@ from neo import protocol
from neo.protocol import Packet, INVALID_UUID, INVALID_TID, INVALID_PARTITION, \
INVALID_PTID, CLIENT_NODE_TYPE, UP_TO_DATE_STATE, INVALID_SERIAL, \
DOWN_STATE, HIDDEN_STATE
from neo.client.handlers.handler import PrimaryBootstrapHandler, \
PrimaryNotificationsHandler, PrimaryAnswersHandler, \
StorageBootstrapHandler, StorageAnswersHandler
from neo.client.handlers.master import PrimaryBootstrapHandler, \
PrimaryNotificationsHandler, PrimaryAnswersHandler
from neo.client.handlers.storage import StorageBootstrapHandler, \
StorageAnswersHandler
from neo.client.exception import NEOStorageError, NEOStorageConflictError, \
NEOStorageNotFoundError, NEOStorageConnectionFailure
from neo.exception import NeoException
......
......@@ -15,25 +15,7 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import logging
from neo.handler import EventHandler
from neo.connection import MTClientConnection
from neo import protocol
from neo.protocol import Packet, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
INVALID_UUID, RUNNING_STATE, TEMPORARILY_DOWN_STATE, \
BROKEN_STATE, FEEDING_STATE, DISCARDED_STATE, DOWN_STATE, \
HIDDEN_STATE
from neo.node import MasterNode, StorageNode, ClientNode
from neo.pt import MTPartitionTable as PartitionTable
from neo.client.exception import NEOStorageError
from neo.exception import ElectionFailure
from neo.util import dump
from neo import decorators
from ZODB.TimeStamp import TimeStamp
from ZODB.utils import p64
class BaseHandler(EventHandler):
"""Base class for client-side EventHandler implementations."""
......@@ -61,468 +43,3 @@ class BaseHandler(EventHandler):
else:
queue.put((conn, packet))
class PrimaryBaseHandler(BaseHandler):
def _closePrimaryMasterConnection(self, conn):
"""
This method is not part of EvenHandler API.
"""
app = self.app
if app.master_conn is not None:
assert conn is app.master_conn
app.master_conn.close()
app.master_conn = None
app.primary_master_node = None
class PrimaryBootstrapHandler(BaseHandler):
""" Bootstrap handler used when looking for the primary master """
def connectionCompleted(self, conn):
app = self.app
if app.trying_master_node is None:
# Should not happen.
raise RuntimeError('connection completed while not trying to connect')
super(PrimaryBootstrapHandler, self).connectionCompleted(conn)
def connectionFailed(self, conn):
app = self.app
if app.trying_master_node is None:
# Should not happen.
raise RuntimeError('connection failed while not trying to connect')
if app.trying_master_node is app.primary_master_node:
# Tried to connect to a primary master node and failed.
# So this would effectively mean that it is dead.
app.primary_master_node = None
app.trying_master_node = None
super(PrimaryBootstrapHandler, self).connectionFailed(conn)
def timeoutExpired(self, conn):
app = self.app
if app.trying_master_node is app.primary_master_node:
# If a primary master node timeouts, I should not rely on it.
app.primary_master_node = None
app.trying_master_node = None
super(PrimaryBootstrapHandler, self).timeoutExpired(conn)
def connectionClosed(self, conn):
app = self.app
if app.trying_master_node is app.primary_master_node:
# If a primary master node closes, I should not rely on it.
app.primary_master_node = None
app.trying_master_node = None
super(PrimaryBootstrapHandler, self).connectionClosed(conn)
def peerBroken(self, conn):
app = self.app
if app.trying_master_node is app.primary_master_node:
# If a primary master node gets broken, I should not rely
# on it.
app.primary_master_node = None
app.trying_master_node = None
super(PrimaryBootstrapHandler, self).peerBroken(conn)
def handleNotReady(self, conn, packet, message):
app = self.app
app.trying_master_node = None
app.setNodeNotReady()
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port,
num_partitions, num_replicas, your_uuid):
app = self.app
node = app.nm.getNodeByServer(conn.getAddress())
# this must be a master node
if node_type != MASTER_NODE_TYPE:
conn.close()
return
if conn.getAddress() != (ip_address, port):
# The server address is different! Then why was
# the connection successful?
logging.error('%s:%d is waiting for %s:%d',
conn.getAddress()[0], conn.getAddress()[1],
ip_address, port)
app.nm.remove(node)
conn.close()
return
conn.setUUID(uuid)
node.setUUID(uuid)
if your_uuid != INVALID_UUID:
# got an uuid from the primary master
app.uuid = your_uuid
# Always create partition table
app.pt = PartitionTable(num_partitions, num_replicas)
# Ask a primary master.
conn.lock()
try:
msg_id = conn.ask(protocol.askPrimaryMaster())
self.dispatcher.register(conn, msg_id, app.local_var.queue)
finally:
conn.unlock()
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
known_master_list):
app = self.app
# Register new master nodes.
for ip_address, port, uuid in known_master_list:
addr = (ip_address, port)
n = app.nm.getNodeByServer(addr)
if n is None:
n = MasterNode(server = addr)
app.nm.add(n)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None or n.getUUID() != uuid:
n.setUUID(uuid)
if primary_uuid != INVALID_UUID:
primary_node = app.nm.getNodeByUUID(primary_uuid)
if primary_node is None:
# I don't know such a node. Probably this information
# is old. So ignore it.
logging.warning('Unknown primary master UUID: %s. ' \
'Ignoring.' % dump(primary_uuid))
else:
app.primary_master_node = primary_node
if app.trying_master_node is not primary_node:
app.trying_master_node = None
conn.close()
else:
if app.primary_master_node is not None:
# The primary master node is not a primary master node
# any longer.
app.primary_master_node = None
app.trying_master_node = None
conn.close()
def handleAnswerPartitionTable(self, conn, packet, ptid, row_list):
pass
def handleAnswerNodeInformation(self, conn, packet, node_list):
pass
class PrimaryNotificationsHandler(PrimaryBaseHandler):
""" Handler that process the notifications from the primary master """
def connectionClosed(self, conn):
logging.critical("connection to primary master node closed")
# Close connection
self._closePrimaryMasterConnection(conn)
BaseHandler.connectionClosed(self, conn)
def timeoutExpired(self, conn):
logging.critical("connection timeout to primary master node expired")
BaseHandler.timeoutExpired(self, conn)
def peerBroken(self, conn):
logging.critical("primary master node is broken")
BaseHandler.peerBroken(self, conn)
def handleStopOperation(self, conn, packet):
logging.critical("master node ask to stop operation")
def handleInvalidateObjects(self, conn, packet, oid_list, tid):
app = self.app
app._cache_lock_acquire()
try:
# ZODB required a dict with oid as key, so create it
oids = {}
for oid in oid_list:
oids[oid] = tid
try:
del app.mq_cache[oid]
except KeyError:
pass
db = app.getDB()
if db is not None:
db.invalidate(tid, oids)
finally:
app._cache_lock_release()
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
app = self.app
nm = app.nm
pt = app.pt
if app.ptid >= ptid:
# Ignore this packet.
return
app.ptid = ptid
for offset, uuid, state in cell_list:
node = nm.getNodeByUUID(uuid)
if node is None:
node = StorageNode(uuid = uuid)
if uuid != app.uuid:
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
pt.setCell(offset, node, state)
@decorators.identification_required
def handleSendPartitionTable(self, conn, packet, ptid, row_list):
# This handler is in PrimaryBootstrapHandler, since this
# basicaly is an answer to askPrimaryMaster.
# Extract from P-NEO-Protocol.Description:
# Connection to primary master node (PMN in service state)
# CN -> PMN : askPrimaryMaster
# PMN -> CN : answerPrimaryMaster containing primary uuid and no
# known master list
# PMN -> CN : notifyNodeInformation containing list of all
# ASK_STORE_TRANSACTION# PMN -> CN : sendPartitionTable containing partition table id and
# list of rows
# notifyNodeInformation is valid as asynchrounous event, but
# sendPartitionTable is only triggered after askPrimaryMaster.
uuid = conn.getUUID()
app = self.app
nm = app.nm
pt = app.pt
node = app.nm.getNodeByUUID(uuid)
# This must be sent only by primary master node
if node.getNodeType() != MASTER_NODE_TYPE:
return
if app.ptid != ptid:
app.ptid = ptid
pt.clear()
for offset, row in row_list:
for uuid, state in row:
node = nm.getNodeByUUID(uuid)
if node is None:
node = StorageNode(uuid = uuid)
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
pt.setCell(offset, node, state)
def handleNotifyNodeInformation(self, conn, packet, node_list):
app = self.app
nm = app.nm
for node_type, ip_address, port, uuid, state in node_list:
logging.info("notified of %s %s %d %s %s" %(node_type, ip_address, port, dump(uuid), state))
# Register new nodes.
addr = (ip_address, port)
# Try to retrieve it from nm
n = None
if uuid != INVALID_UUID:
n = nm.getNodeByUUID(uuid)
if n is None:
n = nm.getNodeByServer(addr)
if n is not None and uuid != INVALID_UUID:
# node only exists by address, remove it
nm.remove(n)
n = None
elif n.getServer() != addr:
# same uuid but different address, remove it
nm.remove(n)
n = None
if node_type == MASTER_NODE_TYPE:
if n is None:
n = MasterNode(server = addr)
nm.add(n)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None:
n.setUUID(uuid)
elif node_type == STORAGE_NODE_TYPE:
if uuid == INVALID_UUID:
# No interest.
continue
if n is None:
n = StorageNode(server = addr, uuid = uuid)
nm.add(n)
elif node_type == CLIENT_NODE_TYPE:
continue
n.setState(state)
# close connection to this node if no longer running
if node_type in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE) and \
state != RUNNING_STATE:
for conn in self.app.em.getConnectionList():
if conn.getUUID() == n.getUUID():
conn.close()
break
if node_type == STORAGE_NODE_TYPE:
# Remove from pool connection
app.cp.removeConnection(n)
# Put fake packets to task queues.
queue_set = set()
for key in self.dispatcher.message_table.keys():
if id(conn) == key[0]:
queue = self.dispatcher.message_table.pop(key)
queue_set.add(queue)
# Storage failure is notified to the primary master when the fake
# packet if popped by a non-polling thread.
for queue in queue_set:
queue.put((conn, None))
class PrimaryAnswersHandler(PrimaryBaseHandler):
""" Handle that process expected packets from the primary master """
def connectionClosed(self, conn):
logging.critical("connection to primary master node closed")
# Close connection
self._closePrimaryMasterConnection(conn)
super(PrimaryAnswersHandler, self).connectionClosed(conn)
def timeoutExpired(self, conn):
logging.critical("connection timeout to primary master node expired")
super(PrimaryAnswersHandler, self).timeoutExpired(conn)
def peerBroken(self, conn):
logging.critical("primary master node is broken")
super(PrimaryAnswersHandler, self).peerBroken(conn)
def handleAnswerNewTID(self, conn, packet, tid):
app = self.app
app.setTID(tid)
def handleAnswerNewOIDs(self, conn, packet, oid_list):
app = self.app
app.new_oid_list = oid_list
app.new_oid_list.reverse()
def handleNotifyTransactionFinished(self, conn, packet, tid):
app = self.app
if tid == app.getTID():
app.setTransactionFinished()
class StorageBaseHandler(BaseHandler):
def _dealWithStorageFailure(self, conn, node):
app = self.app
# Remove from pool connection
app.cp.removeConnection(node)
# Put fake packets to task queues.
queue_set = set()
for key in self.dispatcher.message_table.keys():
if id(conn) == key[0]:
queue = self.dispatcher.message_table.pop(key)
queue_set.add(queue)
# Storage failure is notified to the primary master when the fake
# packet if popped by a non-polling thread.
for queue in queue_set:
queue.put((conn, None))
def connectionClosed(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress())
logging.info("connection to storage node %s closed", node.getServer())
self._dealWithStorageFailure(conn, node)
super(StorageBaseHandler, self).connectionClosed(conn)
def timeoutExpired(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress())
self._dealWithStorageFailure(conn, node)
super(StorageBaseHandler, self).timeoutExpired(conn)
def peerBroken(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress())
self._dealWithStorageFailure(conn, node)
super(StorageBaseHandler, self).peerBroken(conn)
class StorageBootstrapHandler(StorageBaseHandler):
""" Handler used when connecting to a storage node """
def connectionFailed(self, conn):
# Connection to a storage node failed
node = self.app.nm.getNodeByServer(conn.getAddress())
self._dealWithStorageFailure(conn, node)
super(StorageBootstrapHandler, self).connectionFailed(conn)
def handleNotReady(self, conn, packet, message):
app = self.app
app.setNodeNotReady()
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, num_partitions, num_replicas, your_uuid):
app = self.app
node = app.nm.getNodeByServer(conn.getAddress())
# It can be eiter a master node or a storage node
if node_type != STORAGE_NODE_TYPE:
conn.close()
return
if conn.getAddress() != (ip_address, port):
# The server address is different! Then why was
# the connection successful?
logging.error('%s:%d is waiting for %s:%d',
conn.getAddress()[0], conn.getAddress()[1], ip_address, port)
app.nm.remove(node)
conn.close()
return
conn.setUUID(uuid)
node.setUUID(uuid)
class StorageAnswersHandler(StorageBaseHandler):
""" Handle all messages related to ZODB operations """
def handleAnswerObject(self, conn, packet, oid, start_serial, end_serial,
compression, checksum, data):
app = self.app
app.local_var.asked_object = (oid, start_serial, end_serial, compression,
checksum, data)
def handleAnswerStoreObject(self, conn, packet, conflicting, oid, serial):
app = self.app
if conflicting:
app.local_var.object_stored = -1, serial
else:
app.local_var.object_stored = oid, serial
def handleAnswerStoreTransaction(self, conn, packet, tid):
app = self.app
app.setTransactionVoted()
def handleAnswerTransactionInformation(self, conn, packet, tid,
user, desc, ext, oid_list):
app = self.app
# transaction information are returned as a dict
info = {}
info['time'] = TimeStamp(tid).timeTime()
info['user_name'] = user
info['description'] = desc
info['id'] = tid
info['oids'] = oid_list
app.local_var.txn_info = info
def handleAnswerObjectHistory(self, conn, packet, oid, history_list):
app = self.app
# history_list is a list of tuple (serial, size)
app.local_var.history = oid, history_list
def handleOidNotFound(self, conn, packet, message):
app = self.app
# This can happen either when :
# - loading an object
# - asking for history
app.local_var.asked_object = -1
app.local_var.history = -1
def handleTidNotFound(self, conn, packet, message):
app = self.app
# This can happen when requiring txn informations
app.local_var.txn_info = -1
def handleAnswerTIDs(self, conn, packet, tid_list):
app = self.app
app.local_var.node_tids[conn.getUUID()] = tid_list
#
# Copyright (C) 2006-2009 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import logging
from neo.client.handlers.handler import BaseHandler
from neo import protocol
from neo.protocol import MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
INVALID_UUID, RUNNING_STATE, TEMPORARILY_DOWN_STATE
from neo.node import MasterNode, StorageNode
from neo.pt import MTPartitionTable as PartitionTable
from neo.util import dump
from neo import decorators
class PrimaryBaseHandler(BaseHandler):
def _closePrimaryMasterConnection(self, conn):
"""
This method is not part of EvenHandler API.
"""
app = self.app
if app.master_conn is not None:
assert conn is app.master_conn
app.master_conn.close()
app.master_conn = None
app.primary_master_node = None
class PrimaryBootstrapHandler(BaseHandler):
""" Bootstrap handler used when looking for the primary master """
def connectionCompleted(self, conn):
app = self.app
if app.trying_master_node is None:
# Should not happen.
raise RuntimeError('connection completed while not trying to connect')
super(PrimaryBootstrapHandler, self).connectionCompleted(conn)
def connectionFailed(self, conn):
app = self.app
if app.trying_master_node is None:
# Should not happen.
raise RuntimeError('connection failed while not trying to connect')
if app.trying_master_node is app.primary_master_node:
# Tried to connect to a primary master node and failed.
# So this would effectively mean that it is dead.
app.primary_master_node = None
app.trying_master_node = None
super(PrimaryBootstrapHandler, self).connectionFailed(conn)
def timeoutExpired(self, conn):
app = self.app
if app.trying_master_node is app.primary_master_node:
# If a primary master node timeouts, I should not rely on it.
app.primary_master_node = None
app.trying_master_node = None
super(PrimaryBootstrapHandler, self).timeoutExpired(conn)
def connectionClosed(self, conn):
app = self.app
if app.trying_master_node is app.primary_master_node:
# If a primary master node closes, I should not rely on it.
app.primary_master_node = None
app.trying_master_node = None
super(PrimaryBootstrapHandler, self).connectionClosed(conn)
def peerBroken(self, conn):
app = self.app
if app.trying_master_node is app.primary_master_node:
# If a primary master node gets broken, I should not rely
# on it.
app.primary_master_node = None
app.trying_master_node = None
super(PrimaryBootstrapHandler, self).peerBroken(conn)
def handleNotReady(self, conn, packet, message):
app = self.app
app.trying_master_node = None
app.setNodeNotReady()
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port,
num_partitions, num_replicas, your_uuid):
app = self.app
node = app.nm.getNodeByServer(conn.getAddress())
# this must be a master node
if node_type != MASTER_NODE_TYPE:
conn.close()
return
if conn.getAddress() != (ip_address, port):
# The server address is different! Then why was
# the connection successful?
logging.error('%s:%d is waiting for %s:%d',
conn.getAddress()[0], conn.getAddress()[1],
ip_address, port)
app.nm.remove(node)
conn.close()
return
conn.setUUID(uuid)
node.setUUID(uuid)
if your_uuid != INVALID_UUID:
# got an uuid from the primary master
app.uuid = your_uuid
# Always create partition table
app.pt = PartitionTable(num_partitions, num_replicas)
# Ask a primary master.
conn.lock()
try:
msg_id = conn.ask(protocol.askPrimaryMaster())
self.dispatcher.register(conn, msg_id, app.local_var.queue)
finally:
conn.unlock()
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
known_master_list):
app = self.app
# Register new master nodes.
for ip_address, port, uuid in known_master_list:
addr = (ip_address, port)
n = app.nm.getNodeByServer(addr)
if n is None:
n = MasterNode(server = addr)
app.nm.add(n)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None or n.getUUID() != uuid:
n.setUUID(uuid)
if primary_uuid != INVALID_UUID:
primary_node = app.nm.getNodeByUUID(primary_uuid)
if primary_node is None:
# I don't know such a node. Probably this information
# is old. So ignore it.
logging.warning('Unknown primary master UUID: %s. ' \
'Ignoring.' % dump(primary_uuid))
else:
app.primary_master_node = primary_node
if app.trying_master_node is not primary_node:
app.trying_master_node = None
conn.close()
else:
if app.primary_master_node is not None:
# The primary master node is not a primary master node
# any longer.
app.primary_master_node = None
app.trying_master_node = None
conn.close()
def handleAnswerPartitionTable(self, conn, packet, ptid, row_list):
pass
def handleAnswerNodeInformation(self, conn, packet, node_list):
pass
class PrimaryNotificationsHandler(PrimaryBaseHandler):
""" Handler that process the notifications from the primary master """
def connectionClosed(self, conn):
logging.critical("connection to primary master node closed")
# Close connection
self._closePrimaryMasterConnection(conn)
BaseHandler.connectionClosed(self, conn)
def timeoutExpired(self, conn):
logging.critical("connection timeout to primary master node expired")
BaseHandler.timeoutExpired(self, conn)
def peerBroken(self, conn):
logging.critical("primary master node is broken")
BaseHandler.peerBroken(self, conn)
def handleStopOperation(self, conn, packet):
logging.critical("master node ask to stop operation")
def handleInvalidateObjects(self, conn, packet, oid_list, tid):
app = self.app
app._cache_lock_acquire()
try:
# ZODB required a dict with oid as key, so create it
oids = {}
for oid in oid_list:
oids[oid] = tid
try:
del app.mq_cache[oid]
except KeyError:
pass
db = app.getDB()
if db is not None:
db.invalidate(tid, oids)
finally:
app._cache_lock_release()
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
app = self.app
nm = app.nm
pt = app.pt
if app.ptid >= ptid:
# Ignore this packet.
return
app.ptid = ptid
for offset, uuid, state in cell_list:
node = nm.getNodeByUUID(uuid)
if node is None:
node = StorageNode(uuid = uuid)
if uuid != app.uuid:
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
pt.setCell(offset, node, state)
@decorators.identification_required
def handleSendPartitionTable(self, conn, packet, ptid, row_list):
# This handler is in PrimaryBootstrapHandler, since this
# basicaly is an answer to askPrimaryMaster.
# Extract from P-NEO-Protocol.Description:
# Connection to primary master node (PMN in service state)
# CN -> PMN : askPrimaryMaster
# PMN -> CN : answerPrimaryMaster containing primary uuid and no
# known master list
# PMN -> CN : notifyNodeInformation containing list of all
# ASK_STORE_TRANSACTION# PMN -> CN : sendPartitionTable containing partition table id and
# list of rows
# notifyNodeInformation is valid as asynchrounous event, but
# sendPartitionTable is only triggered after askPrimaryMaster.
uuid = conn.getUUID()
app = self.app
nm = app.nm
pt = app.pt
node = app.nm.getNodeByUUID(uuid)
# This must be sent only by primary master node
if node.getNodeType() != MASTER_NODE_TYPE:
return
if app.ptid != ptid:
app.ptid = ptid
pt.clear()
for offset, row in row_list:
for uuid, state in row:
node = nm.getNodeByUUID(uuid)
if node is None:
node = StorageNode(uuid = uuid)
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
pt.setCell(offset, node, state)
def handleNotifyNodeInformation(self, conn, packet, node_list):
app = self.app
nm = app.nm
for node_type, ip_address, port, uuid, state in node_list:
logging.info("notified of %s %s %d %s %s" %(node_type, ip_address, port, dump(uuid), state))
# Register new nodes.
addr = (ip_address, port)
# Try to retrieve it from nm
n = None
if uuid != INVALID_UUID:
n = nm.getNodeByUUID(uuid)
if n is None:
n = nm.getNodeByServer(addr)
if n is not None and uuid != INVALID_UUID:
# node only exists by address, remove it
nm.remove(n)
n = None
elif n.getServer() != addr:
# same uuid but different address, remove it
nm.remove(n)
n = None
if node_type == MASTER_NODE_TYPE:
if n is None:
n = MasterNode(server = addr)
nm.add(n)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None:
n.setUUID(uuid)
elif node_type == STORAGE_NODE_TYPE:
if uuid == INVALID_UUID:
# No interest.
continue
if n is None:
n = StorageNode(server = addr, uuid = uuid)
nm.add(n)
elif node_type == CLIENT_NODE_TYPE:
continue
n.setState(state)
# close connection to this node if no longer running
if node_type in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE) and \
state != RUNNING_STATE:
for conn in self.app.em.getConnectionList():
if conn.getUUID() == n.getUUID():
conn.close()
break
if node_type == STORAGE_NODE_TYPE:
# Remove from pool connection
app.cp.removeConnection(n)
# Put fake packets to task queues.
queue_set = set()
for key in self.dispatcher.message_table.keys():
if id(conn) == key[0]:
queue = self.dispatcher.message_table.pop(key)
queue_set.add(queue)
# Storage failure is notified to the primary master when the fake
# packet if popped by a non-polling thread.
for queue in queue_set:
queue.put((conn, None))
class PrimaryAnswersHandler(PrimaryBaseHandler):
""" Handle that process expected packets from the primary master """
def connectionClosed(self, conn):
logging.critical("connection to primary master node closed")
# Close connection
self._closePrimaryMasterConnection(conn)
super(PrimaryAnswersHandler, self).connectionClosed(conn)
def timeoutExpired(self, conn):
logging.critical("connection timeout to primary master node expired")
super(PrimaryAnswersHandler, self).timeoutExpired(conn)
def peerBroken(self, conn):
logging.critical("primary master node is broken")
super(PrimaryAnswersHandler, self).peerBroken(conn)
def handleAnswerNewTID(self, conn, packet, tid):
app = self.app
app.setTID(tid)
def handleAnswerNewOIDs(self, conn, packet, oid_list):
app = self.app
app.new_oid_list = oid_list
app.new_oid_list.reverse()
def handleNotifyTransactionFinished(self, conn, packet, tid):
app = self.app
if tid == app.getTID():
app.setTransactionFinished()
#
# Copyright (C) 2006-2009 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import logging
from neo.client.handlers.handler import BaseHandler
from neo.protocol import STORAGE_NODE_TYPE
from ZODB.TimeStamp import TimeStamp
class StorageBaseHandler(BaseHandler):
def _dealWithStorageFailure(self, conn, node):
app = self.app
# Remove from pool connection
app.cp.removeConnection(node)
# Put fake packets to task queues.
queue_set = set()
for key in self.dispatcher.message_table.keys():
if id(conn) == key[0]:
queue = self.dispatcher.message_table.pop(key)
queue_set.add(queue)
# Storage failure is notified to the primary master when the fake
# packet if popped by a non-polling thread.
for queue in queue_set:
queue.put((conn, None))
def connectionClosed(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress())
logging.info("connection to storage node %s closed", node.getServer())
self._dealWithStorageFailure(conn, node)
super(StorageBaseHandler, self).connectionClosed(conn)
def timeoutExpired(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress())
self._dealWithStorageFailure(conn, node)
super(StorageBaseHandler, self).timeoutExpired(conn)
def peerBroken(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress())
self._dealWithStorageFailure(conn, node)
super(StorageBaseHandler, self).peerBroken(conn)
class StorageBootstrapHandler(StorageBaseHandler):
""" Handler used when connecting to a storage node """
def connectionFailed(self, conn):
# Connection to a storage node failed
node = self.app.nm.getNodeByServer(conn.getAddress())
self._dealWithStorageFailure(conn, node)
super(StorageBootstrapHandler, self).connectionFailed(conn)
def handleNotReady(self, conn, packet, message):
app = self.app
app.setNodeNotReady()
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, num_partitions, num_replicas, your_uuid):
app = self.app
node = app.nm.getNodeByServer(conn.getAddress())
# It can be eiter a master node or a storage node
if node_type != STORAGE_NODE_TYPE:
conn.close()
return
if conn.getAddress() != (ip_address, port):
# The server address is different! Then why was
# the connection successful?
logging.error('%s:%d is waiting for %s:%d',
conn.getAddress()[0], conn.getAddress()[1], ip_address, port)
app.nm.remove(node)
conn.close()
return
conn.setUUID(uuid)
node.setUUID(uuid)
class StorageAnswersHandler(StorageBaseHandler):
""" Handle all messages related to ZODB operations """
def handleAnswerObject(self, conn, packet, oid, start_serial, end_serial,
compression, checksum, data):
app = self.app
app.local_var.asked_object = (oid, start_serial, end_serial, compression,
checksum, data)
def handleAnswerStoreObject(self, conn, packet, conflicting, oid, serial):
app = self.app
if conflicting:
app.local_var.object_stored = -1, serial
else:
app.local_var.object_stored = oid, serial
def handleAnswerStoreTransaction(self, conn, packet, tid):
app = self.app
app.setTransactionVoted()
def handleAnswerTransactionInformation(self, conn, packet, tid,
user, desc, ext, oid_list):
app = self.app
# transaction information are returned as a dict
info = {}
info['time'] = TimeStamp(tid).timeTime()
info['user_name'] = user
info['description'] = desc
info['id'] = tid
info['oids'] = oid_list
app.local_var.txn_info = info
def handleAnswerObjectHistory(self, conn, packet, oid, history_list):
app = self.app
# history_list is a list of tuple (serial, size)
app.local_var.history = oid, history_list
def handleOidNotFound(self, conn, packet, message):
app = self.app
# This can happen either when :
# - loading an object
# - asking for history
app.local_var.asked_object = -1
app.local_var.history = -1
def handleTidNotFound(self, conn, packet, message):
app = self.app
# This can happen when requiring txn informations
app.local_var.txn_info = -1
def handleAnswerTIDs(self, conn, packet, tid_list):
app = self.app
app.local_var.node_tids[conn.getUUID()] = tid_list
......@@ -28,9 +28,11 @@ from neo.protocol import ERROR, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE, DISCARDED_STATE
from neo.exception import ElectionFailure
from neo.client.handlers.handler import BaseHandler, PrimaryBootstrapHandler, \
PrimaryNotificationsHandler, PrimaryAnswersHandler, \
StorageBootstrapHandler, StorageAnswersHandler
from neo.client.handlers.handler import BaseHandler
from neo.client.handlers.master import PrimaryBootstrapHandler, \
PrimaryNotificationsHandler, PrimaryAnswersHandler
from neo.client.handlers.storage import StorageBootstrapHandler, \
StorageAnswersHandler
from neo.node import StorageNode
from neo.util import dump
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment