Commit f96c2801 authored by Yoshinori Okuji

Implemented the service handler. Added more utility methods. Made the master handler a bit safer.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@36 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent fb998338
......@@ -2,7 +2,8 @@ import logging
import MySQLdb
import os
from socket import inet_aton
from time import time
from time import time, gmtime
from struct import pack, unpack
from neo.config import ConfigurationManager
from neo.protocol import Packet, ProtocolError, \
......@@ -17,6 +18,7 @@ from neo.exception import ElectionFailure, PrimaryFailure, VerificationFailure,
from neo.master.election import ElectionEventHandler
from neo.master.recovery import RecoveryEventHandler
from neo.master.verification import VerificationEventHandler
from neo.master.service import ServiceEventHandler
from neo.pt import PartitionTable
class Application(object):
......@@ -414,7 +416,7 @@ class Application(object):
uuid_set = set()
# Determine to which nodes I should ask.
partition = tid % self.num_partitions
partition = self.getPartition(tid)
transaction_uuid_list = [cell.getUUID() for cell \
in self.pt.getCellList(partition, True)]
if len(transaction_uuid_list) == 0:
......@@ -450,7 +452,7 @@ class Application(object):
# Verify that all objects are present.
for oid in self.unfinished_oid_set:
self.asking_uuid_dict.clear()
partition = oid % self.num_partitions
partition = self.getPartition(oid)
object_uuid_list = [cell.getUUID() for cell \
in self.pt.getCellList(partition, True)]
if len(object_uuid_list) == 0:
......@@ -607,6 +609,9 @@ class Application(object):
em = self.em
nm = self.nm
# This dictionary is used to hold information on transactions being finished.
self.finishing_transaction_dict = {}
# Make sure that every connection has the service event handler.
for conn in em.getConnectionList():
conn.setHandler(handler)
......@@ -684,3 +689,32 @@ class Application(object):
self.lptid = ''.join(l)
return self.lptid
def getNextTID(self):
    """Generate the next transaction ID, store it in self.ltid and return it.

    A TID is 8 bytes: two big-endian unsigned 32-bit integers.  The upper
    half encodes the UTC year, month (1-based), day, hour and minute as a
    mixed-radix number; the lower half is the position inside that minute
    in units of 60 / 2**32 seconds.

    When the clock-derived TID is not strictly greater than self.ltid, the
    last TID is incremented by one instead, carrying into the next minute
    if the lower half is already at its maximum.
    """
    tm = time()
    gmt = gmtime(tm)
    upper = (((gmt.tm_year * 12 + gmt.tm_mon) * 31 + gmt.tm_mday - 1)
             * 24 + gmt.tm_hour) * 60 + gmt.tm_min
    # tm_sec may be 60 or 61 on leap seconds, hence the modulo.
    lower = int((gmt.tm_sec % 60 + (tm - int(tm)))
                / (60.0 / 65536.0 / 65536.0))
    tid = pack('!LL', upper, lower)
    if tid <= self.ltid:
        upper, lower = unpack('!LL', self.ltid)
        if lower == 0xffffffff:
            # This should not happen usually.
            from datetime import timedelta, datetime
            hour, minute = divmod(upper, 60)
            day, hour = divmod(hour, 24)
            month, day = divmod(day, 31)
            # The month was encoded 1-based (tm_mon is 1..12), so shift it
            # before splitting; a plain divmod(month, 12) yields month 0
            # for December and makes datetime() raise ValueError.
            year, month = divmod(month - 1, 12)
            # Go through datetime so that short months roll over properly.
            d = datetime(year, month + 1, day + 1, hour, minute) \
                + timedelta(0, 60)
            upper = (((d.year * 12 + d.month) * 31 + d.day - 1)
                     * 24 + d.hour) * 60 + d.minute
            lower = 0
        else:
            lower += 1
        tid = pack('!LL', upper, lower)
    self.ltid = tid
    return tid
def getPartition(self, oid_or_tid):
    """Return the partition number for an 8-byte OID or TID.

    The identifier is read as one big-endian unsigned 64-bit integer and
    reduced modulo self.num_partitions.
    """
    (value,) = unpack('!Q', oid_or_tid)
    return value % self.num_partitions
import logging
from neo.handler import EventHandler
class MasterEventHandler(EventHandler):
    """Common base for the event handlers used by the master node.

    The four handlers that raise NotImplementedError must be overridden by
    subclasses.  Every other handler defined here logs that the packet is
    being ignored and does nothing else.
    """

    def __init__(self, app):
        # Keep a reference to the application before the base class runs.
        self.app = app
        EventHandler.__init__(self)

    def handleRequestNodeIdentification(self, conn, packet, node_type,
                                        uuid, ip_address, port, name):
        raise NotImplementedError('this method must be overridden')

    def handleAskPrimaryMaster(self, conn, packet):
        raise NotImplementedError('this method must be overridden')

    def handleAnnouncePrimaryMaster(self, conn, packet):
        raise NotImplementedError('this method must be overridden')

    def handleReelectPrimaryMaster(self, conn, packet):
        raise NotImplementedError('this method must be overridden')

    def handleNotifyNodeInformation(self, conn, packet, node_list):
        logging.info('ignoring Notify Node Information')

    def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid):
        logging.info('ignoring Answer Last IDs')

    def handleAnswerPartitionTable(self, conn, packet, cell_list):
        logging.info('ignoring Answer Partition Table')

    def handleAnswerUnfinishedTransactions(self, conn, packet, tid_list):
        logging.info('ignoring Answer Unfinished Transactions')

    def handleAnswerOIDsByTID(self, conn, packet, oid_list, tid):
        logging.info('ignoring Answer OIDs By TID')

    def handleTidNotFound(self, conn, packet, message):
        # The message used to be a copy of the Answer OIDs By TID one,
        # which made the two packets indistinguishable in the log.
        logging.info('ignoring TID Not Found')

    def handleAnswerObjectPresent(self, conn, packet, oid, tid):
        logging.info('ignoring Answer Object Present')

    def handleOidNotFound(self, conn, packet, message):
        logging.info('ignoring OID Not Found')

    def handleAskNewTID(self, conn, packet):
        logging.info('ignoring Ask New TID')

    def handleFinishTransaction(self, conn, packet, oid_list, tid):
        logging.info('ignoring Finish Transaction')

    def handleNotifyTransactionLocked(self, conn, packet, tid):
        logging.info('ignoring Notify Transaction Locked')
......@@ -327,22 +327,3 @@ class RecoveryEventHandler(MasterEventHandler):
app.nm.add(n)
app.pt.setCell(offset, n, state)
def handleAnswerUnfinishedTransactions(self, conn, packet, tid_list):
    """Do nothing: this can be from a previous verification stage."""
    return
def handleAnswerOIDsByTID(self, conn, packet, oid_list, tid):
    """Do nothing: this can be from a previous verification stage."""
    return
def handleTidNotFound(self, conn, packet, message):
    """Do nothing: this can be from a previous verification stage."""
    return
def handleAnswerObjectPresent(self, conn, packet, oid, tid):
    """Do nothing: this can be from a previous verification stage."""
    return
def handleOidNotFound(self, conn, packet, message):
    """Do nothing: this can be from a previous verification stage."""
    return
import logging

from neo.exception import ElectionFailure, OperationFailure
from neo.master.handler import MasterEventHandler
from neo.protocol import MASTER_NODE_TYPE, CLIENT_NODE_TYPE, \
        RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE
from neo.protocol import Packet, INVALID_UUID
from neo.util import dump
class FinishingTransaction(object):
    """Bookkeeping for one transaction that is in the middle of finishing.

    It remembers who asked for the finish (connection and message id), the
    OIDs involved, the UUIDs that have to report a lock, and the UUIDs that
    already did.
    """

    def __init__(self, conn, packet, oid_list, uuid_set):
        self._conn = conn
        # Only the id of the request is kept, not the packet itself.
        self._msg_id = packet.getId()
        self._oid_list = oid_list
        self._uuid_set = uuid_set
        self._locked_uuid_set = set()

    def getConnection(self):
        """Return the connection the finish request came from."""
        return self._conn

    def getMessageId(self):
        """Return the id of the packet that requested the finish."""
        return self._msg_id

    def getOIDList(self):
        """Return the OID list given at construction."""
        return self._oid_list

    def getUUIDSet(self):
        """Return the set of UUIDs expected to report a lock."""
        return self._uuid_set

    def addLockedUUID(self, uuid):
        """Record that uuid reported a lock; unexpected UUIDs are ignored."""
        if uuid not in self._uuid_set:
            return
        self._locked_uuid_set.add(uuid)

    def allLocked(self):
        """Return True when the locked set equals the expected set."""
        return self._locked_uuid_set == self._uuid_set
class ServiceEventHandler(MasterEventHandler):
"""This class deals with events for a service phase."""
# Handle a connection that was closed during the service phase.
#
# When the connection carries a UUID, the matching node is looked up in
# app.nm.  A node in RUNNING_STATE is switched to TEMPORARILY_DOWN_STATE and
# that change is broadcast.  Client nodes are removed from the node manager;
# for storage nodes OperationFailure is raised when app.pt.operational()
# is false.  MasterEventHandler.connectionClosed is also invoked.
#
# NOTE(review): indentation is lost in this copy, so whether the
# ClientNode/StorageNode checks are nested under the RUNNING_STATE test
# cannot be determined here -- confirm against the repository.
# NOTE(review): node is used without a None check; presumably
# getNodeByUUID can return None for an unknown UUID -- verify.
# NOTE(review): ClientNode and StorageNode must be in scope in this module
# -- confirm they are imported.
def connectionClosed(self, conn):
uuid = conn.getUUID()
if uuid is not None:
app = self.app
node = app.nm.getNodeByUUID(uuid)
if node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE)
app.broadcastNodeInformation(node)
if isinstance(node, ClientNode):
# If this node is a client, just forget it.
app.nm.remove(node)
elif isinstance(node, StorageNode):
if not app.pt.operational():
# Catastrophic.
raise OperationFailure, 'cannot continue operation'
MasterEventHandler.connectionClosed(self, conn)
# Handle a connection whose timeout expired during the service phase.
#
# The statements mirror connectionClosed: a node in RUNNING_STATE becomes
# TEMPORARILY_DOWN_STATE and is broadcast, client nodes are removed from
# app.nm, and for storage nodes OperationFailure is raised when
# app.pt.operational() is false.  MasterEventHandler.timeoutExpired is also
# invoked.
#
# NOTE(review): indentation is lost in this copy, so the nesting of the
# ClientNode/StorageNode checks relative to the RUNNING_STATE test cannot
# be determined here -- confirm against the repository.
# NOTE(review): node is used without a None check; presumably
# getNodeByUUID can return None for an unknown UUID -- verify.
def timeoutExpired(self, conn):
uuid = conn.getUUID()
if uuid is not None:
app = self.app
node = app.nm.getNodeByUUID(uuid)
if node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE)
app.broadcastNodeInformation(node)
if isinstance(node, ClientNode):
# If this node is a client, just forget it.
app.nm.remove(node)
elif isinstance(node, StorageNode):
if not app.pt.operational():
# Catastrophic.
raise OperationFailure, 'cannot continue operation'
MasterEventHandler.timeoutExpired(self, conn)
# Handle a peer that was detected as broken during the service phase.
#
# A node that is not yet in BROKEN_STATE is marked BROKEN_STATE and the
# change is broadcast.  Client nodes are removed from app.nm.  A storage
# node is dropped from the partition table, the resulting cell changes are
# broadcast under a new partition table ID, and OperationFailure is raised
# when app.pt.operational() is false.  MasterEventHandler.peerBroken is
# also invoked.
#
# NOTE(review): indentation is lost in this copy, so the nesting of the
# ClientNode/StorageNode checks relative to the BROKEN_STATE test cannot be
# determined here -- confirm against the repository.
# NOTE(review): unlike the other callers of dropNode in this module, the
# partition changes are broadcast without checking that cell_list is
# non-empty -- confirm this is intended.
def peerBroken(self, conn):
uuid = conn.getUUID()
if uuid is not None:
app = self.app
node = app.nm.getNodeByUUID(uuid)
if node.getState() != BROKEN_STATE:
node.setState(BROKEN_STATE)
app.broadcastNodeInformation(node)
if isinstance(node, ClientNode):
# If this node is a client, just forget it.
app.nm.remove(node)
elif isinstance(node, StorageNode):
cell_list = app.pt.dropNode(node)
ptid = app.getNextPartitionTableID()
app.broadcastPartitionChanges(ptid, cell_list)
if not app.pt.operational():
# Catastrophic.
raise OperationFailure, 'cannot continue operation'
MasterEventHandler.peerBroken(self, conn)
def packetReceived(self, conn, packet):
    """Pass the packet straight to MasterEventHandler.packetReceived.

    Nothing is returned; the base-class result is discarded.
    """
    base = MasterEventHandler
    base.packetReceived(self, conn, packet)
def handleRequestNodeIdentification(self, conn, packet, node_type,
                                    uuid, ip_address, port, name):
    """Identify a peer during the service phase and accept or reject it.

    The peer is rejected with a protocol error when it belongs to another
    cluster or claims a server address that conflicts with a running node,
    and with a "broken node disallowed" error when it is known as broken.
    Otherwise the node is registered or refreshed in the node manager, the
    change is broadcast, storage nodes are added to the partition table,
    and an Accept Node Identification packet is queued.

    NOTE(review): MasterNode, ClientNode and StorageNode must be in scope
    in this module -- confirm they are imported.
    """
    app = self.app
    if name != app.name:
        logging.error('reject an alien cluster')
        conn.addPacket(Packet().protocolError(packet.getId(),
                                              'invalid cluster name'))
        conn.abort()
        return

    # There are many situations here. In principle, a node should be
    # identified by its UUID: the UUID never changes when a storage node
    # moves to a different server, and it always changes when a master or
    # client node restarts, so it is more reliable than a server address.
    #
    # However, master nodes may be known only by their server address, and
    # a node may claim a server address used by another node.
    addr = (ip_address, port)
    # First, get the node by the UUID.
    node = app.nm.getNodeByUUID(uuid)
    if node is None:
        # If nothing is present, try with the server address.
        node = app.nm.getNodeByServer(addr)
        if node is None:
            # Nothing is found. So this must be the first time that this
            # node connected to me.
            if node_type == MASTER_NODE_TYPE:
                node = MasterNode(server = addr, uuid = uuid)
            elif node_type == CLIENT_NODE_TYPE:
                node = ClientNode(uuid = uuid)
            else:
                # This used to pass the undefined name "address".
                node = StorageNode(server = addr, uuid = uuid)
            app.nm.add(node)
            app.broadcastNodeInformation(node)
        else:
            # Otherwise, I know it only by the server address, or by the
            # same server address but with a different UUID.
            if node.getUUID() is None:
                # This must be a master node.
                if not isinstance(node, MasterNode) \
                        or node_type != MASTER_NODE_TYPE:
                    # Error. This node uses the same server address as a
                    # master node.
                    conn.addPacket(Packet().protocolError(
                        packet.getId(), 'invalid server address'))
                    conn.abort()
                    return

                node.setUUID(uuid)
                if node.getState() != RUNNING_STATE:
                    node.setState(RUNNING_STATE)
                # NOTE(review): the broadcast is assumed unconditional
                # because the UUID was just assigned; the lost indentation
                # does not show whether it was under the state check.
                app.broadcastNodeInformation(node)
            else:
                # This node has a different UUID.
                if node.getState() == RUNNING_STATE:
                    # If it is still running, reject this node.
                    conn.addPacket(Packet().protocolError(
                        packet.getId(), 'invalid server address'))
                    conn.abort()
                    return
                else:
                    # Otherwise, forget the old one.
                    node.setState(BROKEN_STATE)
                    app.broadcastNodeInformation(node)
                    # And insert a new one.
                    node.setUUID(uuid)
                    node.setState(RUNNING_STATE)
                    app.broadcastNodeInformation(node)
    else:
        # I know this node by the UUID.
        if node.getServer() != addr:
            # This node has a different server address.
            if node.getState() == RUNNING_STATE:
                # If it is still running, reject this node.
                conn.addPacket(Packet().protocolError(
                    packet.getId(), 'invalid server address'))
                conn.abort()
                return
            else:
                # Otherwise, forget the old one.
                node.setState(BROKEN_STATE)
                app.broadcastNodeInformation(node)
                # And insert a new one.
                node.setServer(addr)
                node.setState(RUNNING_STATE)
                app.broadcastNodeInformation(node)
        else:
            # If this node is broken, reject it. Otherwise, assume that it
            # is working again.
            if node.getState() == BROKEN_STATE:
                p = Packet()
                p.brokenNodeDisallowedError(packet.getId(), 'go away')
                conn.addPacket(p)
                conn.abort()
                return
            else:
                node.setUUID(uuid)
                node.setState(RUNNING_STATE)
                app.broadcastNodeInformation(node)

    conn.setUUID(uuid)

    if isinstance(node, StorageNode):
        # If this is a storage node, add it into the partition table.
        # Note that this does no harm, even if the node is not new.
        cell_list = app.pt.addNode(node)
        if len(cell_list) != 0:
            ptid = app.getNextPartitionTableID()
            app.broadcastPartitionChanges(ptid, cell_list)

    p = Packet()
    p.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE,
                               app.uuid, app.server[0], app.server[1])
    conn.addPacket(p)
    # Next, the peer should ask a primary master node.
    conn.expectMessage()
def handleAskPrimaryMaster(self, conn, packet):
    """Answer that this node is the primary master and push its state.

    After the answer, every known node is sent in Notify Node Information
    packets of at most 10000 entries.  Storage and client peers then get
    the partition table in chunks of at most 1000 rows, and storage peers
    are finally asked to start operation.  Unidentified peers are passed
    to handleUnexpectedPacket.
    """
    uuid = conn.getUUID()
    if uuid is None:
        self.handleUnexpectedPacket(conn, packet)
        return

    app = self.app

    # Merely tell the peer that I am the primary master node.
    # It is not necessary to send known master nodes, because
    # I must send all node information immediately.
    p = Packet()
    p.answerPrimaryMaster(packet.getId(), app.uuid, [])
    conn.addPacket(p)

    # Send the information.
    node_list = []
    for n in app.nm.getNodeList():
        # NOTE(review): this unpacking fails if getServer() returns None;
        # client nodes are created elsewhere in this module without a
        # server address -- confirm how they should be encoded.
        ip_address, port = n.getServer()
        node_list.append((n.getNodeType(), ip_address, port,
                          n.getUUID() or INVALID_UUID, n.getState()))
        if len(node_list) == 10000:
            # Ugly, but it is necessary to split a packet, if it is too
            # big.
            p = Packet()
            p.notifyNodeInformation(conn.getNextId(), node_list)
            conn.addPacket(p)
            # Start a fresh list instead of clearing in place, in case the
            # queued packet still refers to the one just sent.
            node_list = []
    p = Packet()
    p.notifyNodeInformation(conn.getNextId(), node_list)
    conn.addPacket(p)

    # If this is a storage node or a client node, send the partition table.
    node = app.nm.getNodeByUUID(uuid)
    if isinstance(node, (StorageNode, ClientNode)):
        # Split the packet if too huge.
        row_list = []
        for offset in xrange(app.num_partitions):
            row_list.append((offset, app.pt.getRow(offset)))
            if len(row_list) == 1000:
                # One packet per chunk, each with its own message id; the
                # message id used to be missing from this call and a single
                # Packet object was reused for every chunk.
                p = Packet()
                p.sendPartitionTable(conn.getNextId(), app.lptid, row_list)
                conn.addPacket(p)
                row_list = []
        if len(row_list) != 0:
            p = Packet()
            p.sendPartitionTable(conn.getNextId(), app.lptid, row_list)
            conn.addPacket(p)

    # If this is a storage node, ask it to start.
    if isinstance(node, StorageNode):
        conn.addPacket(Packet().startOperation(conn.getNextId()))
def handleAnnouncePrimaryMaster(self, conn, packet):
    """React to another node announcing itself as the primary master.

    An unidentified peer is passed to handleUnexpectedPacket.  For an
    identified peer ElectionFailure is raised, since this node is the
    primary too and the election has to be restarted.
    """
    if conn.getUUID() is None:
        self.handleUnexpectedPacket(conn, packet)
        return
    # I am also the primary... So restart the election.
    raise ElectionFailure('another primary arises')
def handleReelectPrimaryMaster(self, conn, packet):
    """Always raise ElectionFailure: a peer asked for a new election."""
    raise ElectionFailure('reelection requested')
def handleNotifyNodeInformation(self, conn, packet, node_list):
    """Apply node state changes reported by an identified peer.

    Entries about client nodes, entries without a UUID, unknown nodes,
    entries whose address disagrees with what is known, unchanged states
    and reports of RUNNING_STATE are all skipped.  For the remaining
    entries the connections to the node are closed, the new state is
    stored and broadcast, and a storage node reported as down or broken is
    dropped from the partition table.

    Raises RuntimeError when a peer reports this very node as not running.
    """
    if conn.getUUID() is None:
        self.handleUnexpectedPacket(conn, packet)
        return

    app = self.app
    for node_type, ip_address, port, uuid, state in node_list:
        # The constant is CLIENT_NODE_TYPE everywhere else in this module;
        # CLIENT_NODE is not defined.
        if node_type == CLIENT_NODE_TYPE:
            # No interest.
            continue

        if uuid == INVALID_UUID:
            # No interest.
            continue

        if app.uuid == uuid:
            # This looks like me...
            if state == RUNNING_STATE:
                # Yes, I know it.
                continue
            else:
                # What?! What happened to me?
                raise RuntimeError('I was told that I am bad')

        addr = (ip_address, port)
        node = app.nm.getNodeByUUID(uuid)
        if node is None:
            node = app.nm.getNodeByServer(addr)
            if node is None:
                # I really don't know such a node. What is this?
                continue
        else:
            if node.getServer() != addr:
                # This is different from what I know.
                continue

        if node.getState() == state:
            # No change. Don't care.
            continue

        if state == RUNNING_STATE:
            # No problem.
            continue

        # Something wrong happened possibly. Cut the connection to this
        # node, if any, and notify the information to others.
        # XXX this can be very slow.
        for c in app.em.getConnectionList():
            if c.getUUID() == uuid:
                c.close()
        node.setState(state)
        app.broadcastNodeInformation(node)

        if isinstance(node, StorageNode) \
                and state in (DOWN_STATE, BROKEN_STATE):
            cell_list = app.pt.dropNode(node)
            if len(cell_list) != 0:
                ptid = app.getNextPartitionTableID()
                app.broadcastPartitionChanges(ptid, cell_list)
def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid):
    """Check the last IDs reported by a storage node.

    Peers that are unidentified or are not storage nodes are passed to
    handleUnexpectedPacket.  If any reported ID is greater than the one
    held by the application, a critical message is logged and
    OperationFailure is raised.
    """
    sender = conn.getUUID()
    if sender is None:
        self.handleUnexpectedPacket(conn, packet)
        return

    app = self.app
    if not isinstance(app.nm.getNodeByUUID(sender), StorageNode):
        self.handleUnexpectedPacket(conn, packet)
        return

    # If I get a bigger value here, it is dangerous.
    peer_is_ahead = app.loid < loid or app.ltid < ltid or app.lptid < lptid
    if peer_is_ahead:
        logging.critical('got later information in service')
        raise OperationFailure
def handleAskNewTID(self, conn, packet):
    """Give a fresh transaction ID to a client node.

    Peers that are unidentified or are not client nodes are passed to
    handleUnexpectedPacket.  Otherwise an Answer New TID packet carrying
    app.getNextTID() is queued as the reply to the request.
    """
    sender = conn.getUUID()
    if sender is None:
        self.handleUnexpectedPacket(conn, packet)
        return

    app = self.app
    if not isinstance(app.nm.getNodeByUUID(sender), ClientNode):
        self.handleUnexpectedPacket(conn, packet)
        return

    new_tid = app.getNextTID()
    answer = Packet().answerNewTID(packet.getId(), new_tid)
    conn.addPacket(answer)
def handleFinishTransaction(self, conn, packet, oid_list, tid):
    """Start finishing a transaction on behalf of a client node.

    Peers that are unidentified or are not client nodes, and requests
    whose TID is later than app.ltid, are passed to handleUnexpectedPacket.
    Otherwise a Lock Information packet is sent over every connection
    whose UUID holds a cell of a partition touched by the transaction, and
    a FinishingTransaction is stored in app.finishing_transaction_dict
    under the TID.
    """
    sender = conn.getUUID()
    if sender is None:
        self.handleUnexpectedPacket(conn, packet)
        return

    app = self.app
    if not isinstance(app.nm.getNodeByUUID(sender), ClientNode):
        self.handleUnexpectedPacket(conn, packet)
        return

    # If the given transaction ID is later than the last TID, the peer is
    # crazy.
    if app.ltid < tid:
        self.handleUnexpectedPacket(conn, packet)
        return

    # Collect partitions related to this transaction.
    to_partition = app.getPartition
    partitions = set([to_partition(tid)])
    partitions.update(to_partition(oid) for oid in oid_list)

    # Collect the UUIDs of nodes related to this transaction.
    involved = set()
    for partition in partitions:
        for cell in app.pt.getCellList(partition):
            involved.add(cell.getUUID())

    # Request locking data.
    for peer in app.em.getConnectionList():
        if peer.getUUID() not in involved:
            continue
        msg_id = peer.getNextId()
        peer.addPacket(Packet().lockInformation(msg_id, tid))
        peer.expectMessage(msg_id)

    app.finishing_transaction_dict[tid] = FinishingTransaction(
        conn, packet, oid_list, involved)
def handleNotifyTransactionLocked(self, conn, packet, tid):
    """Record a storage node's lock and finish the transaction when ready.

    Peers that are unidentified or are not storage nodes, and TIDs later
    than app.ltid, are passed to handleUnexpectedPacket.  A TID that is not
    in app.finishing_transaction_dict is silently ignored.

    Once every expected storage node has reported its lock, the initiating
    client gets Notify Transaction Finished, the other clients get
    Invalidate Objects, the storage nodes involved get Unlock Information,
    and the transaction is removed from the dictionary.
    """
    uuid = conn.getUUID()
    if uuid is None:
        self.handleUnexpectedPacket(conn, packet)
        return

    app = self.app
    node = app.nm.getNodeByUUID(uuid)
    if not isinstance(node, StorageNode):
        self.handleUnexpectedPacket(conn, packet)
        return

    # If the given transaction ID is later than the last TID, the peer is
    # crazy.
    if app.ltid < tid:
        self.handleUnexpectedPacket(conn, packet)
        return

    # Only the lookup is guarded, so that a KeyError raised by anything
    # else below is not swallowed by mistake.
    try:
        t = app.finishing_transaction_dict[tid]
    except KeyError:
        # What is this?
        return

    t.addLockedUUID(uuid)
    if not t.allLocked():
        return

    # I have received all the answers now. So send a Notify Transaction
    # Finished to the initiating client node, Invalidate Objects to the
    # other client nodes, and Unlock Information to relevant storage nodes.
    for c in app.em.getConnectionList():
        peer_uuid = c.getUUID()
        if peer_uuid is None:
            continue
        # The lookup used to be called without its UUID argument.
        peer = app.nm.getNodeByUUID(peer_uuid)
        if isinstance(peer, ClientNode):
            # Build one packet per connection; a single shared Packet
            # would be overwritten by the next peer's message.
            p = Packet()
            if c is t.getConnection():
                p.notifyTransactionFinished(t.getMessageId(), tid)
            else:
                # FinishingTransaction spells this getOIDList.
                p.invalidateObjects(c.getNextId(), t.getOIDList())
            c.addPacket(p)
        elif isinstance(peer, StorageNode):
            if peer_uuid in t.getUUIDSet():
                p = Packet()
                p.unlockInformation(c.getNextId(), tid)
                c.addPacket(p)
    del app.finishing_transaction_dict[tid]
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment