Commit 5155ff33 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Implement a recovery step. Not tested well.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@27 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent fd22e3e6
...@@ -44,6 +44,9 @@ class BaseConnection(object): ...@@ -44,6 +44,9 @@ class BaseConnection(object):
def getEventManager(self): def getEventManager(self):
return self.em return self.em
def getUUID(self):
return None
class ListeningConnection(BaseConnection): class ListeningConnection(BaseConnection):
"""A listen connection.""" """A listen connection."""
def __init__(self, event_manager, handler, addr = None, **kw): def __init__(self, event_manager, handler, addr = None, **kw):
......
...@@ -6,8 +6,8 @@ from neo.connection import ServerConnection ...@@ -6,8 +6,8 @@ from neo.connection import ServerConnection
from protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICATION, \ from protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICATION, \
PING, PONG, ASK_PRIMARY_MASTER, ANSWER_PRIMARY_MASTER, ANNOUNCE_PRIMARY_MASTER, \ PING, PONG, ASK_PRIMARY_MASTER, ANSWER_PRIMARY_MASTER, ANNOUNCE_PRIMARY_MASTER, \
REELECT_PRIMARY_MASTER, NOTIFY_NODE_INFORMATION, START_OPERATION, \ REELECT_PRIMARY_MASTER, NOTIFY_NODE_INFORMATION, START_OPERATION, \
STOP_OPERATION, ASK_FINISHING_TRANSACTIONS, ANSWER_FINISHING_TRANSACTIONS, \ STOP_OPERATION, ASK_LAST_IDS, ANSWER_LAST_IDS, ASK_PARTITION_TABLE, \
FINISH_TRANSACTIONS, \ ANSWER_PARTITION_TABLE, SEND_PARTITION_TABLE, NOTIFY_PARTITION_CHANGES, \
NOT_READY_CODE, OID_NOT_FOUND_CODE, SERIAL_NOT_FOUND_CODE, TID_NOT_FOUND_CODE, \ NOT_READY_CODE, OID_NOT_FOUND_CODE, SERIAL_NOT_FOUND_CODE, TID_NOT_FOUND_CODE, \
PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE, BROKEN_NODE_DISALLOWED_CODE, \ PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE, BROKEN_NODE_DISALLOWED_CODE, \
INTERNAL_ERROR_CODE INTERNAL_ERROR_CODE
...@@ -125,6 +125,30 @@ class EventHandler(object): ...@@ -125,6 +125,30 @@ class EventHandler(object):
def handleNotifyNodeInformation(self, conn, packet, node_list): def handleNotifyNodeInformation(self, conn, packet, node_list):
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
def handleAskLastIDs(self, conn, packet):
self.handleUnexpectedPacket(conn, packet)
def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid):
self.handleUnexpectedPacket(conn, packet)
def handleAskPartitionTable(self, conn, packet, offset_list):
self.handleUnexpectedPacket(conn, packet)
def handleAnswerPartitionTable(self, conn, packet, row_list):
self.handleUnexpectedPacket(conn, packet)
def handleSendPartitionTable(self, conn, packet, row_list):
self.handleUnexpectedPacket(conn, packet)
def handleNotifyPartitionChanges(self, conn, packet, cell_list):
self.handleUnexpectedPacket(conn, packet)
def handleStartOperation(self, conn, packet):
self.handleUnexpectedPacket(conn, packet)
def handleStopOperation(self, conn, packet):
self.handleUnexpectedPacket(conn, packet)
# Error packet handlers. # Error packet handlers.
handleNotReady = handleUnexpectedPacket handleNotReady = handleUnexpectedPacket
...@@ -158,6 +182,14 @@ class EventHandler(object): ...@@ -158,6 +182,14 @@ class EventHandler(object):
d[ANNOUNCE_PRIMARY_MASTER] = self.handleAnnouncePrimaryMaster d[ANNOUNCE_PRIMARY_MASTER] = self.handleAnnouncePrimaryMaster
d[REELECT_PRIMARY_MASTER] = self.handleReelectPrimaryMaster d[REELECT_PRIMARY_MASTER] = self.handleReelectPrimaryMaster
d[NOTIFY_NODE_INFORMATION] = self.handleNotifyNodeInformation d[NOTIFY_NODE_INFORMATION] = self.handleNotifyNodeInformation
d[ASK_LAST_IDS] = self.handleAskLastIDs
d[ANSWER_LAST_IDS] = self.handleAnswerLastIDs
d[ASK_PARTITION_TABLE] = self.handleAskPartitionTable
d[ANSWER_PARTITION_TABLE] = self.handleAnswerPartitionTable
d[SEND_PARTITION_TABLE] = self.handleSendPartitionTable
d[NOTIFY_PARTITION_CHANGES] = self.handleNotifyPartitionChanges
d[START_OPERATION] = self.handleStartOperation
d[STOP_OPERATION] = self.handleStopOperation
self.packet_dispatch_table = d self.packet_dispatch_table = d
......
...@@ -14,6 +14,8 @@ from neo.util import dump ...@@ -14,6 +14,8 @@ from neo.util import dump
from neo.connection import ListeningConnection, ClientConnection, ServerConnection from neo.connection import ListeningConnection, ClientConnection, ServerConnection
from neo.exception import ElectionFailure, PrimaryFailure from neo.exception import ElectionFailure, PrimaryFailure
from neo.master.election import ElectionEventHandler from neo.master.election import ElectionEventHandler
from neo.master.recovery import RecoveryEventHandler
from neo.pt import PartitionTable
class Application(object): class Application(object):
"""The master node application.""" """The master node application."""
...@@ -37,6 +39,7 @@ class Application(object): ...@@ -37,6 +39,7 @@ class Application(object):
# Internal attributes. # Internal attributes.
self.em = EventManager() self.em = EventManager()
self.nm = NodeManager() self.nm = NodeManager()
self.pt = PartitionTable(self.num_partitions, self.num_replicas)
self.primary = None self.primary = None
self.primary_master_node = None self.primary_master_node = None
...@@ -55,6 +58,8 @@ class Application(object): ...@@ -55,6 +58,8 @@ class Application(object):
self.ltid = INVALID_TID self.ltid = INVALID_TID
# The last Partition Table ID. # The last Partition Table ID.
self.lptid = INVALID_PTID self.lptid = INVALID_PTID
# The target node's uuid to request next.
self.target_uuid = None
def run(self): def run(self):
"""Make sure that the status is sane and start a loop.""" """Make sure that the status is sane and start a loop."""
...@@ -78,9 +83,7 @@ class Application(object): ...@@ -78,9 +83,7 @@ class Application(object):
while 1: while 1:
try: try:
if self.primary: if self.primary:
while 1: self.playPrimaryRole()
self.startRecovery()
self.playPrimaryRole()
else: else:
self.playSecondaryRole() self.playSecondaryRole()
raise RuntimeError, 'should not reach here' raise RuntimeError, 'should not reach here'
...@@ -246,7 +249,9 @@ class Application(object): ...@@ -246,7 +249,9 @@ class Application(object):
conn.close() conn.close()
bootstrap = False bootstrap = False
def broadcastNodeStateChange(self, node): def broadcastNodeInformation(self, node):
"""Broadcast a Notify Node Information packet."""
node_type = node.getNodeType()
state = node.getState() state = node.getState()
uuid = node.getUUID() uuid = node.getUUID()
ip_address, port = node.getServer() ip_address, port = node.getServer()
...@@ -255,194 +260,136 @@ class Application(object): ...@@ -255,194 +260,136 @@ class Application(object):
if port is None: if port is None:
port = 0 port = 0
if isinstance(node, ClientNode): if node_type == CLIENT_NODE_TYPE:
# Notify secondary master nodes and storage nodes of # Only to master nodes and storage nodes.
# the removal of the client node.
for c in em.getConnectionList(): for c in em.getConnectionList():
if c.getUUID() is not None: if c.getUUID() is not None:
n = nm.getNodeByUUID(uuid) n = nm.getNodeByUUID(c.getUUID())
if isinstance(n, (MasterNode, StorageNode)): if isinstance(n, (MasterNode, StorageNode)):
p = Packet() p = Packet()
p.notifyNodeStateChange(c.getNextId(), node_list = (node_type, ip_address, port, uuid, state)
CLIENT_NODE_TYPE, p.notifyNodeStateChange(c.getNextId(), node_list)
ip_address, port,
uuid, state)
c.addPacket(p) c.addPacket(p)
elif isinstance(node, MasterNode): elif isinstance(node, (MasterNode, StorageNode)):
for c in em.getConnectionList():
if c.getUUID() is not None:
p = Packet()
p.notifyNodeStateChange(c.getNextId(),
MASTER_NODE_TYPE,
ip_address, port,
uuid, state)
c.addPacket(p)
elif isinstance(node, StorageNode):
for c in em.getConnectionList(): for c in em.getConnectionList():
if c.getUUID() is not None: if c.getUUID() is not None:
p = Packet() p = Packet()
p.notifyNodeStateChange(c.getNextId(), node_list = (node_type, ip_address, port, uuid, state)
STORAGE_NODE_TYPE, p.notifyNodeStateChange(c.getNextId(), node_list)
ip_address, port,
uuid, state)
c.addPacket(p) c.addPacket(p)
else: else:
raise Runtime, 'unknown node type' raise Runtime, 'unknown node type'
def playPrimaryRoleServerIterator(self): def recoverStatus(self):
"""Handle events for a server connection.""" logging.info('begin the recovery of the status')
handler = RecoveryEventHandler(self)
em = self.em em = self.em
nm = self.nm nm = self.nm
while 1:
method, conn = self.event[0], self.event[1]
logging.debug('method is %r, conn is %s:%d',
method, conn.ip_address, conn.port)
if method is self.CONNECTION_ACCEPTED:
pass
elif method in (self.CONNECTION_CLOSED, self.TIMEOUT_EXPIRED):
uuid = conn.getUUID()
if uuid is not None:
# If the peer is identified, mark it as temporarily down or down.
node = nm.getNodeByUUID(uuid)
if isinstance(node, ClientNode):
node.setState(DOWN_STATE)
self.broadcastNodeStateChange(node)
# For now, down client nodes simply get forgotten.
nm.remove(node)
elif isinstance(node, MasterNode):
if node.getState() not in (BROKEN_STATE, DOWN_STATE):
node.setState(TEMPORARILY_DOWN_STATE)
self.broadcastNodeStateChange(node)
elif isinstance(node, StorageNode):
if node.getState() not in (BROKEN_STATE, DOWN_STATE):
node.setState(TEMPORARILY_DOWN_STATE)
self.broadcastNodeStateChange(node)
# FIXME check the partition table.
self.pt.setTemporarilyDown(node.getUUID())
if self.ready and self.pt.fatal():
logging.critical('the storage nodes are not enough')
self.ready = False
self.broadcast
# FIXME update the database.
else:
raise RuntimeError, 'unknown node type'
return
elif method is self.PEER_BROKEN:
uuid = conn.getUUID()
if uuid is not None:
# If the peer is identified, mark it as broken.
node = nm.getNodeByUUID(uuid)
node.setState(BROKEN_STATE)
return
elif method is self.PACKET_RECEIVED:
if node is not None and node.getState() != BROKEN_STATE:
node.setState(RUNNING_STATE)
packet = self.event[2]
t = packet.getType()
try:
if t == ERROR:
code, msg = packet.decode()
if code in (PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE,
BROKEN_NODE_DISALLOWED_CODE):
# In those cases, it is better to assume that I am unusable.
logging.critical(msg)
raise RuntimeError, msg
else:
# Otherwise, the peer has an error.
logging.error('an error happened at the peer %s:%d',
conn.ip_address, conn.port)
if node is not None:
node.setState(BROKEN_STATE)
conn.close()
return
elif t == PING:
logging.info('got a keep-alive message from %s:%d; overloaded?',
conn.ip_address, conn.port)
conn.addPacket(Packet().pong(packet.getId()))
elif t == PONG:
pass
elif t == REQUEST_NODE_IDENTIFICATION:
node_type, uuid, ip_address, port, name = packet.decode()
if node_type != MASTER_NODE_TYPE:
logging.info('reject a connection from a non-master')
conn.addPacket(Packet().notReady(packet.getId(),
'retry later'))
conn.abort()
continue
if name != self.name:
logging.info('reject an alien cluster')
conn.addPacket(Packet().protocolError(packet.getId(),
'invalid cluster name'))
conn.abort()
continue
node = self.nm.getNodeByServer(ip_address, port)
if node is None:
node = MasterNode(ip_address, port, uuid)
self.nm.add(node)
self.unconnected_master_node_set.add((ip_address, port))
else:
# Trust the UUID sent by the peer.
if node.getUUID() != uuid:
node.setUUID(uuid)
conn.setUUID(uuid)
p = Packet()
p.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE,
self.uuid, self.ip_address, self.port)
conn.addPacket(p)
conn.expectMessage()
elif t == ASK_PRIMARY_MASTER:
if node is None:
raise ProtocolError(packet, 'not identified')
ltid, loid = packet.decode() # Make sure that every connection has the status recovery event handler.
for conn in em.getConnectionList():
conn.setHandler(handler)
p = Packet() prev_lptid = None
if self.primary: self.loid = INVALID_OID
uuid = self.uuid self.ltid = INVALID_TID
elif self.primary_master_node is not None: self.lptid = None
uuid = self.primary_master_node.getUUID() while 1:
else: self.target_uuid = None
uuid = INVALID_UUID self.pt.clear()
known_master_list = [] if self.lptid is not None:
for n in self.nm.getMasterNodeList(): # I need to retrieve last ids again.
info = n.getServer() + (n.getUUID() or INVALID_UUID,) logging.debug('resending Ask Last IDs')
known_master_list.append(info) for conn in em.getConnectionList():
p.answerPrimaryMaster(packet.getId(), self.ltid, self.loid, uuid = conn.getUUID()
uuid, known_master_list) if uuid is not None:
conn.addPacket(p) node = nm.getNodeByUUID(uuid)
if isinstance(node, StorageNode) \
if self.primary and (self.ltid < ltid or self.loid < loid): and node.getState() == RUNNING_STATE:
# I am not really primary... So restart the election. p = Packet()
raise ElectionFailure, 'not a primary master any longer' msg_id = conn.getNextId()
elif t == ANNOUNCE_PRIMARY_MASTER: p.askLastIDs(msg_id)
if node is None: conn.addPacket(p)
raise ProtocolError(packet, 'not identified') conn.expectMessage(msg_id)
if self.primary: # Wait for at least one storage node to appear.
# I am also the primary... So restart the election. while self.target_uuid is None:
raise ElectionFailure, 'another primary arises' em.poll(1)
self.primary = False # Wait a bit.
self.primary_master_node = node t = time()
logging.info('%s:%d is the primary' % node.getServer()) while time() < t + 5:
elif t == REELECT_PRIMARY_MASTER: em.poll(1)
raise ElectionFailure, 'reelection requested'
else: # Now I have at least one to ask.
raise ProtocolError(packet, 'unexpected packet 0x%x' % t) prev_lptid = self.lptid
except ProtocolError, m: node = nm.getNodeByUUID(uuid)
logging.debug('protocol problem %s', m[1]) if node.getState() != RUNNING_STATE:
conn.addPacket(Packet().protocolError(m[0].getId(), m[1])) # Weird. It's dead.
conn.abort() logging.info('the target storage node is dead')
continue
for conn in em.getConnectionList():
if conn.getUUID() == self.lptid:
break
else: else:
raise RuntimeError, 'unexpected event %r' % (method,) # Why?
logging.info('no connection to the target storage node')
continue
if self.lptid == INVALID_PTID:
# This looks like the first time. So make a fresh table.
logging.debug('creating a new partition table')
self.pt.make(nm.getStorageNodeList())
else:
# Obtain a partition table. It is necessary to split this message
# because the packet size can be huge.
logging.debug('asking a partition table to %s:%d', *(node.getServer()))
start = 0
size = self.num_partitions
while size:
len = min(1000, size)
msg_id = conn.getNextId()
p = Packet()
p.askPartitionTable(msg_id, range(start, start + len))
conn.addPacket(p)
conn.expectMessage(msg_id)
size -= len
start += len
t = time()
while 1:
em.poll(1)
if node.getState() != RUNNING_STATE:
# Dead.
break
if self.pt.filled() or t + 30 < time():
break
if self.lptid != prev_lptid or not self.pt.filled():
# I got something newer or the target is dead.
continue
# Wait until the cluster gets operational or the Partition Table ID
# turns out to be not the latest.
logging.debug('waiting for the cluster to be operational')
while 1:
em.poll(1)
if self.pt.operational():
break
if self.lptid != prev_lptid:
break
if self.lptid != prev_lptid:
# I got something newer.
continue
break
def playPrimaryRole(self): def playPrimaryRole(self):
logging.info('play the primary role') logging.info('play the primary role')
self.ready = False self.recoverStatus()
raise NotImplementedError raise NotImplementedError
def playSecondaryRole(self): def playSecondaryRole(self):
......
...@@ -38,6 +38,8 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -38,6 +38,8 @@ class ElectionEventHandler(MasterEventHandler):
if node.getState() == RUNNING_STATE: if node.getState() == RUNNING_STATE:
app.unconnected_master_node_set.add(addr) app.unconnected_master_node_set.add(addr)
node.setState(TEMPORARILY_DOWN_STATE) node.setState(TEMPORARILY_DOWN_STATE)
elif node.getState() == TEMPORARILY_DOWN_STATE:
app.unconnected_master_node_set.add(addr)
MasterEventHandler.connectionFailed(self, conn) MasterEventHandler.connectionFailed(self, conn)
def connectionClosed(self, conn): def connectionClosed(self, conn):
...@@ -243,3 +245,36 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -243,3 +245,36 @@ class ElectionEventHandler(MasterEventHandler):
def handleReelectPrimaryMaster(self, conn, packet): def handleReelectPrimaryMaster(self, conn, packet):
raise ElectionFailure, 'reelection requested' raise ElectionFailure, 'reelection requested'
def handleNotifyNodeInformation(self, conn, packet, node_list):
if isinstance(conn, ClientConnection):
self.handleUnexpectedPacket(conn, packet)
else:
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
for node_type, ip_address, port, uuid, state in node_list:
if node_type != MASTER_NODE:
# No interest.
continue
# Register new master nodes.
addr = (ip_address, port)
if app.server == addr:
# This is self.
continue
else:
n = app.nm.getNodeByServer(addr)
if n is None:
n = MasterNode(server = addr)
app.nm.add(n)
app.unconnected_master_node_set.add(addr)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None:
n.setUUID(uuid)
import logging
from neo.protocol import MASTER_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE
from neo.master.handler import MasterEventHandler
from neo.exception import ElectionFailure
from neo.protocol import Packet, INVALID_UUID
from neo.util import dump
class RecoveryEventHandler(MasterEventHandler):
"""This class deals with events for a recovery phase."""
def connectionClosed(self, conn):
uuid = conn.getUUID()
if uuid is not None:
app = self.app
node = app.nm.getNodeByUUID(uuid)
if node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE)
app.broadcastNodeInformation(node)
if isinstance(node, ClientNode):
# If this node is a client, just forget it.
app.nm.remove(node)
MasterEventHandler.connectionClosed(self, conn)
def timeoutExpired(self, conn):
uuid = conn.getUUID()
if uuid is not None:
app = self.app
node = app.nm.getNodeByUUID(uuid)
if node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE)
app.broadcastNodeInformation(node)
if isinstance(node, ClientNode):
# If this node is a client, just forget it.
app.nm.remove(node)
MasterEventHandler.timeoutExpired(self, conn)
def peerBroken(self, conn):
uuid = conn.getUUID()
if uuid is not None:
app = self.app
node = app.nm.getNodeByUUID(uuid)
if node.getState() != BROKEN_STATE:
node.setState(BROKEN_STATE)
app.broadcastNodeInformation(node)
if isinstance(node, ClientNode):
# If this node is a client, just forget it.
app.nm.remove(node)
MasterEventHandler.peerBroken(self, conn)
def packetReceived(self, conn, packet):
node = self.app.nm.getNodeByServer(conn.getAddress())
if node.getState() in (TEMPORARILY_DOWN_STATE, DOWN_STATE):
node.setState(RUNNING_STATE)
app.broadcastNodeInformation(node)
MasterEventHandler.packetReceived(self, conn, packet)
def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name):
app = self.app
if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE):
logging.info('reject a connection from a client')
conn.addPacket(Packet().notReady(packet.getId(), 'retry later'))
conn.abort()
return
if name != app.name:
logging.error('reject an alien cluster')
conn.addPacket(Packet().protocolError(packet.getId(),
'invalid cluster name'))
conn.abort()
return
addr = (ip_address, port)
node = app.nm.getNodeByServer(addr)
if node is None:
if node_type == MASTER_NODE_TYPE:
node = MasterNode(server = addr, uuid = uuid)
else:
node = StorageNode(server = address, uuid = uuid)
app.nm.add(node)
else:
# If this node is broken, reject it.
if node.getState() == BROKEN_STATE:
if node.getUUID() == uuid:
p = Packet()
p.brokenNodeDisallowedError(packet.getId(), 'go away')
conn.addPacket(p)
conn.abort()
return
else:
node.setUUID(uuid)
node.setState(RUNNING_STATE)
app.broadcastNodeInformation(node)
conn.setUUID(uuid)
p = Packet()
p.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1])
conn.addPacket(p)
# Next, the peer should ask a primary master node.
conn.expectMessage()
def handleAskPrimaryMaster(self, conn, packet):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
# Merely tell the peer that I am the primary master node.
# It is not necessary to send known master nodes, because
# I must send all node information immediately.
p = Packet()
p.answerPrimaryMaster(packet.getId(), app.uuid, [])
conn.addPacket(p)
# Send the information.
node_list = []
for n in app.nm.getNodeList():
ip_address, port = n.getServer()
node_list.append((n.getNodeType(), ip_address, port,
n.getUUID() or INVALID_UUID, n.getState()))
if len(node_list) == 10000:
# Ugly, but it is necessary to split a packet, if it is too big.
p = Packet()
p.notifyNodeInformation(conn.getNextId(), node_list)
conn.addPacket(p)
del node_list[:]
p = Packet()
p.notifyNodeInformation(conn.getNextId(), node_list)
conn.addPacket(p)
# If this is a storage node, ask the last IDs.
node = app.nm.getNodeByUUID(uuid)
if isinstance(node, StorageNode):
p = Packet()
msg_id = conn.getNextId()
p.askLastIDs(msg_id)
conn.addPacket(p)
conn.expectMessage(msg_id)
def handleAnnouncePrimaryMaster(self, conn, packet):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
# I am also the primary... So restart the election.
raise ElectionFailure, 'another primary arises'
def handleReelectPrimaryMaster(self, conn, packet):
raise ElectionFailure, 'reelection requested'
def handleNotifyNodeInformation(self, conn, packet, node_list):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
for node_type, ip_address, port, uuid, state in node_list:
if node_type != CLIENT_NODE:
# No interest.
continue
if uuid == INVALID_UUID:
# No interest.
continue
if app.uuid == uuid:
# This looks like me...
if state == RUNNING_STATE:
# Yes, I know it.
continue
else:
# What?! What happened to me?
raise RuntimeError, 'I was told that I am bad'
addr = (ip_address, port)
node = app.nm.getNodeByUUID(uuid)
if node is None:
node = app.nm.getNodeByServer(addr)
if node is None:
# I really don't know such a node. What is this?
continue
else:
if node.getServer() != addr:
# This is different from what I know.
continue
if node.getState() == state:
# No change. Don't care.
continue
if state == RUNNING_STATE:
# No problem.
continue
# Something wrong happened possibly. Cut the connection to this node,
# if any, and notify the information to others.
# XXX this can be very slow.
for c in app.em.getConnectionList():
if c.getUUID() == uuid:
c.close()
node.setState(state)
app.broadcastNodeInformation(node)
def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
node = app.nm.getNodeByUUID(uuid)
if not isinstance(node, StorageNode):
self.handleUnexpectedPacket(conn, packet)
return
# If the target is still unknown, set it to this node for now.
if app.target_uuid is None:
app.target_uuid = uuid
# Get max values.
if app.loid < loid:
app.loid = loid
if app.ltid < ltid:
app.ltid = ltid
if app.lptid is None or app.lptid < lptid:
app.lptid = lptid
# I need to use the node which has the max Partition Table ID.
app.target_uuid = uuid
elif app.lptid == lptid and app.target_uuid is None:
app.target_uuid = uuid
def handleAnswerPartitionTable(self, conn, packet, cell_list):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
node = app.nm.getNodeByUUID(uuid)
if not isinstance(node, StorageNode):
self.handleUnexpectedPacket(conn, packet)
return
if uuid != app.target_uuid:
# If this is not from a target node, ignore it.
return
for offset, cell_list in row_list:
if offset >= app.num_partitions or app.pt.hasOffset(offset):
# There must be something wrong.
self.handleUnexpectedPacket(conn, packet)
return
for uuid, state in cell_list:
n = app.nm.getNodeByUUID(uuid)
if n is None:
n = StorageNode(uuid = uuid)
n.setState(TEMPORARILY_DOWN_STATE)
app.nm.add(n)
app.pt.setCell(offset, n, state)
...@@ -496,4 +496,4 @@ class Packet(object): ...@@ -496,4 +496,4 @@ class Packet(object):
def _decodeStopOperation(self): def _decodeStopOperation(self):
pass pass
decode_table[START_OPERATION] = _decodeStopOperation decode_table[STOP_OPERATION] = _decodeStopOperation
import logging
from neo.protocol import UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE, \
DISCARDED_STATE
class Cell(object):
"""This class represents a cell in a partition table."""
def __init__(self, node, state = UP_TO_DATE_STATE):
self.node = node
self.state = state
def getState(self):
return self.state
def setState(self, state):
self.state = state
def getNode(self):
return self.node
def getNodeState(self):
"""This is a short hand."""
return self.node.getState()
class PartitionTable(object):
"""This class manages a partition table."""
def __init__(self, num_partitions, num_replicas):
self.np = num_partitions
self.nr = num_replicas
self.num_filled_rows = 0
self.partition_list = [None] * num_partitions
def clear(self):
"""Forget an existing partition table."""
self.num_filled_rows = 0
self.partition_list = [None] * self.np
def make(self, node_list):
"""Make a new partition table from scratch."""
# First, filter the list of nodes.
node_list = [n for n in node_list \
if n.getState() == RUNNING_STATE and n.getUUID() is not None]
if len(node_list) == 0:
# Impossible.
raise RuntimeError, \
'cannot make a partition table with an empty storage node list'
# Take it into account that the number of storage nodes may be less than the
# number of replicas.
repeats = min(self.nr, len(node_list))
index = 0
for offset in xrange(self.np):
row = []
for i in xrange(repeats):
row.append(Cell(node_list[index]))
index += 1
if index == len(uuid_list):
index = 0
self.partition_list[offset] = row
self.num_filled_rows = self.np
def setCell(self, offset, node, state):
row = self.partition_list[offset]
if row is None:
# Create a new row.
row = [Cell(node, state)]
self.partition_list[offset]
self.num_filled_rows += 1
else:
# XXX this can be slow, but it is necessary to remove a duplicate,
# if any.
for cell in row:
if cell.getNode() == node:
row.remove(cell)
break
row.append(Cell(node, state))
def filled(self):
return self.num_filled_rows == self.np
def hasOffset(self, offset):
return self.partition_list[offset] is not None
def operational(self):
if not self.filled():
return False
# FIXME it is better to optimize this code, as this could be extremely
# slow. The possible fix is to have a handler to notify a change on
# a node state, and record which rows are ready.
for row in self.partition_list:
for cell in row:
if cell.getState() == UP_TO_DATE_STATE \
and cell.getNodeState() == RUNNING_STATE:
break
else:
return False
return True
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment