Commit 34ff1ddb authored by Yoshinori Okuji's avatar Yoshinori Okuji

Implement a few handlers for storage node. Also, add more TODO items.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@40 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 55697ad7
...@@ -11,3 +11,8 @@ TODO ...@@ -11,3 +11,8 @@ TODO
- IdleEvent for a certain message type as well as a message ID - IdleEvent for a certain message type as well as a message ID
- Flushing write buffers only without reading packets - Flushing write buffers only without reading packets
- Garbage collection of unused nodes
- Stopping packet processing by returning a boolean value from
a handler, otherwise too tricky to exchange a handler with another
...@@ -118,7 +118,8 @@ class EventHandler(object): ...@@ -118,7 +118,8 @@ class EventHandler(object):
def handleAskPrimaryMaster(self, conn, packet): def handleAskPrimaryMaster(self, conn, packet):
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, known_master_list): def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
known_master_list):
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
......
...@@ -20,7 +20,6 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -20,7 +20,6 @@ class ElectionEventHandler(MasterEventHandler):
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
app = self.app app = self.app
addr = conn.getAddress()
# Request a node idenfitication. # Request a node idenfitication.
p = Packet() p = Packet()
msg_id = conn.getNextId() msg_id = conn.getNextId()
......
...@@ -240,9 +240,7 @@ class Application(object): ...@@ -240,9 +240,7 @@ class Application(object):
# Yes, I have. # Yes, I have.
return return
self.trying_master_node = self.primary_master_node if self.trying_master_node is None and t + 1 < time():
if t + 1 < time():
# Choose a master node to connect to. # Choose a master node to connect to.
if self.primary_master_node is not None: if self.primary_master_node is not None:
# If I know a primary master node, pinpoint it. # If I know a primary master node, pinpoint it.
...@@ -283,11 +281,16 @@ class Application(object): ...@@ -283,11 +281,16 @@ class Application(object):
handler = OperationEventHandler(self) handler = OperationEventHandler(self)
em = self.em em = self.em
nm = self.nm
# Make sure that every connection has the verfication event handler. # Make sure that every connection has the verfication event handler.
for conn in em.getConnectionList(): for conn in em.getConnectionList():
conn.setHandler(handler) conn.setHandler(handler)
# Forget all client nodes.
for node in nm.getClientNodeList():
nm.remove(node)
while 1: while 1:
em.poll(1) em.poll(1)
......
import logging
from neo.storage.handler import StorageEventHandler
from neo.protocol import INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE
from neo.utils import dump
from neo.node import MasterNode, StorageNode, ClientNode
from neo.connetion import ClientConnection
from neo.protocol import Packet
class BootstrapEventHandler(StorageEventHandler):
"""This class deals with events for a bootstrap phase."""
def connectionCompleted(self, conn):
app = self.app
if app.trying_master_node is None:
# Should not happen.
raise RuntimeError('connection completed while not trying to connect')
p = Packet()
msg_id = conn.getNextId()
p.requestNodeIdentification(msg_id, MASTER_NODE_TYPE, app.uuid,
app.server[0], app.server[1], app.name)
conn.addPacket(p)
conn.expectMessage(msg_id)
StorageEventHandler.connectionCompleted(self, conn)
def connectionFailed(self, conn):
app = self.app
if app.trying_master_node is None:
# Should not happen.
raise RuntimeError('connection failed while not trying to connect')
if app.trying_master_node is app.primary_master_node:
# Tried to connect to a primary master node and failed.
# So this would effectively mean that it is dead.
app.primary_master_node = None
StorageEventHandler.connectionFailed(self, conn)
def connectionAccepted(self, conn, s, addr):
"""Called when a connection is accepted."""
# I do not want to accept a connection at this phase, but
# someone might mistake me as a master node.
StorageEventHandler.connectionAccepted(self, conn, s, addr)
def timeoutExpired(self, conn):
if isinstance(conn, ClientConnection):
if app.trying_master_node is app.primary_master_node:
# If a primary master node timeouts, I should not rely on it.
app.primary_master_node = None
app.trying_master_node = None
StorageEventHandler.timeoutExpired(self, conn)
def connectionClosed(self, conn):
if isinstance(conn, ClientConnection):
if app.trying_master_node is app.primary_master_node:
# If a primary master node closes, I should not rely on it.
app.primary_master_node = None
app.trying_master_node = None
StorageEventHandler.connectionClosed(self, conn)
def peerBroken(self, conn):
if isinstance(conn, ClientConnection):
if app.trying_master_node is app.primary_master_node:
# If a primary master node gets broken, I should not rely
# on it.
app.primary_master_node = None
app.trying_master_node = None
StorageEventHandler.peerBroken(self, conn)
def handleNotReady(self, conn, packet, message):
if isinstance(conn, ClientConnection):
if app.trying_master_node is not None:
app.trying_master_node = None
conn.close()
def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name):
if isinstance(conn, ClientConnection):
self.handleUnexpectedPacket(conn, packet)
else:
if node_type != MASTER_NODE_TYPE:
logging.info('reject a connection from a non-master')
conn.addPacket(Packet().notReady(packet.getId(), 'retry later'))
conn.abort()
return
if name != app.name:
logging.error('reject an alien cluster')
conn.addPacket(Packet().protocolError(packet.getId(),
'invalid cluster name'))
conn.abort()
return
addr = (ip_address, port)
node = app.nm.getNodeByServer(addr)
if node is None:
node = MasterNode(server = addr, uuid = uuid)
app.nm.add(node)
else:
# If this node is broken, reject it.
if node.getUUID() == uuid:
if node.getState() == BROKEN_STATE:
p = Packet()
p.brokenNodeDisallowedError(packet.getId(), 'go away')
conn.addPacket(p)
conn.abort()
return
# Trust the UUID sent by the peer.
node.setUUID(uuid)
conn.setUUID(uuid)
p = Packet()
p.acceptNodeIdentification(packet.getId(), STORAGE_NODE_TYPE,
app.uuid, app.server[0], app.server[1])
conn.addPacket(p)
# Now the master node should know that I am not the right one.
conn.abort()
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port):
if not isinstance(conn, ClientConnection):
self.handleUnexpectedPacket(conn, packet)
else:
app = self.app
node = app.nm.getNodeByServer(conn.getAddress())
if node_type != MASTER_NODE_TYPE:
# The peer is not a master node!
logging.error('%s:%d is not a master node', ip_address, port)
app.nm.remove(node)
conn.close()
return
if conn.getAddress() != (ip_address, port):
# The server address is different! Then why was
# the connection successful?
logging.error('%s:%d is waiting for %s:%d',
conn.getAddress()[0], conn.getAddress()[1], ip_address, port)
app.nm.remove(node)
conn.close()
return
conn.setUUID(uuid)
node.setUUID(uuid)
# Ask a primary master.
msg_id = conn.getNextId()
conn.addPacket(Packet().askPrimaryMaster(msg_id))
conn.expectMessage(msg_id)
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
known_master_list):
if not isinstance(conn, ClientConnection):
self.handleUnexpectedPacket(conn, packet)
else:
app = self.app
# Register new master nodes.
for ip_address, port, uuid in known_master_list:
addr = (ip_address, port)
n = app.nm.getNodeByServer(addr)
if n is None:
n = MasterNode(server = addr)
app.nm.add(n)
app.unconnected_master_node_set.add(addr)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None:
n.setUUID(uuid)
if primary_uuid != INVALID_UUID:
primary_node = app.nm.getNodeByUUID(primary_uuid)
if primary_node is None:
# I don't know such a node. Probably this information
# is old. So ignore it.
pass
else:
app.primary_master_node = primary_node
if app.trying_master_node is primary_node:
# I am connected to the right one.
pass
else:
app.trying_master_node = None
conn.close()
else:
if app.primary_master_node is not None:
# The primary master node is not a primary master node
# any longer.
app.primary_master_node = None
app.trying_master_node = None
conn.close()
def handleAskLastIDs(self, conn, packet):
pass
def handleAskPartitionTable(self, conn, packet, offset_list):
pass
def handleSendPartitionTable(self, conn, packet, row_list):
pass
def handleNotifyPartitionChanges(self, conn, packet, cell_list):
pass
def handleStartOperation(self, conn, packet):
pass
def handleStopOperation(self, conn, packet):
pass
def handleAskUnfinishedTransactions(self, conn, packet):
pass
def handleAskOIDsByTID(self, conn, packet, tid):
pass
def handleAskObjectPresent(self, conn, packet, oid, tid):
pass
def handleDeleteTransaction(self, conn, packet, tid):
pass
def handleCommitTransaction(self, conn, packet, tid):
pass
def handleLockInformation(self, conn, packet, tid):
pass
def handleUnlockInformation(self, conn, packet, tid):
pass
import logging
from neo.handler import EventHandler
from neo.protocol import INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE
from neo.utils import dump
from neo.node import MasterNode, StorageNode, ClientNode
from neo.connetion import ClientConnection
class StorageEventHandler(EventHandler):
"""This class implements a generic part of the event handlers."""
def __init__(self, app):
self.app = app
EventHandler.__init__(self)
def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name):
raise NotImplementedError('this method must be overridden')
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port):
raise NotImplementedError('this method must be overridden')
def handleAskPrimaryMaster(self, conn, packet):
"""This should not be used in reality, because I am not a master
node. But? If someone likes to ask me, I can help."""
app = self.app
if app.primary_master_node is not None:
primary_uuid = app.primary_master_node.getUUID()
else:
primary_uuid = INVALID_UUID
known_master_list = []
for n in app.nm.getMasterNodeList():
if n.getState() == BROKEN_STATE:
continue
info = n.getServer() + (n.getUUID() or INVALID_UUID,)
known_master_list.append(info)
p = Packet()
p.answerPrimaryMaster(packet.getId(), primary_uuid, known_master_list)
conn.addPacket(p)
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
known_master_list):
raise NotImplementedError('this method must be overridden')
def handleAnnouncePrimaryMaster(self, conn, packet):
"""Theoretically speaking, I should not get this message,
because the primary master election must happen when I am
not connected to any master node."""
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
node = app.nm.getNodeByUUID(uuid)
if node is None:
raise RuntimeError('I do not know the uuid %r' % dump(uuid))
if not isinstance(node, MasterNode):
self.handleUnexpectedPacket(conn, packet)
return
if app.primary_master_node is None:
# Hmm... I am somehow connected to the primary master already.
app.primary_master_node = node
if not isinstance(conn, ClientConnection):
# I do not want a connection from any master node. I rather
# want to connect from myself.
conn.close()
elif app.primary_master_node.getUUID() == uuid:
# Yes, I know you are the primary master node.
pass
else:
# It seems that someone else claims taking over the primary
# master node...
app.primary_master_node = None
raise PrimaryFailure('another master node wants to take over')
def handleReelectPrimaryMaster(self, conn, packet):
raise PrimaryFailure('re-election occurs')
def handleNotifyNodeInformation(self, conn, packet, node_list):
"""Store information on nodes, only if this is sent by a primary
master node."""
# XXX it might be better to implement this callback in each handler.
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
node = app.nm.getNodeByUUID(uuid)
if not isinstance(node, MasterNode) \
or app.primary_master_node is None \
or app.primary_master_node.getUUID() != uuid:
return
for node_type, ip_address, port, uuid, state in node_list:
addr = (ip_address, port)
if node_type == MASTER_NODE_TYPE:
n = app.nm.getNodeByServer(addr)
if n is None:
n = MasterNode(server = addr)
app.nm.add(n)
n.setState(state)
if uuid != INVALID_UUID:
if n.getUUID() is None:
n.setUUID(uuid)
elif node_type == STORAGE_NODE_TYPE:
if uuid == INVALID_UUID:
# No interest.
continue
n = app.nm.getNodeByUUID(uuid)
if n is None:
n = StorageNode(server = addr, uuid = uuid)
app.nm.add(n)
else:
n.setServer(addr)
n.setState(state)
elif node_type == CLIENT_NODE_TYPE:
if uuid == INVALID_UUID:
# No interest.
continue
if state == RUNNING_STATE:
n = app.nm.getNodeByUUID(uuid)
if n is None:
n = ClientNode(uuid = uuid)
app.nm.add(n)
else:
n = app.nm.getNodeByUUID(uuid)
if n is not None:
app.nm.remove(n)
def handleAskLastIDs(self, conn, packet):
raise NotImplementedError('this method must be overridden')
def handleAskPartitionTable(self, conn, packet, offset_list):
raise NotImplementedError('this method must be overridden')
def handleSendPartitionTable(self, conn, packet, row_list):
raise NotImplementedError('this method must be overridden')
def handleNotifyPartitionChanges(self, conn, packet, cell_list):
raise NotImplementedError('this method must be overridden')
def handleStartOperation(self, conn, packet):
raise NotImplementedError('this method must be overridden')
def handleStopOperation(self, conn, packet):
raise NotImplementedError('this method must be overridden')
def handleAskUnfinishedTransactions(self, conn, packet):
raise NotImplementedError('this method must be overridden')
def handleAskOIDsByTID(self, conn, packet, tid):
raise NotImplementedError('this method must be overridden')
def handleAskObjectPresent(self, conn, packet, oid, tid):
raise NotImplementedError('this method must be overridden')
def handleDeleteTransaction(self, conn, packet, tid):
raise NotImplementedError('this method must be overridden')
def handleCommitTransaction(self, conn, packet, tid):
raise NotImplementedError('this method must be overridden')
def handleLockInformation(self, conn, packet, tid):
raise NotImplementedError('this method must be overridden')
def handleUnlockInformation(self, conn, packet, tid):
raise NotImplementedError('this method must be overridden')
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment