Commit 2f46a9cc authored by Grégory Wisniewski's avatar Grégory Wisniewski

Factorize and clean master service.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@525 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 581204e5
...@@ -71,13 +71,14 @@ class FinishingTransaction(object): ...@@ -71,13 +71,14 @@ class FinishingTransaction(object):
class ServiceEventHandler(MasterEventHandler): class ServiceEventHandler(MasterEventHandler):
"""This class deals with events for a service phase.""" """This class deals with events for a service phase."""
def connectionClosed(self, conn): def _dealWithNodeFailure(self, conn, new_state):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is not None: if uuid is None:
return
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node is not None and node.getState() == RUNNING_STATE: if node is not None and node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE) node.setState(new_state)
logging.debug('broadcasting node information') logging.debug('broadcasting node information')
app.broadcastNodeInformation(node) app.broadcastNodeInformation(node)
if node.getNodeType() == CLIENT_NODE_TYPE: if node.getNodeType() == CLIENT_NODE_TYPE:
...@@ -93,30 +94,13 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -93,30 +94,13 @@ class ServiceEventHandler(MasterEventHandler):
if not app.pt.operational(): if not app.pt.operational():
# Catastrophic. # Catastrophic.
raise OperationFailure, 'cannot continue operation' raise OperationFailure, 'cannot continue operation'
def connectionClosed(self, conn):
self._dealWithNodeFailure(conn, TEMPORARILY_DOWN_STATE)
MasterEventHandler.connectionClosed(self, conn) MasterEventHandler.connectionClosed(self, conn)
def timeoutExpired(self, conn): def timeoutExpired(self, conn):
uuid = conn.getUUID() self._dealWithNodeFailure(conn, TEMPORARILY_DOWN_STATE)
if uuid is not None:
app = self.app
node = app.nm.getNodeByUUID(uuid)
if node is not None and node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE)
logging.debug('broadcasting node information')
app.broadcastNodeInformation(node)
if node.getNodeType() == CLIENT_NODE_TYPE:
# If this node is a client, just forget it.
app.nm.remove(node)
for tid, t in app.finishing_transaction_dict.items():
if t.getConnection() is conn:
del app.finishing_transaction_dict[tid]
elif node.getNodeType() == ADMIN_NODE_TYPE:
# If this node is an admin , just forget it.
app.nm.remove(node)
elif node.getNodeType() == STORAGE_NODE_TYPE:
if not app.pt.operational():
# Catastrophic.
raise OperationFailure, 'cannot continue operation'
MasterEventHandler.timeoutExpired(self, conn) MasterEventHandler.timeoutExpired(self, conn)
def peerBroken(self, conn): def peerBroken(self, conn):
...@@ -146,9 +130,6 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -146,9 +130,6 @@ class ServiceEventHandler(MasterEventHandler):
raise OperationFailure, 'cannot continue operation' raise OperationFailure, 'cannot continue operation'
MasterEventHandler.peerBroken(self, conn) MasterEventHandler.peerBroken(self, conn)
def packetReceived(self, conn, packet):
MasterEventHandler.packetReceived(self, conn, packet)
def handleRequestNodeIdentification(self, conn, packet, node_type, def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name): uuid, ip_address, port, name):
app = self.app app = self.app
...@@ -485,23 +466,20 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -485,23 +466,20 @@ class ServiceEventHandler(MasterEventHandler):
@restrict_node_types(CLIENT_NODE_TYPE) @restrict_node_types(CLIENT_NODE_TYPE)
def handleAbortTransaction(self, conn, packet, tid): def handleAbortTransaction(self, conn, packet, tid):
uuid = conn.getUUID() uuid = conn.getUUID()
app = self.app node = self.app.nm.getNodeByUUID(uuid)
node = app.nm.getNodeByUUID(uuid)
try: try:
del app.finishing_transaction_dict[tid] del self.app.finishing_transaction_dict[tid]
except KeyError: except KeyError:
logging.warn('aborting transaction %s does not exist', dump(tid)) logging.warn('aborting transaction %s does not exist', dump(tid))
pass pass
@identification_required @identification_required
def handleAskLastIDs(self, conn, packet): def handleAskLastIDs(self, conn, packet):
uuid = conn.getUUID()
app = self.app app = self.app
conn.answer(protocol.answerLastIDs(app.loid, app.ltid, app.lptid), packet) conn.answer(protocol.answerLastIDs(app.loid, app.ltid, app.lptid), packet)
@identification_required @identification_required
def handleAskUnfinishedTransactions(self, conn, packet): def handleAskUnfinishedTransactions(self, conn, packet):
uuid = conn.getUUID()
app = self.app app = self.app
p = protocol.answerUnfinishedTransactions(app.finishing_transaction_dict.keys()) p = protocol.answerUnfinishedTransactions(app.finishing_transaction_dict.keys())
conn.answer(p, packet) conn.answer(p, packet)
...@@ -553,3 +531,4 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -553,3 +531,4 @@ class ServiceEventHandler(MasterEventHandler):
if new_cell_list: if new_cell_list:
ptid = app.getNextPartitionTableID() ptid = app.getNextPartitionTableID()
app.broadcastPartitionChanges(ptid, new_cell_list) app.broadcastPartitionChanges(ptid, new_cell_list)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment