Commit eab036aa authored by Yoshinori Okuji's avatar Yoshinori Okuji

commit an imcomplete version only for future reference

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype2@20 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 48a82cba
......@@ -21,8 +21,10 @@ from node import NodeManager, MasterNode, StorageNode, ClientNode, \
from util import dump
class ElectionFailure(Exception): pass
class PrimaryFailure(Exception): pass
class NeoException(Exception): pass
class ElectionFailure(NeoException): pass
class PrimaryFailure(NeoException): pass
class RecoveryFailure(NeoException): pass
class Connection(BaseConnection):
"""This class provides a master-specific connection."""
......@@ -98,6 +100,8 @@ class Application(object):
self.primary = None
self.primary_master_node = None
self.ready = False
# Co-operative threads. Simulated by generators.
self.thread_dict = {}
self.server_thread_method = None
......@@ -207,10 +211,22 @@ class Application(object):
while 1:
try:
if self.primary:
self.playPrimaryRole()
while 1:
try:
self.startRecovery()
except RecoveryFailure:
logging.critical('unable to recover the system; use full recovery')
raise
self.playPrimaryRole()
else:
self.playSecondaryRole()
raise RuntimeError, 'should not reach here'
except (ElectionFailure, PrimaryFailure):
# Forget all connections.
for conn in cm.getConnectionList():
conn.close()
self.thread_dict.clear()
# Reelect a new primary master.
self.electPrimary(bootstrap = False)
CONNECTION_FAILED = 'connection failed'
......@@ -577,6 +593,7 @@ class Application(object):
self.primary = False
self.primary_master_node = node
logging.info('%s:%d is the primary' % node.getServer())
elif t == REELECT_PRIMARY_MASTER:
raise ElectionFailure, 'reelection requested'
else:
......@@ -657,6 +674,7 @@ class Application(object):
if self.primary is None:
# I am the primary.
self.primary = True
logging.info('I am the primary, so sending an announcement')
for conn in cm.getConnectionList():
if conn.from_self:
p = Packet().announcePrimaryMaster(conn.getNextId())
......@@ -670,6 +688,7 @@ class Application(object):
for conn in cm.getConnectionList():
if conn.from_self:
closed = False
break
if t + 10 < time():
for conn in cm.getConnectionList():
if conn.from_self:
......@@ -701,7 +720,9 @@ class Application(object):
raise ElectionFailure, 'no connection remains to the primary'
return
except ElectionFailure:
except ElectionFailure, m:
logging.info('election failed; %s' % m)
# Ask all connected nodes to reelect a single primary master.
for conn in cm.getConnectionList():
if conn.from_self:
......@@ -735,8 +756,203 @@ class Application(object):
self.thread_dict.clear()
bootstrap = False
def broadcastNodeStateChange(self, node):
state = node.getState()
uuid = node.getUUID()
ip_address, port = node.getServer()
if ip_address is None:
ip_address = '0.0.0.0'
if port is None:
port = 0
if isinstance(node, ClientNode):
# Notify secondary master nodes and storage nodes of
# the removal of the client node.
for c in cm.getConnectionList():
if c.getUUID() is not None:
n = nm.getNodeByUUID(uuid)
if isinstance(n, (MasterNode, StorageNode)):
p = Packet()
p.notifyNodeStateChange(c.getNextId(),
CLIENT_NODE_TYPE,
ip_address, port,
uuid, state)
c.addPacket(p)
elif isinstance(node, MasterNode):
for c in cm.getConnectionList():
if c.getUUID() is not None:
p = Packet()
p.notifyNodeStateChange(c.getNextId(),
MASTER_NODE_TYPE,
ip_address, port,
uuid, state)
c.addPacket(p)
elif isinstance(node, StorageNode):
for c in cm.getConnectionList():
if c.getUUID() is not None:
p = Packet()
p.notifyNodeStateChange(c.getNextId(),
STORAGE_NODE_TYPE,
ip_address, port,
uuid, state)
c.addPacket(p)
else:
raise Runtime, 'unknown node type'
def playPrimaryRoleServerIterator(self):
"""Handle events for a server connection."""
cm = self.cm
nm = self.nm
while 1:
method, conn = self.event[0], self.event[1]
logging.debug('method is %r, conn is %s:%d',
method, conn.ip_address, conn.port)
if method is self.CONNECTION_ACCEPTED:
pass
elif method in (self.CONNECTION_CLOSED, self.TIMEOUT_EXPIRED):
uuid = conn.getUUID()
if uuid is not None:
# If the peer is identified, mark it as temporarily down or down.
node = nm.getNodeByUUID(uuid)
if isinstance(node, ClientNode):
node.setState(DOWN_STATE)
self.broadcastNodeStateChange(node)
# For now, down client nodes simply get forgotten.
nm.remove(node)
elif isinstance(node, MasterNode):
if node.getState() not in (BROKEN_STATE, DOWN_STATE):
node.setState(TEMPORARILY_DOWN_STATE)
self.broadcastNodeStateChange(node)
elif isinstance(node, StorageNode):
if node.getState() not in (BROKEN_STATE, DOWN_STATE):
node.setState(TEMPORARILY_DOWN_STATE)
self.broadcastNodeStateChange(node)
# FIXME check the partition table.
self.pt.setTemporarilyDown(node.getUUID())
if self.ready and self.pt.fatal():
logging.critical('the storage nodes are not enough')
self.ready = False
self.broadcast
# FIXME update the database.
else:
raise RuntimeError, 'unknown node type'
return
elif method is self.PEER_BROKEN:
uuid = conn.getUUID()
if uuid is not None:
# If the peer is identified, mark it as broken.
node = nm.getNodeByUUID(uuid)
node.setState(BROKEN_STATE)
return
elif method is self.PACKET_RECEIVED:
if node is not None and node.getState() != BROKEN_STATE:
node.setState(RUNNING_STATE)
packet = self.event[2]
t = packet.getType()
try:
if t == ERROR:
code, msg = packet.decode()
if code in (PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE,
BROKEN_NODE_DISALLOWED_CODE):
# In those cases, it is better to assume that I am unusable.
logging.critical(msg)
raise RuntimeError, msg
else:
# Otherwise, the peer has an error.
logging.error('an error happened at the peer %s:%d',
conn.ip_address, conn.port)
if node is not None:
node.setState(BROKEN_STATE)
conn.close()
return
elif t == PING:
logging.info('got a keep-alive message from %s:%d; overloaded?',
conn.ip_address, conn.port)
conn.addPacket(Packet().pong(packet.getId()))
elif t == PONG:
pass
elif t == REQUEST_NODE_IDENTIFICATION:
node_type, uuid, ip_address, port, name = packet.decode()
if node_type != MASTER_NODE_TYPE:
logging.info('reject a connection from a non-master')
conn.addPacket(Packet().notReady(packet.getId(),
'retry later'))
conn.abort()
continue
if name != self.name:
logging.info('reject an alien cluster')
conn.addPacket(Packet().protocolError(packet.getId(),
'invalid cluster name'))
conn.abort()
continue
node = self.nm.getNodeByServer(ip_address, port)
if node is None:
node = MasterNode(ip_address, port, uuid)
self.nm.add(node)
self.unconnected_master_node_set.add((ip_address, port))
else:
# Trust the UUID sent by the peer.
if node.getUUID() != uuid:
node.setUUID(uuid)
conn.setUUID(uuid)
p = Packet()
p.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE,
self.uuid, self.ip_address, self.port)
conn.addPacket(p)
conn.expectMessage()
elif t == ASK_PRIMARY_MASTER:
if node is None:
raise ProtocolError(packet, 'not identified')
ltid, loid = packet.decode()
p = Packet()
if self.primary:
uuid = self.uuid
elif self.primary_master_node is not None:
uuid = self.primary_master_node.getUUID()
else:
uuid = INVALID_UUID
known_master_list = []
for n in self.nm.getMasterNodeList():
info = n.getServer() + (n.getUUID() or INVALID_UUID,)
known_master_list.append(info)
p.answerPrimaryMaster(packet.getId(), self.ltid, self.loid,
uuid, known_master_list)
conn.addPacket(p)
if self.primary and (self.ltid < ltid or self.loid < loid):
# I am not really primary... So restart the election.
raise ElectionFailure, 'not a primary master any longer'
elif t == ANNOUNCE_PRIMARY_MASTER:
if node is None:
raise ProtocolError(packet, 'not identified')
if self.primary:
# I am also the primary... So restart the election.
raise ElectionFailure, 'another primary arises'
self.primary = False
self.primary_master_node = node
logging.info('%s:%d is the primary' % node.getServer())
elif t == REELECT_PRIMARY_MASTER:
raise ElectionFailure, 'reelection requested'
else:
raise ProtocolError(packet, 'unexpected packet 0x%x' % t)
except ProtocolError, m:
logging.debug('protocol problem %s', m[1])
conn.addPacket(Packet().protocolError(m[0].getId(), m[1]))
conn.abort()
else:
raise RuntimeError, 'unexpected event %r' % (method,)
def playPrimaryRole(self):
logging.info('play the primary role')
self.ready = False
raise NotImplementedError
def playSecondaryRole(self):
......
[DEFAULT]
master_nodes: 127.0.0.1:10010 127.0.0.1:10011
master_nodes: 127.0.0.1:10010 127.0.0.1:10011 127.0.0.1:10012
#replicas: 1
#partitions: 1009
#name: main
......@@ -14,3 +14,8 @@ server: 127.0.0.1:10010
database: master2
user: neo
server: 127.0.0.1:10011
[master3]
database: master3
user: neo
server: 127.0.0.1:10012
from time import time
RUNNING_STATE = 0
TEMPORARILY_DOWN_STATE = 2
DOWN_STATE = 3
BROKEN_STATE = 4
from protocol import RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE
class Node(object):
"""This class represents a node."""
......
......@@ -12,7 +12,7 @@ MIN_PACKET_SIZE = 10
MAX_PACKET_SIZE = 0x100000
# Message types.
ERROR = 0x0000
ERROR = 0x8000
REQUEST_NODE_IDENTIFICATION = 0x0001
ACCEPT_NODE_IDENTIFICATION = 0x8001
PING = 0x0002
......@@ -21,6 +21,14 @@ ASK_PRIMARY_MASTER = 0x0003
ANSWER_PRIMARY_MASTER = 0x8003
ANNOUNCE_PRIMARY_MASTER = 0x0004
REELECT_PRIMARY_MASTER = 0x0005
NOTIFY_NODE_STATE_CHANGE = 0x0006
SEND_NODE_INFORMATION = 0x0007
START_OPERATION = 0x0008
STOP_OPERATION = 0x0009
ASK_FINISHING_TRANSACTIONS = 0x000a
ANSWER_FINISHING_TRANSACTIONS = 0x800a
FINISH_TRANSACTIONS = 0x000b
# Error codes.
NOT_READY_CODE = 1
......@@ -39,6 +47,14 @@ CLIENT_NODE_TYPE = 3
VALID_NODE_TYPE_LIST = (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE)
# Node states.
RUNNING_STATE = 0
TEMPORARILY_DOWN_STATE = 2
DOWN_STATE = 3
BROKEN_STATE = 4
VALID_NODE_STATE_LIST = (RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE)
# Other constants.
INVALID_UUID = '\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0'
INVALID_TID = '\0\0\0\0\0\0\0\0'
......@@ -163,6 +179,28 @@ class Packet:
self._body = ''
return self
def notifyNodeStateChange(self, msg_id, node_type, ip_address, port, uuid, state):
self._id = msg_id
self._type = NOTIFY_NODE_STATE_CHANGE
self._body = pack('!H4sH16sH', node_type, inet_aton(ip_address), port, uuid, state)
return self
def askNodeInformation(self, msg_id):
self._id = msg_id
self._type = ASK_NODE_INFORMATION
self._body = ''
return self
def answerNodeInformation(self, msg_id, node_list):
self._id = msg_id
self._type = ANSWER_NODE_INFORMATION
body = [pack('!L', len(node_list))]
for node_type, ip_address, port, uuid, state in node_list:
body.append(pack('!H4sH16sH', node_type, inet_aton(ip_address), port,
uuid, state)
self._body = ''.join(body)
return self
# Decoders.
def decode(self):
try:
......@@ -250,3 +288,40 @@ class Packet:
def _decodeReelectPrimaryMaster(self):
pass
decode_table[REELECT_PRIMARY_MASTER] = _decodeReelectPrimaryMaster
def _decodeNotifyNodeStateChange(self):
try:
node_type, ip_address, port, uuid, state = unpack('!H4sH16sH', self._body[:26])
ip_address = inet_ntoa(ip_address)
except:
raise ProtocolError(self, 'invalid notify node state change')
if node_type not in VALID_NODE_TYPE_LIST:
raise ProtocolError(self, 'invalid node type %d' % node_type)
if state not in VALID_NODE_STATE_LIST:
raise ProtocolError(self, 'invalid node state %d' % state)
return node_type, ip_address, port, uuid, state
decode_table[NOTIFY_NODE_STATE_CHANGE] = _decodeNotifyNodeStateChange
def _decodeAskNodeInformation(self):
pass
decode_table[ASK_NODE_INFORMATION] = _decodeAskNodeInformation
def _decodeAnswerNodeInformation(self):
try:
n = unpack('!L', self._body[:4])
node_list = []
for i in xrange(n):
r = unpack('!H4sH16sH', self._body[4+i*26:30+i*26])
node_type, ip_address, port, uuid, state = r
ip_address = inet_ntoa(ip_address)
if node_type not in VALID_NODE_TYPE_LIST:
raise ProtocolError(self, 'invalid node type %d' % node_type)
if state not in VALID_NODE_STATE_LIST:
raise ProtocolError(self, 'invalid node state %d' % state)
node_list.append((node_type, ip_address, port, uuid, state))
except ProtocolError:
raise
except:
raise ProtocolError(self, 'invalid answer node information')
return node_list
decode_table[ANSWER_NODE_INFORMATION] = _decodeAnswerNodeInformation
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment