Commit cb80604f authored by Aurel's avatar Aurel

fix some handler, and use queue when receiving message


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@84 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 1c9232ca
...@@ -24,7 +24,7 @@ class ClientEventHandler(EventHandler): ...@@ -24,7 +24,7 @@ class ClientEventHandler(EventHandler):
def packetReceived(self, conn, packet): def packetReceived(self, conn, packet):
"""Redirect all received packet to dispatcher thread.""" """Redirect all received packet to dispatcher thread."""
self.dispatcher.message = conn, packet self.dispatcher.message.put((conn, packet), True)
def connectionFailed(self, conn): def connectionFailed(self, conn):
app = self.app app = self.app
...@@ -44,7 +44,7 @@ class ClientEventHandler(EventHandler): ...@@ -44,7 +44,7 @@ class ClientEventHandler(EventHandler):
uuid = conn.getUUID() uuid = conn.getUUID()
if self.app.primary_master_node is None: if self.app.primary_master_node is None:
EventHandler.connectionClosed(self, conn) EventHandler.connectionClosed(self, conn)
if uuid == self.app.primary_master_node.getUUID(): elif uuid == self.app.primary_master_node.getUUID():
logging.critical("connection to primary master node closed") logging.critical("connection to primary master node closed")
raise NEOStorageError("connection to primary master node closed") raise NEOStorageError("connection to primary master node closed")
else: else:
...@@ -209,6 +209,8 @@ class ClientEventHandler(EventHandler): ...@@ -209,6 +209,8 @@ class ClientEventHandler(EventHandler):
return return
app = self.app app = self.app
nm = app.nm
pt = app.pt
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
# This must be sent only by primary master node # This must be sent only by primary master node
if not isinstance(node, MasterNode) \ if not isinstance(node, MasterNode) \
...@@ -216,10 +218,18 @@ class ClientEventHandler(EventHandler): ...@@ -216,10 +218,18 @@ class ClientEventHandler(EventHandler):
or app.primary_master_node.getUUID() != uuid: or app.primary_master_node.getUUID() != uuid:
return return
# FIXME this part requires a serious fix. Look at if app.ptid != ptid:
# neo/storage/verification.py for details. app.ptid = ptid
for offset, node in row_list: pt.clear()
app.pt.setRow(offset, row) for offset, row in row_list:
for uuid, state in row:
node = nm.getNodeByUUID(uuid)
if node is None:
node = StorageNode(uuid = uuid)
if uuid != app.uuid:
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
pt.setCell(offset, node, state)
else: else:
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
...@@ -241,40 +251,38 @@ class ClientEventHandler(EventHandler): ...@@ -241,40 +251,38 @@ class ClientEventHandler(EventHandler):
for node_type, ip_address, port, uuid, state in node_list: for node_type, ip_address, port, uuid, state in node_list:
# Register new nodes. # Register new nodes.
addr = (ip_address, port) addr = (ip_address, port)
if app.server == addr: n = app.nm.getNodeByServer(addr)
# This is self. if n is None:
continue if node_type == MASTER_NODE_TYPE:
else: n = MasterNode(server = addr)
n = app.app.nm.getNodeByServer(addr) if uuid != INVALID_UUID:
if n is None: # If I don't know the UUID yet, believe what the peer
if node_type == MASTER_NODE_TYPE: # told me at the moment.
n = MasterNode(server = addr) if n.getUUID() is None:
if uuid != INVALID_UUID: n.setUUID(uuid)
# If I don't know the UUID yet, believe what the peer elif node_type == STORAGE_NODE_TYPE:
# told me at the moment. if uuid == INVALID_UUID:
if n.getUUID() is None: # No interest.
n.setUUID(uuid)
elif node_type == STORAGE_NODE_TYPE:
if uuid == INVALID_UUID:
# No interest.
continue
n = StorageNode(server = addr)
elif node_type == CLIENT_NODE_TYPE:
if uuid == INVALID_UUID:
# No interest.
continue
n = ClientNode(server = addr)
else:
continue continue
app.app.nm.add(n) n = StorageNode(server = addr)
elif node_type == CLIENT_NODE_TYPE:
if uuid == INVALID_UUID:
# No interest.
continue
n = ClientNode(server = addr)
else:
continue
app.nm.add(n)
n.setState(state) n.setState(state)
else: else:
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list): def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
if isinstance(conn, ClientConnection): if isinstance(conn, ClientConnection):
app = self.app app = self.app
nm = app.nm
pt = app.pt
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None: if uuid is None:
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
...@@ -288,10 +296,20 @@ class ClientEventHandler(EventHandler): ...@@ -288,10 +296,20 @@ class ClientEventHandler(EventHandler):
or app.primary_master_node.getUUID() != uuid: or app.primary_master_node.getUUID() != uuid:
return return
# FIXME this part requires a serious fix. Look at if app.ptid >= ptid:
# neo/storage/verification.py for details. # Ignore this packet.
for cell in cell_list: return
app.pt.addNode(cell)
app.ptid = ptid
for offset, uuid, state in cell_list:
node = nm.getNodeByUUID(uuid)
if node is None:
node = StorageNode(uuid = uuid)
if uuid != app.uuid:
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
pt.setCell(offset, node, state)
else: else:
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment