Commit 137c989a authored by Aurel's avatar Aurel

make master node take into account the admin node


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@434 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 2e6c8705
...@@ -25,7 +25,7 @@ from neo.protocol import Packet, \ ...@@ -25,7 +25,7 @@ from neo.protocol import Packet, \
RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, \ RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, \
INVALID_UUID, INVALID_OID, INVALID_TID, INVALID_PTID, \ INVALID_UUID, INVALID_OID, INVALID_TID, INVALID_PTID, \
CLIENT_NODE_TYPE, MASTER_NODE_TYPE, STORAGE_NODE_TYPE, \ CLIENT_NODE_TYPE, MASTER_NODE_TYPE, STORAGE_NODE_TYPE, \
UUID_NAMESPACES UUID_NAMESPACES, ADMIN_NODE_TYPE
from neo.node import NodeManager, MasterNode, StorageNode, ClientNode from neo.node import NodeManager, MasterNode, StorageNode, ClientNode
from neo.event import EventManager from neo.event import EventManager
from neo.connection import ListeningConnection, ClientConnection, ServerConnection from neo.connection import ListeningConnection, ClientConnection, ServerConnection
...@@ -292,7 +292,7 @@ class Application(object): ...@@ -292,7 +292,7 @@ class Application(object):
for c in self.em.getConnectionList(): for c in self.em.getConnectionList():
if c.getUUID() is not None: if c.getUUID() is not None:
n = self.nm.getNodeByUUID(c.getUUID()) n = self.nm.getNodeByUUID(c.getUUID())
if n.getNodeType() in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE): if n.getNodeType() in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
p = Packet() p = Packet()
node_list = [(node_type, ip_address, port, uuid, state)] node_list = [(node_type, ip_address, port, uuid, state)]
p.notifyNodeInformation(c.getNextId(), node_list) p.notifyNodeInformation(c.getNextId(), node_list)
...@@ -304,7 +304,7 @@ class Application(object): ...@@ -304,7 +304,7 @@ class Application(object):
node_list = [(node_type, ip_address, port, uuid, state)] node_list = [(node_type, ip_address, port, uuid, state)]
p.notifyNodeInformation(c.getNextId(), node_list) p.notifyNodeInformation(c.getNextId(), node_list)
c.addPacket(p) c.addPacket(p)
else: elif node.getNodeType() != ADMIN_NODE_TYPE:
raise RuntimeError('unknown node type') raise RuntimeError('unknown node type')
def broadcastPartitionChanges(self, ptid, cell_list): def broadcastPartitionChanges(self, ptid, cell_list):
...@@ -313,7 +313,7 @@ class Application(object): ...@@ -313,7 +313,7 @@ class Application(object):
for c in self.em.getConnectionList(): for c in self.em.getConnectionList():
if c.getUUID() is not None: if c.getUUID() is not None:
n = self.nm.getNodeByUUID(c.getUUID()) n = self.nm.getNodeByUUID(c.getUUID())
if n.getNodeType() in (CLIENT_NODE_TYPE, STORAGE_NODE_TYPE): if n.getNodeType() in (CLIENT_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
# Split the packet if too big. # Split the packet if too big.
size = len(cell_list) size = len(cell_list)
start = 0 start = 0
...@@ -350,7 +350,7 @@ class Application(object): ...@@ -350,7 +350,7 @@ class Application(object):
if self.lptid is not None: if self.lptid is not None:
# I need to retrieve last ids again. # I need to retrieve last ids again.
logging.debug('resending Ask Last IDs') logging.info('resending Ask Last IDs')
for conn in em.getConnectionList(): for conn in em.getConnectionList():
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is not None: if uuid is not None:
...@@ -427,7 +427,7 @@ class Application(object): ...@@ -427,7 +427,7 @@ class Application(object):
# Wait until the cluster gets operational or the Partition # Wait until the cluster gets operational or the Partition
# Table ID turns out to be not the latest. # Table ID turns out to be not the latest.
logging.debug('waiting for the cluster to be operational') logging.info('waiting for the cluster to be operational')
self.pt.log() self.pt.log()
while 1: while 1:
em.poll(1) em.poll(1)
......
...@@ -19,11 +19,11 @@ import logging ...@@ -19,11 +19,11 @@ import logging
from neo.protocol import MASTER_NODE_TYPE, \ from neo.protocol import MASTER_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \ RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
STORAGE_NODE_TYPE, CLIENT_NODE_TYPE STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, ADMIN_NODE_TYPE
from neo.master.handler import MasterEventHandler from neo.master.handler import MasterEventHandler
from neo.exception import ElectionFailure from neo.exception import ElectionFailure
from neo.protocol import Packet, INVALID_UUID from neo.protocol import Packet, INVALID_UUID, INVALID_PTID
from neo.node import ClientNode, StorageNode, MasterNode from neo.node import ClientNode, StorageNode, MasterNode, AdminNode
from neo.util import dump from neo.util import dump
class RecoveryEventHandler(MasterEventHandler): class RecoveryEventHandler(MasterEventHandler):
...@@ -65,7 +65,7 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -65,7 +65,7 @@ class RecoveryEventHandler(MasterEventHandler):
def handleRequestNodeIdentification(self, conn, packet, node_type, uuid, def handleRequestNodeIdentification(self, conn, packet, node_type, uuid,
ip_address, port, name): ip_address, port, name):
app = self.app app = self.app
if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE): if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
logging.info('reject a connection from a client') logging.info('reject a connection from a client')
conn.addPacket(Packet().notReady(packet.getId(), 'retry later')) conn.addPacket(Packet().notReady(packet.getId(), 'retry later'))
conn.abort() conn.abort()
...@@ -102,6 +102,8 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -102,6 +102,8 @@ class RecoveryEventHandler(MasterEventHandler):
# connected to me. # connected to me.
if node_type == MASTER_NODE_TYPE: if node_type == MASTER_NODE_TYPE:
node = MasterNode(server = addr, uuid = uuid) node = MasterNode(server = addr, uuid = uuid)
elif node_type == ADMIN_NODE_TYPE:
node = AdminNode(uuid = uuid)
else: else:
node = StorageNode(server = addr, uuid = uuid) node = StorageNode(server = addr, uuid = uuid)
app.nm.add(node) app.nm.add(node)
...@@ -224,6 +226,22 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -224,6 +226,22 @@ class RecoveryEventHandler(MasterEventHandler):
p.askLastIDs(msg_id) p.askLastIDs(msg_id)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
elif node.getNodeType() == ADMIN_NODE_TYPE and app.lptid not in (INVALID_PTID, None):
# send partition table if exists
logging.info('sending partition table %s to %s' %(app.lptid,
conn.getAddress()))
# Split the packet if too huge.
p = Packet()
row_list = []
for offset in xrange(app.num_partitions):
row_list.append((offset, app.pt.getRow(offset)))
if len(row_list) == 1000:
p.sendPartitionTable(conn.getNextId(), app.lptid, row_list)
conn.addPacket(p)
del row_list[:]
if len(row_list) != 0:
p.sendPartitionTable(conn.getNextId(), app.lptid, row_list)
conn.addPacket(p)
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
uuid = conn.getUUID() uuid = conn.getUUID()
...@@ -245,7 +263,7 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -245,7 +263,7 @@ class RecoveryEventHandler(MasterEventHandler):
app = self.app app = self.app
for node_type, ip_address, port, uuid, state in node_list: for node_type, ip_address, port, uuid, state in node_list:
if node_type == CLIENT_NODE_TYPE: if node_type in (CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
# No interest. # No interest.
continue continue
...@@ -328,7 +346,7 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -328,7 +346,7 @@ class RecoveryEventHandler(MasterEventHandler):
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() != STORAGE_NODE_TYPE: if node.getNodeType() not in (STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
return return
if uuid != app.target_uuid: if uuid != app.target_uuid:
......
...@@ -18,7 +18,8 @@ ...@@ -18,7 +18,8 @@
import logging import logging
from neo.protocol import MASTER_NODE_TYPE, \ from neo.protocol import MASTER_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \
DOWN_STATE, ADMIN_NODE_TYPE
from neo.master.handler import MasterEventHandler from neo.master.handler import MasterEventHandler
from neo.connection import ClientConnection from neo.connection import ClientConnection
from neo.exception import ElectionFailure, PrimaryFailure from neo.exception import ElectionFailure, PrimaryFailure
......
...@@ -20,11 +20,12 @@ from copy import copy ...@@ -20,11 +20,12 @@ from copy import copy
from neo.protocol import MASTER_NODE_TYPE, CLIENT_NODE_TYPE, \ from neo.protocol import MASTER_NODE_TYPE, CLIENT_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \ RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
UP_TO_DATE_STATE, FEEDING_STATE, DISCARDED_STATE, STORAGE_NODE_TYPE UP_TO_DATE_STATE, FEEDING_STATE, DISCARDED_STATE, \
STORAGE_NODE_TYPE, ADMIN_NODE_TYPE
from neo.master.handler import MasterEventHandler from neo.master.handler import MasterEventHandler
from neo.protocol import Packet, INVALID_UUID from neo.protocol import Packet, INVALID_UUID
from neo.exception import OperationFailure, ElectionFailure from neo.exception import OperationFailure, ElectionFailure
from neo.node import ClientNode, StorageNode, MasterNode from neo.node import ClientNode, StorageNode, MasterNode, AdminNode
from neo.util import dump from neo.util import dump
class FinishingTransaction(object): class FinishingTransaction(object):
...@@ -83,6 +84,9 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -83,6 +84,9 @@ class ServiceEventHandler(MasterEventHandler):
for tid, t in app.finishing_transaction_dict.items(): for tid, t in app.finishing_transaction_dict.items():
if t.getConnection() is conn: if t.getConnection() is conn:
del app.finishing_transaction_dict[tid] del app.finishing_transaction_dict[tid]
elif node.getNodeType() == ADMIN_NODE_TYPE:
# If this node is an admin , just forget it.
app.nm.remove(node)
elif node.getNodeType() == STORAGE_NODE_TYPE: elif node.getNodeType() == STORAGE_NODE_TYPE:
if not app.pt.operational(): if not app.pt.operational():
# Catastrophic. # Catastrophic.
...@@ -104,6 +108,9 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -104,6 +108,9 @@ class ServiceEventHandler(MasterEventHandler):
for tid, t in app.finishing_transaction_dict.items(): for tid, t in app.finishing_transaction_dict.items():
if t.getConnection() is conn: if t.getConnection() is conn:
del app.finishing_transaction_dict[tid] del app.finishing_transaction_dict[tid]
elif node.getNodeType() == ADMIN_NODE_TYPE:
# If this node is an admin , just forget it.
app.nm.remove(node)
elif node.getNodeType() == STORAGE_NODE_TYPE: elif node.getNodeType() == STORAGE_NODE_TYPE:
if not app.pt.operational(): if not app.pt.operational():
# Catastrophic. # Catastrophic.
...@@ -125,6 +132,9 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -125,6 +132,9 @@ class ServiceEventHandler(MasterEventHandler):
for tid, t in app.finishing_transaction_dict.items(): for tid, t in app.finishing_transaction_dict.items():
if t.getConnection() is conn: if t.getConnection() is conn:
del app.finishing_transaction_dict[tid] del app.finishing_transaction_dict[tid]
elif node.getNodeType() == ADMIN_NODE_TYPE:
# If this node is an admin , just forget it.
app.nm.remove(node)
elif node.getNodeType() == STORAGE_NODE_TYPE: elif node.getNodeType() == STORAGE_NODE_TYPE:
cell_list = app.pt.dropNode(node) cell_list = app.pt.dropNode(node)
ptid = app.getNextPartitionTableID() ptid = app.getNextPartitionTableID()
...@@ -175,6 +185,8 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -175,6 +185,8 @@ class ServiceEventHandler(MasterEventHandler):
node = MasterNode(server = addr, uuid = uuid) node = MasterNode(server = addr, uuid = uuid)
elif node_type == CLIENT_NODE_TYPE: elif node_type == CLIENT_NODE_TYPE:
node = ClientNode(uuid = uuid) node = ClientNode(uuid = uuid)
elif node_type == ADMIN_NODE_TYPE:
node = AdminNode(uuid = uuid)
else: else:
node = StorageNode(server = addr, uuid = uuid) node = StorageNode(server = addr, uuid = uuid)
app.nm.add(node) app.nm.add(node)
...@@ -312,7 +324,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -312,7 +324,7 @@ class ServiceEventHandler(MasterEventHandler):
conn.addPacket(p) conn.addPacket(p)
# Send the information. # Send the information.
logging.debug('sending notify node information to %s:%d', logging.info('sending notify node information to %s:%d',
*(conn.getAddress())) *(conn.getAddress()))
node_list = [] node_list = []
for n in app.nm.getNodeList(): for n in app.nm.getNodeList():
...@@ -332,10 +344,10 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -332,10 +344,10 @@ class ServiceEventHandler(MasterEventHandler):
p.notifyNodeInformation(conn.getNextId(), node_list) p.notifyNodeInformation(conn.getNextId(), node_list)
conn.addPacket(p) conn.addPacket(p)
# If this is a storage node or a client node, send the partition table. # If this is a storage node or a client node or an admin node, send the partition table.
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() in (STORAGE_NODE_TYPE, CLIENT_NODE_TYPE): if node.getNodeType() in (STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
logging.debug('sending send partition table to %s:%d', logging.info('sending partition table to %s:%d',
*(conn.getAddress())) *(conn.getAddress()))
# Split the packet if too huge. # Split the packet if too huge.
p = Packet() p = Packet()
...@@ -374,7 +386,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -374,7 +386,7 @@ class ServiceEventHandler(MasterEventHandler):
app = self.app app = self.app
for node_type, ip_address, port, uuid, state in node_list: for node_type, ip_address, port, uuid, state in node_list:
if node_type == CLIENT_NODE_TYPE: if node_type in (CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
# No interest. # No interest.
continue continue
......
...@@ -18,7 +18,8 @@ ...@@ -18,7 +18,8 @@
import logging import logging
from neo.protocol import MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \ from neo.protocol import MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
ADMIN_NODE_TYPE
from neo.master.handler import MasterEventHandler from neo.master.handler import MasterEventHandler
from neo.exception import VerificationFailure, ElectionFailure from neo.exception import VerificationFailure, ElectionFailure
from neo.protocol import Packet, INVALID_UUID from neo.protocol import Packet, INVALID_UUID
...@@ -36,7 +37,7 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -36,7 +37,7 @@ class VerificationEventHandler(MasterEventHandler):
if node.getState() == RUNNING_STATE: if node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE) node.setState(TEMPORARILY_DOWN_STATE)
app.broadcastNodeInformation(node) app.broadcastNodeInformation(node)
if node.getNodeType() == CLIENT_NODE_TYPE: if node.getNodeType() in (CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
# If this node is a client, just forget it. # If this node is a client, just forget it.
app.nm.remove(node) app.nm.remove(node)
elif node.getNodeType() == STORAGE_NODE_TYPE: elif node.getNodeType() == STORAGE_NODE_TYPE:
...@@ -53,7 +54,7 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -53,7 +54,7 @@ class VerificationEventHandler(MasterEventHandler):
if node.getState() == RUNNING_STATE: if node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE) node.setState(TEMPORARILY_DOWN_STATE)
app.broadcastNodeInformation(node) app.broadcastNodeInformation(node)
if node.getNodeType() == CLIENT_NODE_TYPE: if node.getNodeType() in (CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
# If this node is a client, just forget it. # If this node is a client, just forget it.
app.nm.remove(node) app.nm.remove(node)
elif node.getNodeType() == STORAGE_NODE_TYPE: elif node.getNodeType() == STORAGE_NODE_TYPE:
...@@ -70,7 +71,7 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -70,7 +71,7 @@ class VerificationEventHandler(MasterEventHandler):
if node.getState() != BROKEN_STATE: if node.getState() != BROKEN_STATE:
node.setState(BROKEN_STATE) node.setState(BROKEN_STATE)
app.broadcastNodeInformation(node) app.broadcastNodeInformation(node)
if node.getNodeType() == CLIENT_NODE_TYPE: if node.getNodeType() in (CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
# If this node is a client, just forget it. # If this node is a client, just forget it.
app.nm.remove(node) app.nm.remove(node)
elif node.getNodeType() == STORAGE_NODE_TYPE: elif node.getNodeType() == STORAGE_NODE_TYPE:
...@@ -88,7 +89,7 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -88,7 +89,7 @@ class VerificationEventHandler(MasterEventHandler):
def handleRequestNodeIdentification(self, conn, packet, node_type, def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name): uuid, ip_address, port, name):
app = self.app app = self.app
if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE): if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
logging.info('reject a connection from a client') logging.info('reject a connection from a client')
conn.addPacket(Packet().notReady(packet.getId(), 'retry later')) conn.addPacket(Packet().notReady(packet.getId(), 'retry later'))
conn.abort() conn.abort()
...@@ -125,6 +126,8 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -125,6 +126,8 @@ class VerificationEventHandler(MasterEventHandler):
# connected to me. # connected to me.
if node_type == MASTER_NODE_TYPE: if node_type == MASTER_NODE_TYPE:
node = MasterNode(server = addr, uuid = uuid) node = MasterNode(server = addr, uuid = uuid)
elif node_type == ADMIN_NODE_TYPE:
node = AdminNode(uuid = uuid)
else: else:
node = StorageNode(server = addr, uuid = uuid) node = StorageNode(server = addr, uuid = uuid)
app.nm.add(node) app.nm.add(node)
...@@ -238,9 +241,9 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -238,9 +241,9 @@ class VerificationEventHandler(MasterEventHandler):
p.notifyNodeInformation(conn.getNextId(), node_list) p.notifyNodeInformation(conn.getNextId(), node_list)
conn.addPacket(p) conn.addPacket(p)
# If this is a storage node, send the partition table. # If this is a storage node or an admin node, send the partition table.
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() == STORAGE_NODE_TYPE: if node.getNodeType() in (STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
# Split the packet if too huge. # Split the packet if too huge.
p = Packet() p = Packet()
row_list = [] row_list = []
...@@ -274,7 +277,7 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -274,7 +277,7 @@ class VerificationEventHandler(MasterEventHandler):
app = self.app app = self.app
for node_type, ip_address, port, uuid, state in node_list: for node_type, ip_address, port, uuid, state in node_list:
if node_type == CLIENT_NODE_TYPE: if node_type in (CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
# No interest. # No interest.
continue continue
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment