Commit 9483f891 authored by Yoshinori Okuji

Forgot to implement a secondary master. Also, add a comment about expiration...

Forgot to implement a secondary master. Also, add a comment about expiration of storage nodes, which must be fixed afterwards.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@37 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent f96c2801
...@@ -3,7 +3,7 @@ from MySQLdb import OperationalError ...@@ -3,7 +3,7 @@ from MySQLdb import OperationalError
from MySQLdb.constants.CR import SERVER_GONE_ERROR, SERVER_LOST from MySQLdb.constants.CR import SERVER_GONE_ERROR, SERVER_LOST
import logging import logging
class DatabaseManager: class DatabaseManager(object):
"""This class manages a database. """This class manages a database.
For now, this implementation is limited to MySQL.""" For now, this implementation is limited to MySQL."""
......
import logging import logging
import MySQLdb
import os import os
from socket import inet_aton from socket import inet_aton
from time import time, gmtime from time import time, gmtime
...@@ -19,6 +18,7 @@ from neo.master.election import ElectionEventHandler ...@@ -19,6 +18,7 @@ from neo.master.election import ElectionEventHandler
from neo.master.recovery import RecoveryEventHandler from neo.master.recovery import RecoveryEventHandler
from neo.master.verification import VerificationEventHandler from neo.master.verification import VerificationEventHandler
from neo.master.service import ServiceEventHandler from neo.master.service import ServiceEventHandler
from neo.master.secondary import SecondaryEventHandler
from neo.pt import PartitionTable from neo.pt import PartitionTable
class Application(object): class Application(object):
...@@ -628,6 +628,10 @@ class Application(object): ...@@ -628,6 +628,10 @@ class Application(object):
while 1: while 1:
try: try:
em.poll(1) em.poll(1)
# FIXME: implement expiration of temporarily down nodes.
# When a temporarily down storage node expires, it moves to the
# down state and the partition table must drop the node, so
# repartitioning must be performed.
except OperationFailure: except OperationFailure:
# If not operational, send Stop Operation packets to storage nodes # If not operational, send Stop Operation packets to storage nodes
# and client nodes. Abort connections to client nodes. # and client nodes. Abort connections to client nodes.
...@@ -666,30 +670,36 @@ class Application(object): ...@@ -666,30 +670,36 @@ class Application(object):
self.provideService() self.provideService()
def playSecondaryRole(self): def playSecondaryRole(self):
"""I play a secondary role, thus only wait for a primary master to fail."""
logging.info('play the secondary role') logging.info('play the secondary role')
raise NotImplementedError
handler = SecondaryEventHandler()
em = self.em
nm = self.nm
# Make sure that every connection has the secondary event handler.
for conn in em.getConnectionList():
conn.setHandler(handler)
while 1:
em.poll(1)
def getNextPartitionTableID(self): def getNextPartitionTableID(self):
if self.lptid is None: if self.lptid is None:
raise RuntimeError, 'I do not know the last Partition Table ID' raise RuntimeError, 'I do not know the last Partition Table ID'
l = [] ptid = unpack('!Q', self.lptid)[0]
append = l.append self.lptid = pack('!Q', ptid + 1)
for c in self.lptid:
append(c)
for i in xrange(7, -1, -1):
d = ord(l[i])
if d == 255:
l[i] = chr(0)
else:
l[i] = chr(d + 1)
break
else:
raise RuntimeError, 'Partition Table ID overflowed'
self.lptid = ''.join(l)
return self.lptid return self.lptid
def getNextOID(self):
    """Advance the last known OID by one and return the new value.

    The OID is kept in self.loid as a string packed with the '!Q'
    struct format (8 bytes, network byte order, unsigned). The
    incremented value is stored back into self.loid before it is
    returned, so consecutive calls yield consecutive OIDs.

    Raises RuntimeError if self.loid is None, i.e. the last OID is
    not known yet.
    """
    last_oid = self.loid
    if last_oid is None:
        raise RuntimeError('I do not know the last OID')
    # unpack() always returns a tuple, even for a single field.
    (oid,) = unpack('!Q', last_oid)
    self.loid = pack('!Q', oid + 1)
    return self.loid
def getNextTID(self): def getNextTID(self):
tm = time() tm = time()
gmt = gmtime(tm) gmt = gmtime(tm)
......
import logging

from neo.protocol import MASTER_NODE_TYPE, \
        RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE
from neo.master.handler import MasterEventHandler
from neo.connection import ClientConnection
from neo.exception import ElectionFailure, PrimaryFailure
from neo.node import MasterNode
from neo.protocol import Packet, INVALID_UUID
from neo.util import dump
class SecondaryEventHandler(MasterEventHandler):
    """This class deals with events for a secondary master.

    Throughout this handler a connection that is a ClientConnection
    instance is treated as the link to the primary master: losing it
    marks the primary master node as down and raises PrimaryFailure.
    Any other connection is handled as an incoming peer and delegated
    to MasterEventHandler where no special treatment is needed.
    """

    def _primaryFailed(self, message):
        """Mark the primary master node as down and raise PrimaryFailure."""
        self.app.primary_master_node.setState(DOWN_STATE)
        raise PrimaryFailure(message)

    def connectionClosed(self, conn):
        """Raise PrimaryFailure if the closed connection is a
        ClientConnection; otherwise defer to the base handler."""
        if isinstance(conn, ClientConnection):
            self._primaryFailed('primary master is dead')
        MasterEventHandler.connectionClosed(self, conn)

    def timeoutExpired(self, conn):
        """Raise PrimaryFailure if a ClientConnection timed out;
        otherwise defer to the base handler."""
        if isinstance(conn, ClientConnection):
            self._primaryFailed('primary master is down')
        MasterEventHandler.timeoutExpired(self, conn)

    def peerBroken(self, conn):
        """Raise PrimaryFailure if the peer of a ClientConnection is
        broken; otherwise defer to the base handler."""
        if isinstance(conn, ClientConnection):
            self._primaryFailed('primary master is crazy')
        MasterEventHandler.peerBroken(self, conn)

    def packetReceived(self, conn, packet):
        """Mark the sender as running, then dispatch the packet.

        Only packets arriving on a ClientConnection update a node
        state, and a node in the broken state is left untouched.
        """
        if isinstance(conn, ClientConnection):
            node = self.app.nm.getNodeByServer(conn.getAddress())
            # getNodeByServer may return None for an unknown address,
            # so guard before touching the node.
            if node is not None and node.getState() != BROKEN_STATE:
                node.setState(RUNNING_STATE)
        MasterEventHandler.packetReceived(self, conn, packet)

    def handleRequestNodeIdentification(self, conn, packet, node_type,
                                        uuid, ip_address, port, name):
        """Answer an identification request from an incoming peer.

        The request is unexpected on a ClientConnection. A peer whose
        cluster name differs from app.name gets a protocol error and
        its connection is aborted. Otherwise the peer is accepted, and
        it is registered in the node manager if it is a master node
        that is not known yet.
        """
        if isinstance(conn, ClientConnection):
            self.handleUnexpectedPacket(conn, packet)
            return

        app = self.app
        if name != app.name:
            logging.error('reject an alien cluster')
            conn.addPacket(Packet().protocolError(packet.getId(),
                                                  'invalid cluster name'))
            conn.abort()
            return

        # Add a node only if it is a master node and I do not know it yet.
        if node_type == MASTER_NODE_TYPE:
            addr = (ip_address, port)
            node = app.nm.getNodeByServer(addr)
            if node is None:
                node = MasterNode(server = addr, uuid = uuid)
                app.nm.add(node)
            # Trust the UUID sent by the peer. This stays inside the
            # master-node branch because 'node' is only bound here.
            node.setUUID(uuid)

        conn.setUUID(uuid)

        p = Packet()
        p.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE,
                                   app.uuid, app.server[0], app.server[1])
        conn.addPacket(p)
        # Next, the peer should ask a primary master node.
        conn.expectMessage()

    def handleAskPrimaryMaster(self, conn, packet):
        """Tell an identified peer who the primary master is.

        The answer carries the UUID of the primary master node and the
        list of known master nodes, skipping those in the broken state.
        The request is unexpected on a ClientConnection or from a peer
        that has no UUID set on its connection.
        """
        if isinstance(conn, ClientConnection):
            self.handleUnexpectedPacket(conn, packet)
            return

        if conn.getUUID() is None:
            self.handleUnexpectedPacket(conn, packet)
            return

        app = self.app
        primary_uuid = app.primary_master_node.getUUID()

        known_master_list = []
        for n in app.nm.getMasterNodeList():
            if n.getState() == BROKEN_STATE:
                continue
            # Each entry is the server tuple extended with the UUID,
            # using INVALID_UUID when the node has none.
            info = n.getServer() + (n.getUUID() or INVALID_UUID,)
            known_master_list.append(info)

        p = Packet()
        p.answerPrimaryMaster(packet.getId(), primary_uuid, known_master_list)
        conn.addPacket(p)

    def handleAnnouncePrimaryMaster(self, conn, packet):
        """Treat a primary master announcement as an unexpected packet."""
        self.handleUnexpectedPacket(conn, packet)

    def handleReelectPrimaryMaster(self, conn, packet):
        """Raise ElectionFailure because a reelection was requested."""
        raise ElectionFailure('reelection requested')

    def handleNotifyNodeInformation(self, conn, packet, node_list):
        """Register master nodes announced in node_list.

        Entries that are not master nodes, and the entry describing
        this application itself, are ignored. A UUID is recorded only
        for a node that does not have one yet.
        """
        app = self.app
        for node_type, ip_address, port, uuid, state in node_list:
            if node_type != MASTER_NODE_TYPE:
                # No interest.
                continue

            # Register new master nodes.
            addr = (ip_address, port)
            if app.server == addr:
                # This is self.
                continue

            n = app.nm.getNodeByServer(addr)
            if n is None:
                n = MasterNode(server = addr)
                app.nm.add(n)

            # If I don't know the UUID yet, believe what the peer
            # told me at the moment.
            if uuid != INVALID_UUID and n.getUUID() is None:
                n.setUUID(uuid)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment