Commit 859606a5 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Handle aborting.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@181 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 98f186f7
......@@ -12,22 +12,31 @@ from neo.util import dump
class FinishingTransaction(object):
"""This class describes a finishing transaction."""
def __init__(self, conn, packet, oid_list, uuid_set):
def __init__(self, conn):
self._conn = conn
self._msg_id = packet.getId()
self._oid_list = oid_list
self._uuid_set = uuid_set
self._msg_id = None
self._oid_list = None
self._uuid_set = None
self._locked_uuid_set = set()
def getConnection(self):
return self._conn
def setMessageId(self, msg_id):
self._msg_id = msg_id
def getMessageId(self):
return self._msg_id
def setOIDList(self, oid_list):
self._oid_list = oid_list
def getOIDList(self):
return self._oid_list
def setUUIDSet(self, uuid_set):
self._uuid_set = uuid_set
def getUUIDSet(self):
return self._uuid_set
......@@ -53,6 +62,9 @@ class ServiceEventHandler(MasterEventHandler):
if isinstance(node, ClientNode):
# If this node is a client, just forget it.
app.nm.remove(node)
for tid, t in app.finishing_transaction_dict.items():
if t.getConnection() is conn:
del app.finishing_transaction_dict[tid]
elif isinstance(node, StorageNode):
if not app.pt.operational():
# Catastrophic.
......@@ -71,6 +83,9 @@ class ServiceEventHandler(MasterEventHandler):
if isinstance(node, ClientNode):
# If this node is a client, just forget it.
app.nm.remove(node)
for tid, t in app.finishing_transaction_dict.items():
if t.getConnection() is conn:
del app.finishing_transaction_dict[tid]
elif isinstance(node, StorageNode):
if not app.pt.operational():
# Catastrophic.
......@@ -89,6 +104,9 @@ class ServiceEventHandler(MasterEventHandler):
if isinstance(node, ClientNode):
# If this node is a client, just forget it.
app.nm.remove(node)
for tid, t in app.finishing_transaction_dict.items():
if t.getConnection() is conn:
del app.finishing_transaction_dict[tid]
elif isinstance(node, StorageNode):
cell_list = app.pt.dropNode(node)
ptid = app.getNextPartitionTableID()
......@@ -369,8 +387,8 @@ class ServiceEventHandler(MasterEventHandler):
# No problem.
continue
# Something wrong happened possibly. Cut the connection to this node,
# if any, and notify the information to others.
# Something wrong happened possibly. Cut the connection to
# this node, if any, and notify the information to others.
# XXX this can be very slow.
for c in app.em.getConnectionList():
if c.getUUID() == uuid:
......@@ -379,7 +397,8 @@ class ServiceEventHandler(MasterEventHandler):
logging.debug('broadcasting node information')
app.broadcastNodeInformation(node)
if isinstance(node, StorageNode) and state in (DOWN_STATE, BROKEN_STATE):
if isinstance(node, StorageNode) \
and state in (DOWN_STATE, BROKEN_STATE):
cell_list = app.pt.dropNode(node)
if len(cell_list) != 0:
ptid = app.getNextPartitionTableID()
......@@ -417,6 +436,7 @@ class ServiceEventHandler(MasterEventHandler):
return
tid = app.getNextTID()
app.finishing_transaction_dict[tid] = FinishingTransaction(conn)
conn.addPacket(Packet().answerNewTID(packet.getId(), tid))
def handleAskNewOIDs(self, conn, packet, num_oids):
......@@ -448,7 +468,8 @@ class ServiceEventHandler(MasterEventHandler):
self.handleUnexpectedPacket(conn, packet)
return
# If the given transaction ID is later than the last TID, the peer is crazy.
# If the given transaction ID is later than the last TID, the peer
# is crazy.
if app.ltid < tid:
self.handleUnexpectedPacket(conn, packet)
return
......@@ -457,12 +478,13 @@ class ServiceEventHandler(MasterEventHandler):
getPartition = app.getPartition
partition_set = set()
partition_set.add(getPartition(tid))
partition_set.update([getPartition(oid) for oid in oid_list])
partition_set.update((getPartition(oid) for oid in oid_list))
# Collect the UUIDs of nodes related to this transaction.
uuid_set = set()
for part in partition_set:
uuid_set.update([cell.getUUID() for cell in app.pt.getCellList(part)])
uuid_set.update((cell.getUUID() for cell \
in app.pt.getCellList(part)))
# Request locking data.
for c in app.em.getConnectionList():
......@@ -471,8 +493,14 @@ class ServiceEventHandler(MasterEventHandler):
c.addPacket(Packet().lockInformation(msg_id, tid))
c.expectMessage(msg_id, timeout = 60)
t = FinishingTransaction(conn, packet, oid_list, uuid_set)
app.finishing_transaction_dict[tid] = t
try:
t = app.finishing_transaction_dict[tid]
t.setOIDList(oid_list)
t.setUUIDSet(uuid_set)
t.setMessageId(packet.getId())
except KeyError:
logging.warn('finishing transaction %s does not exist', dump(tid))
pass
def handleNotifyInformationLocked(self, conn, packet, tid):
uuid = conn.getUUID()
......@@ -487,7 +515,8 @@ class ServiceEventHandler(MasterEventHandler):
self.handleUnexpectedPacket(conn, packet)
return
# If the given transaction ID is later than the last TID, the peer is crazy.
# If the given transaction ID is later than the last TID, the peer
# is crazy.
if app.ltid < tid:
self.handleUnexpectedPacket(conn, packet)
return
......@@ -496,10 +525,10 @@ class ServiceEventHandler(MasterEventHandler):
t = app.finishing_transaction_dict[tid]
t.addLockedUUID(uuid)
if t.allLocked():
# I have received all the answers now. So send a Notify Transaction
# Finished to the initiated client node, Invalidate Objects to
# the other client nodes, and Unlock Information to relevant storage
# nodes.
# I have received all the answers now. So send a Notify
# Transaction Finished to the initiated client node,
# Invalidate Objects to the other client nodes, and Unlock
# Information to relevant storage nodes.
p = Packet()
for c in app.em.getConnectionList():
uuid = c.getUUID()
......@@ -522,3 +551,22 @@ class ServiceEventHandler(MasterEventHandler):
except KeyError:
# What is this?
pass
def handleAbortTransaction(self, conn, packet, tid):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
node = app.nm.getNodeByUUID(uuid)
if not isinstance(node, ClientNode):
self.handleUnexpectedPacket(conn, packet)
return
try:
app.finishing_transaction_dict[tid]
except KeyError:
logging.warn('aborting transaction %s does not exist', dump(tid))
pass
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment