Commit 523675ad authored by Grégory Wisniewski's avatar Grégory Wisniewski

Use master's transaction manager and setup some TODO/XXX.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1453 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 004139b0
......@@ -32,6 +32,7 @@ from neo.master.handlers import election, identification, secondary, recovery
from neo.master.handlers import verification, storage, client, shutdown
from neo.master.handlers import administration
from neo.master.pt import PartitionTable
from neo.master.transactions import TransactionManager
from neo.util import dump, parseMasterList
from neo.connector import getConnectorHandler
......@@ -55,6 +56,7 @@ class Application(object):
# Internal attributes.
self.em = EventManager()
self.nm = NodeManager()
self.tm = TransactionManager()
# Partition table
replicas, partitions = config.getReplicas(), config.getPartitions()
......@@ -95,9 +97,6 @@ class Application(object):
self.asking_uuid_dict = {}
self.object_present = False
# service related data
self.finishing_transaction_dict = {}
def run(self):
"""Make sure that the status is sane and start a loop."""
......@@ -551,13 +550,10 @@ class Application(object):
logging.info('provide service')
em = self.em
nm = self.nm
self.tm.reset()
self.changeClusterState(ClusterStates.RUNNING)
# This dictionary is used to hold information on transactions being
# finished.
self.finishing_transaction_dict = {}
# Now everything is passive.
while True:
try:
......@@ -717,7 +713,7 @@ class Application(object):
# wait for all transaction to be finished
while 1:
self.em.poll(1)
if len(self.finishing_transaction_dict) == 0:
if not self.tm.hasPending():
if self.cluster_state == ClusterStates.RUNNING:
sys.exit("Application has been asked to shut down")
else:
......
......@@ -22,44 +22,6 @@ from neo.protocol import NodeStates, Packets, UnexpectedPacketError
from neo.master.handlers import BaseServiceHandler
from neo.util import dump, getNextTID
class FinishingTransaction(object):
"""This class describes a finishing transaction."""
def __init__(self, conn):
self._conn = conn
self._msg_id = None
self._oid_list = None
self._uuid_set = None
self._locked_uuid_set = set()
def getConnection(self):
return self._conn
def setMessageId(self, msg_id):
self._msg_id = msg_id
def getMessageId(self):
return self._msg_id
def setOIDList(self, oid_list):
self._oid_list = oid_list
def getOIDList(self):
return self._oid_list
def setUUIDSet(self, uuid_set):
self._uuid_set = uuid_set
def getUUIDSet(self):
return self._uuid_set
def addLockedUUID(self, uuid):
if uuid in self._uuid_set:
self._locked_uuid_set.add(uuid)
def allLocked(self):
return self._uuid_set == self._locked_uuid_set
class ClientServiceHandler(BaseServiceHandler):
""" Handler dedicated to client during service state """
......@@ -68,16 +30,14 @@ class ClientServiceHandler(BaseServiceHandler):
pass
def nodeLost(self, conn, node):
app = self.app
for tid, t in app.finishing_transaction_dict.items():
if t.getConnection() is conn:
del app.finishing_transaction_dict[tid]
app.nm.remove(node)
# cancel it's transactions and forgot the node
self.app.tm.abortFor(node)
self.app.nm.remove(node)
def abortTransaction(self, conn, packet, tid):
try:
del self.app.finishing_transaction_dict[tid]
except KeyError:
if tid in self.app.tm:
self.app.tm.remove(tid)
else:
logging.warn('aborting transaction %s does not exist', dump(tid))
def askBeginTransaction(self, conn, packet, tid):
......@@ -88,8 +48,10 @@ class ClientServiceHandler(BaseServiceHandler):
if tid is None:
# give a new transaction ID
tid = getNextTID(app.ltid)
# TODO: transaction manager should handle last TID
app.ltid = tid
app.finishing_transaction_dict[tid] = FinishingTransaction(conn)
node = app.nm.getByUUID(conn.getUUID())
app.tm.begin(node, tid)
conn.answer(Packets.AnswerBeginTransaction(tid), packet.getId())
def askNewOIDs(self, conn, packet, num_oids):
......@@ -124,11 +86,5 @@ class ClientServiceHandler(BaseServiceHandler):
c.ask(Packets.LockInformation(tid), timeout=60)
used_uuid_set.add(c.getUUID())
try:
t = app.finishing_transaction_dict[tid]
t.setOIDList(oid_list)
t.setUUIDSet(used_uuid_set)
t.setMessageId(packet.getId())
except KeyError:
logging.warn('finishing transaction %s does not exist', dump(tid))
app.tm.prepare(tid, oid_list, used_uuid_set, packet.getId())
......@@ -48,9 +48,7 @@ class StorageServiceHandler(BaseServiceHandler):
packet.getId())
def askUnfinishedTransactions(self, conn, packet):
app = self.app
p = Packets.AnswerUnfinishedTransactions(
app.finishing_transaction_dict.keys())
p = Packets.AnswerUnfinishedTransactions(self.app.tm.getPendingList())
conn.answer(p, packet.getId())
def notifyInformationLocked(self, conn, packet, tid):
......@@ -64,9 +62,12 @@ class StorageServiceHandler(BaseServiceHandler):
raise UnexpectedPacketError
try:
t = app.finishing_transaction_dict[tid]
t.addLockedUUID(uuid)
if t.allLocked():
t = self.app.tm[tid]
if t.lock(uuid): # all nodes are locked
# XXX: review needed:
# don't iterate over connections but search by uuid
# include client's uuid in Transaction object
# I have received all the answers now. So send a Notify
# Transaction Finished to the initiated client node,
# Invalidate Objects to the other client nodes, and Unlock
......@@ -76,7 +77,7 @@ class StorageServiceHandler(BaseServiceHandler):
if uuid is not None:
node = app.nm.getByUUID(uuid)
if node.isClient():
if c is t.getConnection():
if node is t.getNode():
p = Packets.NotifyTransactionFinished(tid)
c.answer(p, t.getMessageId())
else:
......@@ -84,10 +85,10 @@ class StorageServiceHandler(BaseServiceHandler):
tid)
c.notify(p)
elif node.isStorage():
if uuid in t.getUUIDSet():
if uuid in t.getUUIDList():
p = Packets.UnlockInformation(tid)
c.notify(p)
del app.finishing_transaction_dict[tid]
self.app.tm.remove(tid)
except KeyError:
# What is this?
pass
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment