Commit 03bcb91c authored by Yoshinori Okuji's avatar Yoshinori Okuji

The election part is rewritten. Somehow tested.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@25 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent dc413da3
...@@ -38,7 +38,7 @@ class BaseConnection(object): ...@@ -38,7 +38,7 @@ class BaseConnection(object):
def getHandler(self): def getHandler(self):
return self.handler return self.handler
def setHandler(self): def setHandler(self, handler):
self.handler = handler self.handler = handler
def getEventManager(self): def getEventManager(self):
...@@ -73,7 +73,7 @@ class ListeningConnection(BaseConnection): ...@@ -73,7 +73,7 @@ class ListeningConnection(BaseConnection):
class Connection(BaseConnection): class Connection(BaseConnection):
"""A connection.""" """A connection."""
def __init__(self, event_manager, handler, s = None, addr = None): def __init__(self, event_manager, handler, s = None, addr = None):
BaseConnection.__init__(self, handler, event_manager, s = s, addr = addr) BaseConnection.__init__(self, event_manager, handler, s = s, addr = addr)
if s is not None: if s is not None:
event_manager.addReader(self) event_manager.addReader(self)
self.read_buf = [] self.read_buf = []
...@@ -231,7 +231,7 @@ class Connection(BaseConnection): ...@@ -231,7 +231,7 @@ class Connection(BaseConnection):
# If this is the first time, enable polling for writing. # If this is the first time, enable polling for writing.
if len(self.write_buf) == 1: if len(self.write_buf) == 1:
self.em.addWriter(self.s) self.em.addWriter(self)
def expectMessage(self, msg_id = None, timeout = 5, additional_timeout = 30): def expectMessage(self, msg_id = None, timeout = 5, additional_timeout = 30):
"""Expect a message for a reply to a given message ID or any message. """Expect a message for a reply to a given message ID or any message.
...@@ -288,13 +288,13 @@ class ClientConnection(Connection): ...@@ -288,13 +288,13 @@ class ClientConnection(Connection):
if self.connecting: if self.connecting:
err = self.s.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) err = self.s.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err: if err:
self.connectionFailed() self.handler.connectionFailed(self)
self.close() self.close()
return return
else: else:
self.connecting = False self.connecting = False
self.handler.connectionCompleted(self) self.handler.connectionCompleted(self)
self.cm.addReader(self.s) self.em.addReader(self)
else: else:
Connection.writable(self) Connection.writable(self)
......
class NeoException(Exception): pass
class ElectionFailure(NeoException): pass
class PrimaryFailure(NeoException): pass
...@@ -20,18 +20,19 @@ class EventHandler(object): ...@@ -20,18 +20,19 @@ class EventHandler(object):
def connectionStarted(self, conn): def connectionStarted(self, conn):
"""Called when a connection is started.""" """Called when a connection is started."""
pass logging.debug('connection started for %s:%d', *(conn.getAddress()))
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
"""Called when a connection is completed.""" """Called when a connection is completed."""
pass logging.debug('connection completed for %s:%d', *(conn.getAddress()))
def connectionFailed(self, conn): def connectionFailed(self, conn):
"""Called when a connection failed.""" """Called when a connection failed."""
pass logging.debug('connection failed for %s:%d', *(conn.getAddress()))
def connectionAccepted(self, conn, s, addr): def connectionAccepted(self, conn, s, addr):
"""Called when a connection is accepted.""" """Called when a connection is accepted."""
logging.debug('connection accepted from %s:%d', *addr)
new_conn = ServerConnection(conn.getEventManager(), conn.getHandler(), new_conn = ServerConnection(conn.getEventManager(), conn.getHandler(),
s = s, addr = addr) s = s, addr = addr)
# A request for a node identification should arrive. # A request for a node identification should arrive.
...@@ -39,19 +40,21 @@ class EventHandler(object): ...@@ -39,19 +40,21 @@ class EventHandler(object):
def timeoutExpired(self, conn): def timeoutExpired(self, conn):
"""Called when a timeout event occurs.""" """Called when a timeout event occurs."""
pass logging.debug('timeout expired for %s:%d', *(conn.getAddress()))
def connectionClosed(self, conn): def connectionClosed(self, conn):
"""Called when a connection is closed by the peer.""" """Called when a connection is closed by the peer."""
pass logging.debug('connection closed for %s:%d', *(conn.getAddress()))
def packetReceived(self, conn, packet): def packetReceived(self, conn, packet):
"""Called when a packet is received.""" """Called when a packet is received."""
logging.debug('packet received from %s:%d', *(conn.getAddress()))
self.dispatch(conn, packet) self.dispatch(conn, packet)
def packetMalformed(self, conn, packet, error_message): def packetMalformed(self, conn, packet, error_message):
"""Called when a packet is malformed.""" """Called when a packet is malformed."""
logging.info('malformed packet: %s', error_message) logging.info('malformed packet from %s:%d: %s',
conn.getAddress()[0], conn.getAddress()[1], error_message)
conn.addPacket(Packet().protocolError(packet.getId(), error_message)) conn.addPacket(Packet().protocolError(packet.getId(), error_message))
conn.abort() conn.abort()
self.peerBroken(conn) self.peerBroken(conn)
...@@ -65,7 +68,7 @@ class EventHandler(object): ...@@ -65,7 +68,7 @@ class EventHandler(object):
t = packet.getType() t = packet.getType()
try: try:
method = self.packet_dispatch_table[t] method = self.packet_dispatch_table[t]
args = packet.decode() args = packet.decode() or ()
method(conn, packet, *args) method(conn, packet, *args)
except ValueError: except ValueError:
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
...@@ -107,10 +110,10 @@ class EventHandler(object): ...@@ -107,10 +110,10 @@ class EventHandler(object):
def handlePong(self, conn, packet): def handlePong(self, conn, packet):
pass pass
def handleAskPrimaryNode(self, conn, packet): def handleAskPrimaryMaster(self, conn, packet):
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
def handleAnswerPrimaryNode(self, conn, packet, primary_uuid, known_master_list): def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, known_master_list):
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
......
...@@ -9,14 +9,11 @@ from neo.protocol import Packet, ProtocolError, \ ...@@ -9,14 +9,11 @@ from neo.protocol import Packet, ProtocolError, \
RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, \ RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, \
INVALID_UUID, INVALID_OID, INVALID_TID INVALID_UUID, INVALID_OID, INVALID_TID
from neo.node import NodeManager, MasterNode, StorageNode, ClientNode from neo.node import NodeManager, MasterNode, StorageNode, ClientNode
from neo.handler import EventHandler
from neo.event import EventManager from neo.event import EventManager
from neo.util import dump from neo.util import dump
from neo.connection import ListeningConnection from neo.connection import ListeningConnection, ClientConnection, ServerConnection
from neo.exception import ElectionFailure, PrimaryFailure
class NeoException(Exception): pass from neo.master.election import ElectionEventHandler
class ElectionFailure(NeoException): pass
class PrimaryFailure(NeoException): pass
class Application(object): class Application(object):
"""The master node application.""" """The master node application."""
...@@ -86,314 +83,11 @@ class Application(object): ...@@ -86,314 +83,11 @@ class Application(object):
except (ElectionFailure, PrimaryFailure): except (ElectionFailure, PrimaryFailure):
# Forget all connections. # Forget all connections.
for conn in self.em.getConnectionList(): for conn in self.em.getConnectionList():
if not isinstance(conn, ListeningConnection):
conn.close() conn.close()
# Reelect a new primary master. # Reelect a new primary master.
self.electPrimary(bootstrap = False) self.electPrimary(bootstrap = False)
def electPrimaryClientIterator(self):
"""Handle events for a client connection."""
# The first event. This must be a connection failure or connection completion.
# Keep the Connection object and the server address only at this time,
# because they never change in this context.
method, conn = self.event[0], self.event[1]
logging.debug('method is %r, conn is %s:%d', method, conn.ip_address, conn.port)
serv = (conn.ip_address, conn.port)
node = self.nm.getNodeByServer(*serv)
if node is None:
raise RuntimeError, 'attempted to connect to an unknown node'
if not isinstance(node, MasterNode):
raise RuntimeError, 'should not happen'
if method is self.CONNECTION_FAILED:
self.negotiating_master_node_set.discard(serv)
self.unconnected_master_node_set.add(serv)
if node.getState() not in (DOWN_STATE, BROKEN_STATE):
node.setState(TEMPORARILY_DOWN_STATE)
return
elif method is self.CONNECTION_COMPLETED:
self.negotiating_master_node_set.add(serv)
# Request a node idenfitication.
p = Packet()
msg_id = conn.getNextId()
p.requestNodeIdentification(msg_id, MASTER_NODE_TYPE, self.uuid,
self.ip_address, self.port, self.name)
conn.addPacket(p)
conn.expectMessage(msg_id)
else:
raise RuntimeError, 'unexpected event %r' % (method,)
while 1:
# Wait for next event.
yield None
method = self.event[0]
logging.debug('method is %r, conn is %s:%d', method, conn.ip_address, conn.port)
if method in (self.CONNECTION_CLOSED, self.TIMEOUT_EXPIRED):
self.negotiating_master_node_set.discard(serv)
self.unconnected_master_node_set.add(serv)
if node.getState() not in (DOWN_STATE, BROKEN_STATE):
node.setState(TEMPORARILY_DOWN_STATE)
return
elif method is self.PEER_BROKEN:
self.negotiating_master_node_set.discard(serv)
# For now, do not use BROKEN_STATE, because the effect is unknown
# when a node was buggy and restarted immediately.
node.setState(DOWN_STATE)
return
elif method is self.PACKET_RECEIVED:
if node.getState() != BROKEN_STATE:
node.setState(RUNNING_STATE)
packet = self.event[2]
t = packet.getType()
try:
if t == ERROR:
code, msg = packet.decode()
if code in (PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE,
BROKEN_NODE_DISALLOWED_CODE):
# In those cases, it is better to assume that I am unusable.
logging.critical(msg)
raise RuntimeError, msg
else:
# Otherwise, the peer has an error.
logging.error('an error happened at the peer %s:%d',
conn.ip_address, conn.port)
node.setState(DOWN_STATE)
self.negotiating_master_node_set.discard(serv)
conn.close()
return
elif t == PING:
logging.info('got a keep-alive message from %s:%d; overloaded?',
conn.ip_address, conn.port)
conn.addPacket(Packet().pong(packet.getId()))
elif t == PONG:
pass
elif t == ACCEPT_NODE_IDENTIFICATION:
node_type, uuid, ip_address, port = packet.decode()
if node_type != MASTER_NODE_TYPE:
# Why? Isn't this a master node?
self.nm.remove(node)
self.negotiating_master_node_set.discard(serv)
conn.close()
return
conn.setUUID(uuid)
node.setUUID(uuid)
# Ask a primary master.
msg_id = conn.getNextId()
p = Packet()
conn.addPacket(p.askPrimaryMaster(msg_id, self.ltid, self.loid))
conn.expectMessage(msg_id)
elif t == ANSWER_PRIMARY_MASTER:
ltid, loid, primary_uuid, known_master_list = packet.decode()
# Register new master nodes.
for ip_address, port, uuid in known_master_list:
if self.ip_address == ip_address and self.port == port:
# This is self.
continue
else:
n = self.nm.getNodeByServer(ip_address, port)
if n is None:
n = MasterNode(ip_address, port)
self.nm.add(n)
self.unconnected_master_node_set.add((ip_address, port))
if uuid != INVALID_UUID:
n.setUUID(uuid)
elif uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None:
n.setUUID(uuid)
if primary_uuid != INVALID_UUID:
# The primary master is defined.
if self.primary_master_node is not None \
and self.primary_master_node.getUUID() != primary_uuid:
# There are multiple primary master nodes. This is
# dangerous.
raise ElectionFailure, 'multiple primary master nodes'
primary_node = self.nm.getNodeByUUID(primary_uuid)
if primary_node is None:
# I don't know such a node. Probably this information
# is old. So ignore it.
pass
else:
if node.getUUID() == primary_uuid:
if self.ltid <= ltid and self.loid <= loid:
# This one is good.
self.primary = False
self.primary_master_node = node
else:
# Not nice. I am newer. If the primary master is
# already serving, the situation is catastrophic.
# In this case, it will shutdown the cluster.
# Otherwise, I can be a new primary master, so
# continue this job.
pass
else:
# I will continue this, until I find the primary
# master.
pass
else:
if self.ltid < ltid or self.loid < loid \
or inet_aton(self.ip_address) > inet_aton(ip_address) \
or self.port > port:
# I lost.
self.primary = False
else:
# I won.
pass
self.negotiating_master_node_set.discard(serv)
else:
raise ProtocolError(packet, 'unexpected packet 0x%x' % t)
except ProtocolError, m:
logging.debug('protocol problem %s', m[1])
conn.addPacket(Packet().protocolError(m[0].getId(), m[1]))
conn.abort()
self.negotiating_master_node_set.discard(serv)
self.unconnected_master_node_set.add(serv)
else:
raise RuntimeError, 'unexpected event %r' % (method,)
def electPrimaryServerIterator(self):
"""Handle events for a server connection."""
# The first event. This must be a connection acception.
method, conn = self.event[0], self.event[1]
logging.debug('method is %r, conn is %s:%d', method, conn.ip_address, conn.port)
serv = (conn.ip_address, conn.port)
node = None
if method is self.CONNECTION_ACCEPTED:
# Nothing to do at the moment. The timeout handling is done in
# the connection itself.
pass
else:
raise RuntimeError, 'unexpected event %r' % (method,)
while 1:
# Wait for next event.
yield None
method = self.event[0]
logging.debug('method is %r, conn is %s:%d', method, conn.ip_address, conn.port)
if method in (self.CONNECTION_CLOSED, self.TIMEOUT_EXPIRED):
return
elif method is self.PEER_BROKEN:
if node is not None:
if isinstance(node, MasterNode):
node.setState(DOWN_STATE)
elif isinstance(node, (ClientNode, StorageNode)):
node.setState(BROKEN_STATE)
return
elif method is self.PACKET_RECEIVED:
if node is not None and node.getState() != BROKEN_STATE:
node.setState(RUNNING_STATE)
packet = self.event[2]
t = packet.getType()
try:
if t == ERROR:
code, msg = packet.decode()
if code in (PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE,
BROKEN_NODE_DISALLOWED_CODE):
# In those cases, it is better to assume that I am unusable.
logging.critical(msg)
raise RuntimeError, msg
else:
# Otherwise, the peer has an error.
logging.error('an error happened at the peer %s:%d',
conn.ip_address, conn.port)
if node is not None:
node.setState(BROKEN_STATE)
conn.close()
return
elif t == PING:
logging.info('got a keep-alive message from %s:%d; overloaded?',
conn.ip_address, conn.port)
conn.addPacket(Packet().pong(packet.getId()))
elif t == PONG:
pass
elif t == REQUEST_NODE_IDENTIFICATION:
node_type, uuid, ip_address, port, name = packet.decode()
if node_type != MASTER_NODE_TYPE:
logging.info('reject a connection from a non-master')
conn.addPacket(Packet().notReady(packet.getId(),
'retry later'))
conn.abort()
continue
if name != self.name:
logging.info('reject an alien cluster')
conn.addPacket(Packet().protocolError(packet.getId(),
'invalid cluster name'))
conn.abort()
continue
node = self.nm.getNodeByServer(ip_address, port)
if node is None:
node = MasterNode(ip_address, port, uuid)
self.nm.add(node)
self.unconnected_master_node_set.add((ip_address, port))
else:
# Trust the UUID sent by the peer.
if node.getUUID() != uuid:
node.setUUID(uuid)
conn.setUUID(uuid)
p = Packet()
p.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE,
self.uuid, self.ip_address, self.port)
conn.addPacket(p)
conn.expectMessage()
elif t == ASK_PRIMARY_MASTER:
if node is None:
raise ProtocolError(packet, 'not identified')
ltid, loid = packet.decode()
p = Packet()
if self.primary:
uuid = self.uuid
elif self.primary_master_node is not None:
uuid = self.primary_master_node.getUUID()
else:
uuid = INVALID_UUID
known_master_list = []
for n in self.nm.getMasterNodeList():
info = n.getServer() + (n.getUUID() or INVALID_UUID,)
known_master_list.append(info)
p.answerPrimaryMaster(packet.getId(), self.ltid, self.loid,
uuid, known_master_list)
conn.addPacket(p)
if self.primary and (self.ltid < ltid or self.loid < loid):
# I am not really primary... So restart the election.
raise ElectionFailure, 'not a primary master any longer'
elif t == ANNOUNCE_PRIMARY_MASTER:
if node is None:
raise ProtocolError(packet, 'not identified')
if self.primary:
# I am also the primary... So restart the election.
raise ElectionFailure, 'another primary arises'
self.primary = False
self.primary_master_node = node
logging.info('%s:%d is the primary' % node.getServer())
elif t == REELECT_PRIMARY_MASTER:
raise ElectionFailure, 'reelection requested'
else:
raise ProtocolError(packet, 'unexpected packet 0x%x' % t)
except ProtocolError, m:
logging.debug('protocol problem %s', m[1])
conn.addPacket(Packet().protocolError(m[0].getId(), m[1]))
conn.abort()
else:
raise RuntimeError, 'unexpected event %r' % (method,)
def electPrimary(self, bootstrap = True): def electPrimary(self, bootstrap = True):
"""Elect a primary master node. """Elect a primary master node.
...@@ -403,13 +97,16 @@ class Application(object): ...@@ -403,13 +97,16 @@ class Application(object):
to self as well as master nodes.""" to self as well as master nodes."""
logging.info('begin the election of a primary master') logging.info('begin the election of a primary master')
self.server_thread_method = self.electPrimaryServerIterator
self.unconnected_master_node_set = set() self.unconnected_master_node_set = set()
self.negotiating_master_node_set = set() self.negotiating_master_node_set = set()
handler = ElectionEventHandler(self)
em = self.em em = self.em
nm = self.nm nm = self.nm
# Make sure that every connection has the election event handler.
for conn in em.getConnectionList():
conn.setHandler(handler)
while 1: while 1:
t = 0 t = 0
self.primary = None self.primary = None
...@@ -445,12 +142,8 @@ class Application(object): ...@@ -445,12 +142,8 @@ class Application(object):
# Try to connect to master nodes. # Try to connect to master nodes.
if self.unconnected_master_node_set: if self.unconnected_master_node_set:
for node in list(self.unconnected_master_node_set): for addr in list(self.unconnected_master_node_set):
self.unconnected_master_node_set.remove(node) ClientConnection(em, handler, addr = addr)
self.negotiating_master_node_set.add(node)
client = self.electPrimaryClientIterator()
self.thread_dict[node] = client
cm.connect(*node)
if len(self.unconnected_master_node_set) == 0 \ if len(self.unconnected_master_node_set) == 0 \
and len(self.negotiating_master_node_set) == 0: and len(self.negotiating_master_node_set) == 0:
...@@ -464,24 +157,23 @@ class Application(object): ...@@ -464,24 +157,23 @@ class Application(object):
# I am the primary. # I am the primary.
self.primary = True self.primary = True
logging.info('I am the primary, so sending an announcement') logging.info('I am the primary, so sending an announcement')
for conn in cm.getConnectionList(): for conn in em.getConnectionList():
if conn.from_self: if isinstance(conn, ClientConnection):
p = Packet().announcePrimaryMaster(conn.getNextId()) p = Packet().announcePrimaryMaster(conn.getNextId())
conn.addPacket(p) conn.addPacket(p)
conn.abort() conn.abort()
closed = False closed = False
t = time() t = time()
while not closed: while not closed:
cm.poll(1) em.poll(1)
closed = True closed = True
for conn in cm.getConnectionList(): for conn in em.getConnectionList():
if conn.from_self: if isinstance(conn, ClientConnection):
closed = False closed = False
break break
if t + 10 < time(): if t + 10 < time():
for conn in cm.getConnectionList(): for conn in em.getConnectionList():
if conn.from_self: if isinstance(conn, ClientConnection):
del self.thread_dict[(conn.ip_address, conn.port)]
conn.close() conn.close()
closed = True closed = True
else: else:
...@@ -489,32 +181,34 @@ class Application(object): ...@@ -489,32 +181,34 @@ class Application(object):
# the primary master is down. # the primary master is down.
t = time() t = time()
while self.primary_master_node is None: while self.primary_master_node is None:
cm.poll(1) em.poll(1)
if t + 10 < time(): if t + 10 < time():
raise ElectionFailure, 'no primary master elected' raise ElectionFailure, 'no primary master elected'
# Now I need only a connection to the primary master node. # Now I need only a connection to the primary master node.
primary = self.primary_master_node primary = self.primary_master_node
for conn in cm.getConnectionList(): addr = primary.getServer()
if not conn.from_self \ for conn in em.getConnectionList():
or primary.getServer() != (conn.ip_address, conn.port): if isinstance(conn, ServerConnection) \
del self.thread_dict[(conn.ip_address, conn.port)] or isinstance(conn, ClientConnection) \
and addr != conn.getAddress():
conn.close() conn.close()
# But if there is no such connection, something wrong happened. # But if there is no such connection, something wrong happened.
for conn in cm.getConnectionList(): for conn in em.getConnectionList():
if conn.from_self \ if isinstance(conn, ClientConnection) \
and primary.getServer() == (conn.ip_address, conn.port): and addr == conn.getAddress():
break break
else: else:
raise ElectionFailure, 'no connection remains to the primary' raise ElectionFailure, 'no connection remains to the primary'
return return
except ElectionFailure, m: except ElectionFailure, m:
logging.info('election failed; %s' % m) logging.error('election failed; %s' % m)
# Ask all connected nodes to reelect a single primary master. # Ask all connected nodes to reelect a single primary master.
for conn in cm.getConnectionList(): for conn in em.getConnectionList():
if conn.from_self: if isinstance(conn, ClientConnection):
conn.addPacket(Packet().reelectPrimaryMaster(conn.getNextId())) conn.addPacket(Packet().reelectPrimaryMaster(conn.getNextId()))
conn.abort() conn.abort()
...@@ -525,13 +219,13 @@ class Application(object): ...@@ -525,13 +219,13 @@ class Application(object):
t = time() t = time()
while not closed: while not closed:
try: try:
cm.poll(1) em.poll(1)
except ElectionFailure: except ElectionFailure:
pass pass
closed = True closed = True
for conn in cm.getConnectionList(): for conn in em.getConnectionList():
if conn.from_self: if isinstance(conn, ClientConnection):
# Still not closed. # Still not closed.
closed = Falsed closed = Falsed
break break
...@@ -539,10 +233,11 @@ class Application(object): ...@@ -539,10 +233,11 @@ class Application(object):
if time() > t + 10: if time() > t + 10:
# If too long, do not wait. # If too long, do not wait.
break break
# Close all connections. # Close all connections.
for conn in cm.getConnectionList(): for conn in em.getConnectionList():
if not isinstance(conn, ListeningConnection):
conn.close() conn.close()
self.thread_dict.clear()
bootstrap = False bootstrap = False
def broadcastNodeStateChange(self, node): def broadcastNodeStateChange(self, node):
...@@ -558,7 +253,7 @@ class Application(object): ...@@ -558,7 +253,7 @@ class Application(object):
# Notify secondary master nodes and storage nodes of # Notify secondary master nodes and storage nodes of
# the removal of the client node. # the removal of the client node.
for c in cm.getConnectionList(): for c in em.getConnectionList():
if c.getUUID() is not None: if c.getUUID() is not None:
n = nm.getNodeByUUID(uuid) n = nm.getNodeByUUID(uuid)
if isinstance(n, (MasterNode, StorageNode)): if isinstance(n, (MasterNode, StorageNode)):
...@@ -569,7 +264,7 @@ class Application(object): ...@@ -569,7 +264,7 @@ class Application(object):
uuid, state) uuid, state)
c.addPacket(p) c.addPacket(p)
elif isinstance(node, MasterNode): elif isinstance(node, MasterNode):
for c in cm.getConnectionList(): for c in em.getConnectionList():
if c.getUUID() is not None: if c.getUUID() is not None:
p = Packet() p = Packet()
p.notifyNodeStateChange(c.getNextId(), p.notifyNodeStateChange(c.getNextId(),
...@@ -578,7 +273,7 @@ class Application(object): ...@@ -578,7 +273,7 @@ class Application(object):
uuid, state) uuid, state)
c.addPacket(p) c.addPacket(p)
elif isinstance(node, StorageNode): elif isinstance(node, StorageNode):
for c in cm.getConnectionList(): for c in em.getConnectionList():
if c.getUUID() is not None: if c.getUUID() is not None:
p = Packet() p = Packet()
p.notifyNodeStateChange(c.getNextId(), p.notifyNodeStateChange(c.getNextId(),
...@@ -591,7 +286,7 @@ class Application(object): ...@@ -591,7 +286,7 @@ class Application(object):
def playPrimaryRoleServerIterator(self): def playPrimaryRoleServerIterator(self):
"""Handle events for a server connection.""" """Handle events for a server connection."""
cm = self.cm em = self.em
nm = self.nm nm = self.nm
while 1: while 1:
method, conn = self.event[0], self.event[1] method, conn = self.event[0], self.event[1]
......
import logging
from neo.protocol import MASTER_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE
from neo.master.handler import MasterEventHandler
from neo.connection import ClientConnection
from neo.exception import ElectionFailure
from neo.protocol import Packet, INVALID_UUID
from neo.util import dump
class ElectionEventHandler(MasterEventHandler):
"""This class deals with events for a primary master election."""
def connectionStarted(self, conn):
app = self.app
addr = conn.getAddress()
app.unconnected_master_node_set.remove(addr)
app.negotiating_master_node_set.add(addr)
MasterEventHandler.connectionStarted(self, conn)
def connectionCompleted(self, conn):
app = self.app
addr = conn.getAddress()
# Request a node idenfitication.
p = Packet()
msg_id = conn.getNextId()
p.requestNodeIdentification(msg_id, MASTER_NODE_TYPE, app.uuid,
app.server[0], app.server[1], app.name)
conn.addPacket(p)
conn.expectMessage(msg_id)
MasterEventHandler.connectionCompleted(self, conn)
def connectionFailed(self, conn):
app = self.app
addr = conn.getAddress()
app.negotiating_master_node_set.discard(addr)
node = app.nm.getNodeByServer(addr)
if node.getState() not in (DOWN_STATE, BROKEN_STATE):
app.unconnected_master_node_set.add(addr)
node.setState(TEMPORARILY_DOWN_STATE)
MasterEventHandler.connectionFailed(self, conn)
def connectionClosed(self, conn):
if isinstance(conn, ClientConnection):
self.connectionFailed(conn)
MasterEventHandler.connectionClosed(self, conn)
def timeoutExpired(self, conn):
if isinstance(conn, ClientConnection):
self.connectionFailed(conn)
MasterEventHandler.timeoutExpired(self, conn)
def peerBroken(self, conn):
app = self.app
addr = conn.getAddress()
node = app.nm.getNodeByServer(addr)
if isinstance(conn, ClientConnection):
node.setState(DOWN_STATE)
app.negotiating_master_node_set.discard(addr)
else:
if node.getUUID() is not None:
node.setState(BROKEN_STATE)
MasterEventHandler.peerBroken(self, conn)
def packetReceived(self, conn, packet):
if isinstance(conn, ClientConnection):
node = self.app.nm.getNodeByServer(conn.getAddress())
if node.getState() != BROKEN_STATE:
node.setState(RUNNING_STATE)
MasterEventHandler.packetReceived(self, conn, packet)
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port):
if isinstance(conn, ClientConnection):
app = self.app
node = app.nm.getNodeByServer(conn.getAddress())
if node_type != MASTER_NODE_TYPE:
# The peer is not a master node!
logging.error('%s:%d is not a master node', ip_address, port)
app.nm.remove(node)
app.negotiating_master_node_set.discard(node.getAddress())
conn.close()
return
if conn.getAddress() != (ip_address, port):
# The server address is different! Then why was
# the connection successful?
logging.error('%s:%d is waiting for %s:%d',
conn.getAddress()[0], conn.getAddress()[1], ip_address, port)
app.nm.remove(node)
app.negotiating_master_node_set.discard(node.getServer())
conn.close()
return
conn.setUUID(uuid)
node.setUUID(uuid)
# Ask a primary master.
msg_id = conn.getNextId()
conn.addPacket(Packet().askPrimaryMaster(msg_id))
conn.expectMessage(msg_id)
else:
self.handleUnexpectedPacket(conn, packet)
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, known_master_list):
if isinstance(conn, ClientConnection):
app = self.app
# Register new master nodes.
for ip_address, port, uuid in known_master_list:
addr = (ip_address, port)
if app.server == addr:
# This is self.
continue
else:
n = app.nm.getNodeByServer(addr)
if n is None:
n = MasterNode(server = addr)
app.nm.add(n)
app.unconnected_master_node_set.add(addr)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None:
n.setUUID(uuid)
if primary_uuid != INVALID_UUID:
# The primary master is defined.
if app.primary_master_node is not None \
and app.primary_master_node.getUUID() != primary_uuid:
# There are multiple primary master nodes. This is
# dangerous.
raise ElectionFailure, 'multiple primary master nodes'
primary_node = app.nm.getNodeByUUID(primary_uuid)
if primary_node is None:
# I don't know such a node. Probably this information
# is old. So ignore it.
pass
else:
if node.getUUID() == primary_uuid:
# Whatever the situation is, I trust this master.
app.primary = False
app.primary_master_node = node
else:
if app.uuid < uuid:
# I lost.
app.primary = False
app.negotiating_master_node_set.discard(conn.getAddress())
else:
self.handleUnexpectedPacket(conn, packet)
def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name):
if isinstance(conn, ClientConnection):
self.handleUnexpectedPacket(conn, packet)
else:
app = self.app
if node_type != MASTER_NODE_TYPE:
logging.info('reject a connection from a non-master')
conn.addPacket(Packet().notReady(packet.getId(), 'retry later'))
conn.abort()
return
if name != app.name:
logging.error('reject an alien cluster')
conn.addPacket(Packet().protocolError(packet.getId(),
'invalid cluster name'))
conn.abort()
return
addr = (ip_address, port)
node = app.nm.getNodeByServer(addr)
if node is None:
node = MasterNode(server = addr, uuid = uuid)
app.nm.add(node)
ap.unconnected_master_node_set.add(addr)
else:
# If this node is broken, reject it.
if node.getUUID() == uuid:
if node.getState() == BROKEN_STATE:
p = Packet()
p.brokenNodeDisallowedError(packet.getId(), 'go away')
conn.addPacket(p)
conn.abort()
return
# Trust the UUID sent by the peer.
node.setUUID(uuid)
conn.setUUID(uuid)
p = Packet()
p.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1])
conn.addPacket(p)
# Next, the peer should ask a primary master node.
conn.expectMessage()
def handleAskPrimaryMaster(self, conn, packet):
if isinstance(conn, ClientConnection):
self.handleUnexpectedPacket(conn, packet)
else:
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
if app.primary:
primary_uuid = app.uuid
elif app.primary_master_node is not None:
primary_uuid = app.primary_master_node.getUUID()
else:
primary_uuid = INVALID_UUID
known_master_list = []
for n in app.nm.getMasterNodeList():
if n.getState() == BROKEN_STATE:
continue
info = n.getServer() + (n.getUUID() or INVALID_UUID,)
known_master_list.append(info)
p = Packet()
p.answerPrimaryMaster(packet.getId(), primary_uuid, known_master_list)
conn.addPacket(p)
def handleAnnouncePrimaryMaster(self, conn, packet):
if isinstance(conn, ClientConnection):
self.handleUnexpectedPacket(conn, packet)
else:
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
if app.primary:
# I am also the primary... So restart the election.
raise ElectionFailure, 'another primary arises'
node = app.nm.getNodeByUUID(uuid)
app.primary = False
app.primary_master_node = node
logging.info('%s:%d is the primary', *(node.getServer()))
def handleReelectPrimaryMaster(self, conn, packet):
raise ElectionFailure, 'reelection requested'
from neo.handler import EventHandler
class MasterEventHandler(EventHandler):
"""This class implements a generic part of the event handlers."""
def __init__(self, app):
self.app = app
EventHandler.__init__(self)
from time import time from time import time
import logging
from neo.protocol import RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, \ from neo.protocol import RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE
from neo.util import dump
class Node(object): class Node(object):
"""This class represents a node.""" """This class represents a node."""
...@@ -28,6 +30,7 @@ class Node(object): ...@@ -28,6 +30,7 @@ class Node(object):
self.last_state_change = time() self.last_state_change = time()
def setServer(self, server): def setServer(self, server):
if self.server != server:
if self.server is not None: if self.server is not None:
self.manager.unregisterServer(self) self.manager.unregisterServer(self)
...@@ -38,6 +41,7 @@ class Node(object): ...@@ -38,6 +41,7 @@ class Node(object):
return self.server return self.server
def setUUID(self, uuid): def setUUID(self, uuid):
if self.uuid != uuid:
if self.uuid is not None: if self.uuid is not None:
self.manager.unregisterUUID(self) self.manager.unregisterUUID(self)
...@@ -96,11 +100,11 @@ class NodeManager(object): ...@@ -96,11 +100,11 @@ class NodeManager(object):
pass pass
def registerUUID(self, node): def registerUUID(self, node):
self.server_dict[node.getUUID()] = node self.uuid_dict[node.getUUID()] = node
def unregisterUUID(self, node): def unregisterUUID(self, node):
try: try:
del self.server_dict[node.getUUID()] del self.uuid_dict[node.getUUID()]
except KeyError: except KeyError:
pass pass
......
...@@ -125,6 +125,10 @@ class Packet(object): ...@@ -125,6 +125,10 @@ class Packet(object):
def notReady(self, msg_id, error_message): def notReady(self, msg_id, error_message):
return self.error(msg_id, NOT_READY, 'not ready: ' + error_message) return self.error(msg_id, NOT_READY, 'not ready: ' + error_message)
def brokenNodeDisallowedError(self, msg_id, error_message):
return self.error(msg_id, BROKEN_NODE_DISALLOWED_ERROR,
'broken node disallowed error: ' + error_message)
def ping(self, msg_id): def ping(self, msg_id):
self._id = msg_id self._id = msg_id
self._type = PING self._type = PING
...@@ -252,10 +256,10 @@ class Packet(object): ...@@ -252,10 +256,10 @@ class Packet(object):
def _decodeAnswerPrimaryMaster(self): def _decodeAnswerPrimaryMaster(self):
try: try:
primary_uuid, n = unpack('!16sL', self._body[:36]) primary_uuid, n = unpack('!16sL', self._body[:20])
known_master_list = [] known_master_list = []
for i in xrange(n): for i in xrange(n):
ip_address, port, uuid = unpack('!4sH16s', self._body[36+i*22:58+i*22]) ip_address, port, uuid = unpack('!4sH16s', self._body[20+i*22:42+i*22])
ip_address = inet_ntoa(ip_address) ip_address = inet_ntoa(ip_address)
known_master_list.append((ip_address, port, uuid)) known_master_list.append((ip_address, port, uuid))
except: except:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment