Commit f532bbbc authored by Grégory Wisniewski's avatar Grégory Wisniewski

Client's storage handler use connectionLost hook.

Storage node register unknown storage node found in the database before loading
the content into the partition table. So an update() can now expect to known the
storage node.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@1092 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 82c18e80
...@@ -36,33 +36,27 @@ class StorageEventHandler(BaseHandler): ...@@ -36,33 +36,27 @@ class StorageEventHandler(BaseHandler):
queue = self.dispatcher.message_table.pop(key) queue = self.dispatcher.message_table.pop(key)
queue_set.add(queue) queue_set.add(queue)
# Storage failure is notified to the primary master when the fake # Storage failure is notified to the primary master when the fake
# packet if popped by a non-polling thread. # packet is popped by a non-polling thread.
for queue in queue_set: for queue in queue_set:
queue.put((conn, None)) queue.put((conn, None))
def handleConnectionLost(self, conn, new_state):
def connectionClosed(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress()) node = self.app.nm.getNodeByServer(conn.getAddress())
logging.info("connection to storage node %s closed", node.getServer())
self._dealWithStorageFailure(conn, node) self._dealWithStorageFailure(conn, node)
super(StorageEventHandler, self).connectionClosed(conn) super(StorageEventHandler, self).connectionClosed(conn)
def timeoutExpired(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress())
self._dealWithStorageFailure(conn, node)
super(StorageEventHandler, self).timeoutExpired(conn)
def peerBroken(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress())
self._dealWithStorageFailure(conn, node)
super(StorageEventHandler, self).peerBroken(conn)
def connectionFailed(self, conn): def connectionFailed(self, conn):
# XXX: a connection failure is not like a connection lost, we should not
# have to clear the dispatcher because the connection was never
# established and so, no packet should have been send and thus, nothing
# must be expected. This should be well done if the first packet sent is
# done after the connectionCompleted event or a packet received.
# Connection to a storage node failed # Connection to a storage node failed
node = self.app.nm.getNodeByServer(conn.getAddress()) node = self.app.nm.getNodeByServer(conn.getAddress())
self._dealWithStorageFailure(conn, node) self._dealWithStorageFailure(conn, node)
super(StorageEventHandler, self).connectionFailed(conn) super(StorageEventHandler, self).connectionFailed(conn)
class StorageBootstrapHandler(AnswerBaseHandler): class StorageBootstrapHandler(AnswerBaseHandler):
""" Handler used when connecting to a storage node """ """ Handler used when connecting to a storage node """
......
...@@ -192,13 +192,7 @@ class PartitionTable(object): ...@@ -192,13 +192,7 @@ class PartitionTable(object):
self.id = ptid self.id = ptid
for offset, uuid, state in cell_list: for offset, uuid, state in cell_list:
node = nm.getNodeByUUID(uuid) node = nm.getNodeByUUID(uuid)
if node is None: assert node is not None
logging.warning('Updating partition table with an unknown UUID : %s',
dump(uuid))
from neo.node import StorageNode
node = StorageNode(uuid=uuid)
node.setState(protocol.TEMPORARILY_DOWN_STATE)
nm.add(node)
self.setCell(offset, node, state) self.setCell(offset, node, state)
logging.debug('partition table updated') logging.debug('partition table updated')
self.log() self.log()
......
...@@ -113,12 +113,19 @@ class Application(object): ...@@ -113,12 +113,19 @@ class Application(object):
"""Load a partition table from the database.""" """Load a partition table from the database."""
ptid = self.dm.getPTID() ptid = self.dm.getPTID()
cell_list = self.dm.getPartitionTable() cell_list = self.dm.getPartitionTable()
# dirty, but have to convert states from int to Enum new_cell_list = []
convert_states = lambda (offset, uuid, state): (offset, uuid, for offset, uuid, state in cell_list:
protocol.partition_cell_states[state]) # convert from int to Enum
cell_list = map(convert_states, cell_list) state = protocol.partition_cell_states[state]
# register unknown nodes
if self.nm.getNodeByUUID(uuid) is None:
node = StorageNode(uuid=uuid)
node.setState(protocol.TEMPORARILY_DOWN_STATE)
self.nm.add(node)
new_cell_list.append((offset, uuid, state))
# load the partition table in manager
self.pt.clear() self.pt.clear()
self.pt.update(ptid, cell_list, self.nm) self.pt.update(ptid, new_cell_list, self.nm)
def run(self): def run(self):
"""Make sure that the status is sane and start a loop.""" """Make sure that the status is sane and start a loop."""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment