Commit c791f6d1 authored by Aurel's avatar Aurel

comsetic changes


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@706 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent de2dfaa0
# #
# Copyright (C) 2006-2009 Nexedi SA # Copyright (C) 2006-2009 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2 # as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version. # of the License, or (at your option) any later version.
# #
# This program is distributed in the hope that it will be useful, # This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of # but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
...@@ -26,7 +26,7 @@ from neo.protocol import TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, \ ...@@ -26,7 +26,7 @@ from neo.protocol import TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, \
INVALID_UUID, INVALID_PTID, partition_cell_states, MASTER_NODE_TYPE INVALID_UUID, INVALID_PTID, partition_cell_states, MASTER_NODE_TYPE
from neo.node import NodeManager, MasterNode, StorageNode, ClientNode, AdminNode from neo.node import NodeManager, MasterNode, StorageNode, ClientNode, AdminNode
from neo.event import EventManager from neo.event import EventManager
from neo.connection import ListeningConnection, ClientConnection from neo.connection import ListeningConnection, ClientConnection
from neo.exception import OperationFailure, PrimaryFailure from neo.exception import OperationFailure, PrimaryFailure
from neo.admin.handler import MasterMonitoringEventHandler, AdminEventHandler, \ from neo.admin.handler import MasterMonitoringEventHandler, AdminEventHandler, \
MasterBootstrapEventHandler, MasterRequestEventHandler, MasterEventHandler MasterBootstrapEventHandler, MasterRequestEventHandler, MasterEventHandler
...@@ -48,8 +48,8 @@ class Dispatcher: ...@@ -48,8 +48,8 @@ class Dispatcher:
return self.message_table.pop(msg_id, None) return self.message_table.pop(msg_id, None)
def registered(self, msg_id): def registered(self, msg_id):
return self.message_table.has_key(msg_id) return self.message_table.has_key(msg_id)
class Application(object): class Application(object):
"""The storage node application.""" """The storage node application."""
...@@ -81,7 +81,7 @@ class Application(object): ...@@ -81,7 +81,7 @@ class Application(object):
self.monitoring_handler = MasterMonitoringEventHandler(self) self.monitoring_handler = MasterMonitoringEventHandler(self)
self.request_handler = MasterRequestEventHandler(self) self.request_handler = MasterRequestEventHandler(self)
self.dispatcher = Dispatcher() self.dispatcher = Dispatcher()
def run(self): def run(self):
"""Make sure that the status is sane and start a loop.""" """Make sure that the status is sane and start a loop."""
if self.num_partitions is not None and self.num_partitions <= 0: if self.num_partitions is not None and self.num_partitions <= 0:
...@@ -110,14 +110,14 @@ class Application(object): ...@@ -110,14 +110,14 @@ class Application(object):
# do not trust any longer our informations # do not trust any longer our informations
self.pt.clear() self.pt.clear()
self.nm.clear(filter = lambda node: node.getNodeType() != MASTER_NODE_TYPE) self.nm.clear(filter = lambda node: node.getNodeType() != MASTER_NODE_TYPE)
def connectToPrimaryMaster(self): def connectToPrimaryMaster(self):
"""Find a primary master node, and connect to it. """Find a primary master node, and connect to it.
If a primary master node is not elected or ready, repeat If a primary master node is not elected or ready, repeat
the attempt of a connection periodically. the attempt of a connection periodically.
Note that I do not accept any connection from non-master nodes Note that I do not accept any connection from non-master nodes
at this stage.""" at this stage."""
logging.info('connecting to a primary master node') logging.info('connecting to a primary master node')
...@@ -194,6 +194,5 @@ class Application(object): ...@@ -194,6 +194,5 @@ class Application(object):
p = protocot.protocolError('invalid partition table offset') p = protocot.protocolError('invalid partition table offset')
conn.notify(p) conn.notify(p)
return return
print "sending packet", len(row_list)
p = protocol.answerPartitionList(self.ptid, row_list) p = protocol.answerPartitionList(self.ptid, row_list)
conn.notify(p, msg_id) conn.notify(p, msg_id)
# #
# Copyright (C) 2009 Nexedi SA # Copyright (C) 2009 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2 # as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version. # of the License, or (at your option) any later version.
# #
# This program is distributed in the hope that it will be useful, # This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of # but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
...@@ -60,7 +60,7 @@ class AdminEventHandler(BaseEventHandler): ...@@ -60,7 +60,7 @@ class AdminEventHandler(BaseEventHandler):
'msg_id' : packet.getId()}) 'msg_id' : packet.getId()})
else: else:
app.sendPartitionTable(conn, min_offset, max_offset, uuid, packet.getId()) app.sendPartitionTable(conn, min_offset, max_offset, uuid, packet.getId())
def handleAskNodeList(self, conn, packet, node_type): def handleAskNodeList(self, conn, packet, node_type):
logging.info("ask node list for %s" %(node_type)) logging.info("ask node list for %s" %(node_type))
...@@ -95,7 +95,7 @@ class AdminEventHandler(BaseEventHandler): ...@@ -95,7 +95,7 @@ class AdminEventHandler(BaseEventHandler):
p = protocol.setNodeState(uuid, state, modify_partition_table) p = protocol.setNodeState(uuid, state, modify_partition_table)
msg_id = master_conn.ask(p) msg_id = master_conn.ask(p)
self.app.dispatcher.register(msg_id, conn, {'msg_id' : packet.getId()}) self.app.dispatcher.register(msg_id, conn, {'msg_id' : packet.getId()})
def handleSetClusterState(self, conn, packet, name, state): def handleSetClusterState(self, conn, packet, name, state):
self.checkClusterName(name) self.checkClusterName(name)
# forward to primary # forward to primary
...@@ -103,7 +103,7 @@ class AdminEventHandler(BaseEventHandler): ...@@ -103,7 +103,7 @@ class AdminEventHandler(BaseEventHandler):
p = protocol.setClusterState(name, state) p = protocol.setClusterState(name, state)
msg_id = master_conn.ask(p) msg_id = master_conn.ask(p)
self.app.dispatcher.register(msg_id, conn, {'msg_id' : packet.getId()}) self.app.dispatcher.register(msg_id, conn, {'msg_id' : packet.getId()})
def handleAddPendingNodes(self, conn, packet, uuid_list): def handleAddPendingNodes(self, conn, packet, uuid_list):
uuids = ', '.join([dump(uuid) for uuid in uuid_list]) uuids = ', '.join([dump(uuid) for uuid in uuid_list])
logging.info('Add nodes %s' % uuids) logging.info('Add nodes %s' % uuids)
...@@ -116,23 +116,23 @@ class AdminEventHandler(BaseEventHandler): ...@@ -116,23 +116,23 @@ class AdminEventHandler(BaseEventHandler):
class MasterEventHandler(BaseEventHandler): class MasterEventHandler(BaseEventHandler):
""" This class is just used to dispacth message to right handler""" """ This class is just used to dispacth message to right handler"""
def dispatch(self, conn, packet): def dispatch(self, conn, packet):
if self.app.dispatcher.registered(packet.getId()): if self.app.dispatcher.registered(packet.getId()):
# answer to a request # answer to a request
self.app.request_handler.dispatch(conn, packet) self.app.request_handler.dispatch(conn, packet)
else: else:
# monitoring phase # monitoring phase
self.app.monitoring_handler.dispatch(conn, packet) self.app.monitoring_handler.dispatch(conn, packet)
class MasterBaseEventHandler(BaseEventHandler): class MasterBaseEventHandler(BaseEventHandler):
""" This is the base class for connection to primary master node""" """ This is the base class for connection to primary master node"""
def connectionAccepted(self, conn, s, addr): def connectionAccepted(self, conn, s, addr):
"""Called when a connection is accepted.""" """Called when a connection is accepted."""
raise UnexpectedPacketError raise UnexpectedPacketError
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
app = self.app app = self.app
if app.trying_master_node is None: if app.trying_master_node is None:
...@@ -143,7 +143,6 @@ class MasterBaseEventHandler(BaseEventHandler): ...@@ -143,7 +143,6 @@ class MasterBaseEventHandler(BaseEventHandler):
conn.ask(protocol.askPrimaryMaster()) conn.ask(protocol.askPrimaryMaster())
EventHandler.connectionCompleted(self, conn) EventHandler.connectionCompleted(self, conn)
def connectionFailed(self, conn): def connectionFailed(self, conn):
app = self.app app = self.app
...@@ -163,7 +162,6 @@ class MasterBaseEventHandler(BaseEventHandler): ...@@ -163,7 +162,6 @@ class MasterBaseEventHandler(BaseEventHandler):
EventHandler.connectionFailed(self, conn) EventHandler.connectionFailed(self, conn)
def timeoutExpired(self, conn): def timeoutExpired(self, conn):
app = self.app app = self.app
...@@ -173,12 +171,11 @@ class MasterBaseEventHandler(BaseEventHandler): ...@@ -173,12 +171,11 @@ class MasterBaseEventHandler(BaseEventHandler):
if app.trying_master_node is app.primary_master_node: if app.trying_master_node is app.primary_master_node:
# If a primary master node timeouts, I should not rely on it. # If a primary master node timeouts, I should not rely on it.
app.primary_master_node = None app.primary_master_node = None
app.trying_master_node = None app.trying_master_node = None
EventHandler.timeoutExpired(self, conn) EventHandler.timeoutExpired(self, conn)
def connectionClosed(self, conn): def connectionClosed(self, conn):
app = self.app app = self.app
...@@ -208,10 +205,8 @@ class MasterBaseEventHandler(BaseEventHandler): ...@@ -208,10 +205,8 @@ class MasterBaseEventHandler(BaseEventHandler):
EventHandler.peerBroken(self, conn) EventHandler.peerBroken(self, conn)
@decorators.identification_required @decorators.identification_required
def handleNotifyNodeInformation(self, conn, packet, node_list): def handleNotifyNodeInformation(self, conn, packet, node_list):
logging.info("handleNotifyNodeInformation")
uuid = conn.getUUID() uuid = conn.getUUID()
app = self.app app = self.app
nm = app.nm nm = app.nm
...@@ -240,7 +235,7 @@ class MasterBaseEventHandler(BaseEventHandler): ...@@ -240,7 +235,7 @@ class MasterBaseEventHandler(BaseEventHandler):
# same uuid but different address, remove it # same uuid but different address, remove it
nm.remove(n) nm.remove(n)
n = None n = None
if node_type == MASTER_NODE_TYPE: if node_type == MASTER_NODE_TYPE:
if n is None: if n is None:
n = MasterNode(server = addr) n = MasterNode(server = addr)
...@@ -257,7 +252,7 @@ class MasterBaseEventHandler(BaseEventHandler): ...@@ -257,7 +252,7 @@ class MasterBaseEventHandler(BaseEventHandler):
# No interest. # No interest.
continue continue
if n is None: if n is None:
if node_type == STORAGE_NODE_TYPE: if node_type == STORAGE_NODE_TYPE:
n = StorageNode(server = addr, uuid = uuid) n = StorageNode(server = addr, uuid = uuid)
elif node_type == CLIENT_NODE_TYPE: elif node_type == CLIENT_NODE_TYPE:
n = ClientNode(server = addr, uuid = uuid) n = ClientNode(server = addr, uuid = uuid)
...@@ -267,9 +262,9 @@ class MasterBaseEventHandler(BaseEventHandler): ...@@ -267,9 +262,9 @@ class MasterBaseEventHandler(BaseEventHandler):
else: else:
logging.warning("unknown node type %s" %(node_type)) logging.warning("unknown node type %s" %(node_type))
continue continue
n.setState(state) n.setState(state)
self.app.notified = True self.app.notified = True
...@@ -285,7 +280,7 @@ class MasterRequestEventHandler(MasterBaseEventHandler): ...@@ -285,7 +280,7 @@ class MasterRequestEventHandler(MasterBaseEventHandler):
logging.info("handleAnswerNewNodes for a conn") logging.info("handleAnswerNewNodes for a conn")
client_conn, kw = self.app.dispatcher.retrieve(packet.getId()) client_conn, kw = self.app.dispatcher.retrieve(packet.getId())
client_conn.notify(protocol.answerNewNodes(uuid_list), kw['msg_id']) client_conn.notify(protocol.answerNewNodes(uuid_list), kw['msg_id'])
@decorators.identification_required @decorators.identification_required
def handleAnswerPartitionTable(self, conn, packet, ptid, row_list): def handleAnswerPartitionTable(self, conn, packet, ptid, row_list):
logging.info("handleAnswerPartitionTable for a conn") logging.info("handleAnswerPartitionTable for a conn")
...@@ -293,27 +288,27 @@ class MasterRequestEventHandler(MasterBaseEventHandler): ...@@ -293,27 +288,27 @@ class MasterRequestEventHandler(MasterBaseEventHandler):
# sent client the partition table # sent client the partition table
self.app.sendPartitionTable(client_conn, **kw) self.app.sendPartitionTable(client_conn, **kw)
def handleAnswerNodeState(self, conn, packet, uuid, state): def handleAnswerNodeState(self, conn, packet, uuid, state):
client_conn, kw = self.app.dispatcher.retrieve(packet.getId()) client_conn, kw = self.app.dispatcher.retrieve(packet.getId())
p = protocol.answerNodeState(uuid, state) p = protocol.answerNodeState(uuid, state)
client_conn.notify(p, kw['msg_id']) client_conn.notify(p, kw['msg_id'])
def handleNoError(self, conn, packet, msg): def handleNoError(self, conn, packet, msg):
client_conn, kw = self.app.dispatcher.retrieve(packet.getId()) client_conn, kw = self.app.dispatcher.retrieve(packet.getId())
p = protocol.noError(msg) p = protocol.noError(msg)
client_conn.notify(p, kw['msg_id']) client_conn.notify(p, kw['msg_id'])
class MasterBootstrapEventHandler(MasterBaseEventHandler): class MasterBootstrapEventHandler(MasterBaseEventHandler):
"""This class manage the bootstrap part to the primary master node""" """This class manage the bootstrap part to the primary master node"""
def handleNotReady(self, conn, packet, message): def handleNotReady(self, conn, packet, message):
app = self.app app = self.app
if app.trying_master_node is not None: if app.trying_master_node is not None:
app.trying_master_node = None app.trying_master_node = None
conn.close() conn.close()
def handleAcceptNodeIdentification(self, conn, packet, node_type, def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, uuid, ip_address, port,
num_partitions, num_replicas, your_uuid): num_partitions, num_replicas, your_uuid):
...@@ -329,7 +324,7 @@ class MasterBootstrapEventHandler(MasterBaseEventHandler): ...@@ -329,7 +324,7 @@ class MasterBootstrapEventHandler(MasterBaseEventHandler):
# The server address is different! Then why was # The server address is different! Then why was
# the connection successful? # the connection successful?
logging.error('%s:%d is waiting for %s:%d', logging.error('%s:%d is waiting for %s:%d',
conn.getAddress()[0], conn.getAddress()[1], conn.getAddress()[0], conn.getAddress()[1],
ip_address, port) ip_address, port)
app.nm.remove(node) app.nm.remove(node)
conn.close() conn.close()
...@@ -400,7 +395,7 @@ class MasterBootstrapEventHandler(MasterBaseEventHandler): ...@@ -400,7 +395,7 @@ class MasterBootstrapEventHandler(MasterBaseEventHandler):
app.trying_master_node = None app.trying_master_node = None
conn.close() conn.close()
p = protocol.requestNodeIdentification(ADMIN_NODE_TYPE, p = protocol.requestNodeIdentification(ADMIN_NODE_TYPE,
app.uuid, app.server[0], app.server[1], app.name) app.uuid, app.server[0], app.server[1], app.name)
conn.ask(p) conn.ask(p)
...@@ -418,7 +413,6 @@ class MasterMonitoringEventHandler(MasterBaseEventHandler): ...@@ -418,7 +413,6 @@ class MasterMonitoringEventHandler(MasterBaseEventHandler):
@decorators.identification_required @decorators.identification_required
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list): def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
logging.warning("handleNotifyPartitionChanges")
app = self.app app = self.app
nm = app.nm nm = app.nm
pt = app.pt pt = app.pt
...@@ -443,11 +437,10 @@ class MasterMonitoringEventHandler(MasterBaseEventHandler): ...@@ -443,11 +437,10 @@ class MasterMonitoringEventHandler(MasterBaseEventHandler):
node.setState(TEMPORARILY_DOWN_STATE) node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node) nm.add(node)
pt.setCell(offset, node, state) pt.setCell(offset, node, state)
pt.log() pt.log()
@decorators.identification_required @decorators.identification_required
def handleSendPartitionTable(self, conn, packet, ptid, row_list): def handleSendPartitionTable(self, conn, packet, ptid, row_list):
logging.warning("handleSendPartitionTable")
uuid = conn.getUUID() uuid = conn.getUUID()
app = self.app app = self.app
nm = app.nm nm = app.nm
...@@ -470,6 +463,3 @@ class MasterMonitoringEventHandler(MasterBaseEventHandler): ...@@ -470,6 +463,3 @@ class MasterMonitoringEventHandler(MasterBaseEventHandler):
pt.setCell(offset, node, state) pt.setCell(offset, node, state)
pt.log() pt.log()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment