Commit 004139b0 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Add a transaction manager for the master node and tests.

The manager kept track of pending (unfinished) transactions initiated by the client nodes and locked by storage nodes.

git-svn-id: 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 5dad376a
# Copyright (C) 2006-2010 Nexedi SA
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo import logging
class Transaction(object):
A pending transaction
def __init__(self, node, tid):
self._node = node
self._tid = tid
self._oid_list = []
self._msg_id = None
# uuid dict hold flag to known who has locked the transaction
self._uuid_dict = {}
def getNode(self):
Return the node that had began the transaction
return self._node
def getTID(self):
Return the transaction ID
return self._tid
def getMessageId(self):
Returns the packet ID to use in the answer
return self._msg_id
def getUUIDList(self):
Returns the list of node's UUID that lock the transaction
return self._uuid_dict.keys()
def getOIDList(self):
Returns the list of OIDs used in the transaction
return list(self._oid_list)
def prepare(self, oid_list, uuid_list, msg_id):
Prepare the transaction, set OIDs and UUIDs related to it
assert not self._oid_list
assert not self._uuid_dict
self._oid_list = oid_list
self._uuid_dict = dict.fromkeys(uuid_list, False)
self._msg_id = msg_id
def lock(self, uuid):
Define that a node has locked the transaction
Returns true if all nodes are locked
self._uuid_dict[uuid] = True
return self.locked()
def locked(self):
Returns true if all nodes are locked
return False not in self._uuid_dict.values()
class TransactionManager(object):
Manage current transactions
def __init__(self):
# tid -> transaction
self._tid_dict = {}
# node -> transactions mapping
self._node_dict = {}
def __getitem__(self, tid):
Return the transaction object for this TID
return self._tid_dict[tid]
def __contains__(self, tid):
Returns True if this is a pending transaction
return tid in self._tid_dict
def reset(self):
Discard all manager content
self._tid_dict = {}
self._node_dict = {}
def hasPending(self):
Returns True if some transactions are pending
return bool(self._tid_dict)
def getPendingList(self):
Return the list of pending transaction IDs
return self._tid_dict.keys()
# TODO: manager should generate the tid itself
def begin(self, node, tid):
Begin a new transaction
assert node is not None
txn = Transaction(node, tid)
self._tid_dict[tid] = txn
# XXX: check the data structure
self._node_dict.setdefault(node, {})[tid] = txn
def prepare(self, tid, oid_list, uuid_list, msg_id):
Prepare a transaction to be finished
assert tid in self._tid_dict, "Transaction not started"
txn = self._tid_dict[tid]
txn.prepare(oid_list, uuid_list, msg_id)
def remove(self, tid):
Remove a transaction, commited or aborted
if tid not in self._tid_dict:
node = self._tid_dict[tid].getNode()
# remove both mappings, node will be removed in abortFor
del self._tid_dict[tid]
del self._node_dict[node][tid]
def lock(self, tid, uuid):
Set that a node has locked the transaction.
Returns True if all are now locked
assert tid in self._tid_dict, "Transaction not started"
return self._tid_dict[tid].lock(uuid)
def abortFor(self, node):
Abort pending transactions initiated by a node
# nothing to do
if node not in self._node_dict:
# remove transactions
for tid in self._node_dict[node].keys():
del self._tid_dict[tid]
# discard node entry
del self._node_dict[node]
# Copyright (C) 2006-2010 Nexedi SA
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import unittest
from mock import Mock
from struct import pack
from neo.tests import NeoTestBase
from neo.master.transactions import Transaction, TransactionManager
class testTransactionManager(NeoTestBase):
def makeTID(self, i):
return pack('!Q', i)
def makeOID(self, i):
return pack('!Q', i)
def makeUUID(self, i):
return '\0' * 12 + pack('!Q', i)
def testTransaction(self):
# test data
node = Mock({})
tid = self.makeTID(1)
oid_list = (oid1, oid2) = (self.makeOID(1), self.makeOID(2))
uuid_list = (uuid1, uuid2) = (self.makeUUID(1), self.makeUUID(2))
msg_id = 1
# create transaction object
txn = Transaction(node, tid)
self.assertEqual(txn.getUUIDList(), [])
txn.prepare(oid_list, uuid_list, msg_id)
# lock nodes one by one
def testManager(self):
# test data
node = Mock({'__hash__': 1})
tid = self.makeTID(1)
msg_id = 1
oid_list = (oid1, oid2) = self.makeOID(1), self.makeOID(2)
uuid_list = (uuid1, uuid2) = self.makeUUID(1), self.makeUUID(2)
# create transaction manager
txnman = TransactionManager()
self.assertEqual(txnman.getPendingList(), [])
self.assertRaises(KeyError, txnman.__getitem__, tid)
# begin the transaction
txnman.begin(node, tid)
self.assertEqual(len(txnman.getPendingList()), 1)
self.assertEqual(txnman.getPendingList()[0], tid)
self.assertEqual(txnman[tid].getTID(), tid)
# prepare the transaction
txnman.prepare(tid, oid_list, uuid_list, msg_id)
txn = txnman[tid]
self.assertEqual(txn.getUUIDList(), list(uuid_list))
self.assertEqual(txn.getOIDList(), list(oid_list))
# lock nodes
self.assertFalse(txnman.lock(tid, uuid1))
self.assertTrue(txnman.lock(tid, uuid2))
# transaction finished
self.assertEqual(txnman.getPendingList(), [])
def testAbortFor(self):
node1 = Mock({'__hash__': 1})
node2 = Mock({'__hash__': 2})
tid11, tid12 = self.makeTID(11), self.makeTID(12)
tid21, tid22 = self.makeTID(21), self.makeTID(22)
txnman = TransactionManager()
# register 4 transactions made by two nodes
txnman.begin(node1, tid11)
txnman.begin(node1, tid12)
txnman.begin(node2, tid21)
txnman.begin(node2, tid22)
self.assertEqual(len(txnman.getPendingList()), 4)
# abort transactions of one node
tid_list = txnman.getPendingList()
self.assertEqual(len(tid_list), 2)
self.assertTrue(tid21 in tid_list)
self.assertTrue(tid22 in tid_list)
# then the other
self.assertEqual(txnman.getPendingList(), [])
if __name__ == '__main__':
......@@ -43,6 +43,7 @@ UNIT_TEST_MODULES = [
# storage application
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment