Commit 1dc37843 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Make client node to wait for a partition table to be operational, to fail in...

Make client node to wait for a partition table to be operational, to fail in making a connection when a node does not have a server address. Patch NEOStorage for returning a bogus value for __len__.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@144 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent b2b0eabb
...@@ -188,3 +188,7 @@ class NEOStorage(BaseStorage.BaseStorage, ...@@ -188,3 +188,7 @@ class NEOStorage(BaseStorage.BaseStorage,
def copyTransactionsFrom(self, other, verbose=0): def copyTransactionsFrom(self, other, verbose=0):
raise NotImplementedError raise NotImplementedError
def __len__(self):
# XXX bogus but how to implement this?
return 0
...@@ -40,6 +40,8 @@ class ConnectionManager(object): ...@@ -40,6 +40,8 @@ class ConnectionManager(object):
def _initNodeConnection(self, node): def _initNodeConnection(self, node):
"""Init a connection to a given storage node.""" """Init a connection to a given storage node."""
addr = node.getNode().getServer() addr = node.getNode().getServer()
if addr is None:
return None
handler = ClientEventHandler(self.storage, self.storage.dispatcher) handler = ClientEventHandler(self.storage, self.storage.dispatcher)
conn = ClientConnection(self.storage.em, handler, addr) conn = ClientConnection(self.storage.em, handler, addr)
msg_id = conn.getNextId() msg_id = conn.getNextId()
...@@ -252,7 +254,7 @@ class Application(ThreadingMixIn, object): ...@@ -252,7 +254,7 @@ class Application(ThreadingMixIn, object):
# Wait for primary master node information # Wait for primary master node information
while 1: while 1:
self._waitMessage(block=0) self._waitMessage(block=0)
if self.pt.filled() or self.node_not_ready: if self.pt.operational() or self.node_not_ready:
break break
......
...@@ -4,7 +4,7 @@ from neo.handler import EventHandler ...@@ -4,7 +4,7 @@ from neo.handler import EventHandler
from neo.connection import ClientConnection from neo.connection import ClientConnection
from neo.protocol import Packet, \ from neo.protocol import Packet, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \ MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
INVALID_UUID, TEMPORARILY_DOWN_STATE, BROKEN_STATE INVALID_UUID, RUNNING_STATE, TEMPORARILY_DOWN_STATE, BROKEN_STATE
from neo.node import MasterNode, StorageNode, ClientNode from neo.node import MasterNode, StorageNode, ClientNode
from neo.pt import PartitionTable from neo.pt import PartitionTable
from neo.client.NEOStorage import NEOStorageError from neo.client.NEOStorage import NEOStorageError
...@@ -228,8 +228,7 @@ class ClientEventHandler(EventHandler): ...@@ -228,8 +228,7 @@ class ClientEventHandler(EventHandler):
node = nm.getNodeByUUID(uuid) node = nm.getNodeByUUID(uuid)
if node is None: if node is None:
node = StorageNode(uuid = uuid) node = StorageNode(uuid = uuid)
if uuid != app.uuid: node.setState(TEMPORARILY_DOWN_STATE)
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node) nm.add(node)
pt.setCell(offset, node, state) pt.setCell(offset, node, state)
else: else:
...@@ -254,30 +253,40 @@ class ClientEventHandler(EventHandler): ...@@ -254,30 +253,40 @@ class ClientEventHandler(EventHandler):
for node_type, ip_address, port, uuid, state in node_list: for node_type, ip_address, port, uuid, state in node_list:
# Register new nodes. # Register new nodes.
addr = (ip_address, port) addr = (ip_address, port)
n = nm.getNodeByServer(addr)
if n is None: if node_type == MASTER_NODE_TYPE:
if node_type == MASTER_NODE_TYPE: n = nm.getNodeByServer(addr)
if n is None:
n = MasterNode(server = addr) n = MasterNode(server = addr)
nm.add(n) nm.add(n)
if uuid != INVALID_UUID: if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer # If I don't know the UUID yet, believe what the peer
# told me at the moment. # told me at the moment.
if n.getUUID() is None: if n.getUUID() is None:
n.setUUID(uuid) n.setUUID(uuid)
elif node_type == STORAGE_NODE_TYPE: elif node_type == STORAGE_NODE_TYPE:
if uuid == INVALID_UUID: if uuid == INVALID_UUID:
# No interest. # No interest.
continue continue
n = nm.getNodeByUUID(uuid)
if n is None:
n = StorageNode(server = addr, uuid = uuid) n = StorageNode(server = addr, uuid = uuid)
nm.add(n) nm.add(n)
elif node_type == CLIENT_NODE_TYPE:
if uuid == INVALID_UUID:
# No interest.
continue
n = ClientNode(server = addr, uuid = uuid)
nm.add(n)
else: else:
n.setServer(addr)
elif node_type == CLIENT_NODE_TYPE:
if uuid == INVALID_UUID:
# No interest.
continue continue
if state == RUNNING_STATE:
n = app.nm.getNodeByUUID(uuid)
if n is None:
n = ClientNode(server = addr, uuid = uuid)
nm.add(n)
else:
n = app.nm.getNodeByUUID(uuid)
if n is not None:
app.nm.remove(n)
n.setState(state) n.setState(state)
else: else:
......
...@@ -262,6 +262,8 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -262,6 +262,8 @@ class ServiceEventHandler(MasterEventHandler):
# If this is a storage node or a client node, send the partition table. # If this is a storage node or a client node, send the partition table.
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if isinstance(node, (StorageNode, ClientNode)): if isinstance(node, (StorageNode, ClientNode)):
logging.debug('sending send partition table to %s:%d',
*(conn.getAddress()))
# Split the packet if too huge. # Split the packet if too huge.
p = Packet() p = Packet()
row_list = [] row_list = []
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment