Commit f8317749 authored by Aurel's avatar Aurel

fix connection to storage node


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@95 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent d9fee7a8
...@@ -38,15 +38,15 @@ class ConnectionManager(object): ...@@ -38,15 +38,15 @@ class ConnectionManager(object):
def _initNodeConnection(self, node): def _initNodeConnection(self, node):
"""Init a connection to a given storage node.""" """Init a connection to a given storage node."""
addr = node.getServer() addr = node.getNode().getServer()
handler = ClientEventHandler(self.storage) handler = ClientEventHandler(self.storage, self.storage.dispatcher)
conn = ClientConnection(self.storage.em, handler, addr) conn = ClientConnection(self.storage.em, handler, addr)
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, self.uuid, addr[0], p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, self.storage.uuid, addr[0],
addr[1], self.storage.name) addr[1], self.storage.name)
self.storage.local_var.tmp_q = Queue(1) self.storage.local_var.tmp_q = Queue(1)
self.storage.queue.put((self.local_var.tmp_q, msg_id, conn, p), True) self.storage.queue.put((self.storage.local_var.tmp_q, msg_id, conn, p), True)
self.storage.local_var.storage_node = None self.storage.local_var.storage_node = None
self.storage._waitMessage() self.storage._waitMessage()
if self.storage.storage_node == -1: if self.storage.storage_node == -1:
...@@ -82,7 +82,7 @@ class ConnectionManager(object): ...@@ -82,7 +82,7 @@ class ConnectionManager(object):
if conn is None: if conn is None:
return None return None
# add node to node manager # add node to node manager
if not self.storage.nm.hasNode(node): if self.storage.nm.getNodeByServer(node.getServer()) is None:
n = StorageNode(node.getServer()) n = StorageNode(node.getServer())
self.storage.nm.add(n) self.storage.nm.add(n)
self.connection_dict[node.getUUID()] = conn self.connection_dict[node.getUUID()] = conn
...@@ -306,7 +306,7 @@ class Application(ThreadingMixIn, object): ...@@ -306,7 +306,7 @@ class Application(ThreadingMixIn, object):
# Store data on each node # Store data on each node
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node)
if conn is None: if conn is None:
continue continue
msg_id = conn.getNextId() msg_id = conn.getNextId()
...@@ -425,7 +425,7 @@ class Application(ThreadingMixIn, object): ...@@ -425,7 +425,7 @@ class Application(ThreadingMixIn, object):
compressed_data = compress(ddata) compressed_data = compress(ddata)
checksum = adler32(compressed_data) checksum = adler32(compressed_data)
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node)
if conn is None: if conn is None:
continue continue
msg_id = conn.getNextId() msg_id = conn.getNextId()
...@@ -463,7 +463,7 @@ class Application(ThreadingMixIn, object): ...@@ -463,7 +463,7 @@ class Application(ThreadingMixIn, object):
partition_id = self.tid % self.num_partitions partition_id = self.tid % self.num_partitions
storage_node_list = self.pt.getCellList(partition_id, True) storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node)
if conn is None: if conn is None:
continue continue
msg_id = conn.getNextId() msg_id = conn.getNextId()
...@@ -498,7 +498,7 @@ class Application(ThreadingMixIn, object): ...@@ -498,7 +498,7 @@ class Application(ThreadingMixIn, object):
storage_node_list = self.pt.getCellList(partition_id, True) storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list: for storage_node in storage_node_list:
if not aborted_node.has_key(storage_node): if not aborted_node.has_key(storage_node):
conn = self.cm.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node)
if conn is None: if conn is None:
continue continue
msg_id = conn.getNextId() msg_id = conn.getNextId()
...@@ -512,7 +512,7 @@ class Application(ThreadingMixIn, object): ...@@ -512,7 +512,7 @@ class Application(ThreadingMixIn, object):
storage_node_list = self.pt.getCellList(partition_id, True) storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list: for storage_node in storage_node_list:
if not aborted_node.has_key(storage_node): if not aborted_node.has_key(storage_node):
conn = self.cm.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node)
if conn is None: if conn is None:
continue continue
msg_id = conn.getNextId() msg_id = conn.getNextId()
...@@ -563,7 +563,7 @@ class Application(ThreadingMixIn, object): ...@@ -563,7 +563,7 @@ class Application(ThreadingMixIn, object):
partition_id = transaction_id % self.num_partitions partition_id = transaction_id % self.num_partitions
storage_node_list = self.pt.getCellList(partition_id, True) storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node)
if conn is None: if conn is None:
continue continue
msg_id = conn.getNextId() msg_id = conn.getNextId()
...@@ -632,7 +632,7 @@ class Application(ThreadingMixIn, object): ...@@ -632,7 +632,7 @@ class Application(ThreadingMixIn, object):
self.local_var.node_tids = {} self.local_var.node_tids = {}
self.local_var.tmp_q = Queue(len(storage_node_list)) self.local_var.tmp_q = Queue(len(storage_node_list))
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node)
if conn is None: if conn is None:
continue continue
msg_id = conn.getNextId() msg_id = conn.getNextId()
...@@ -659,7 +659,7 @@ class Application(ThreadingMixIn, object): ...@@ -659,7 +659,7 @@ class Application(ThreadingMixIn, object):
partition_id = tid % self.num_partitions partition_id = tid % self.num_partitions
storage_node_list = self.pt.getCellList(partition_id, True) storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node)
if conn is None: if conn is None:
continue continue
msg_id = conn.getNextId() msg_id = conn.getNextId()
...@@ -697,7 +697,7 @@ class Application(ThreadingMixIn, object): ...@@ -697,7 +697,7 @@ class Application(ThreadingMixIn, object):
storage_node_list = [x for x in self.pt.getCellList(partition_id, True) \ storage_node_list = [x for x in self.pt.getCellList(partition_id, True) \
if x.getState() == UP_TO_DATE_STATE] if x.getState() == UP_TO_DATE_STATE]
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node)
if conn is None: if conn is None:
continue continue
msg_id = conn.getNextId() msg_id = conn.getNextId()
...@@ -725,7 +725,7 @@ class Application(ThreadingMixIn, object): ...@@ -725,7 +725,7 @@ class Application(ThreadingMixIn, object):
partition_id = serial % self.num_partitions partition_id = serial % self.num_partitions
storage_node_list = self.pt.getCellList(partition_id, True) storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node)
if conn is None: if conn is None:
continue continue
msg_id = conn.getNextId() msg_id = conn.getNextId()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment