Commit ad81d3fd authored by Julien Muchembled's avatar Julien Muchembled

Some code cleanup

parent 20964f4f
...@@ -330,7 +330,6 @@ class Application(object): ...@@ -330,7 +330,6 @@ class Application(object):
neo.lib.logging.error('Connection to %s lost', neo.lib.logging.error('Connection to %s lost',
self.trying_master_node) self.trying_master_node)
self.primary_master_node = None self.primary_master_node = None
continue
neo.lib.logging.info("Connected and ready") neo.lib.logging.info("Connected and ready")
return conn return conn
......
...@@ -143,13 +143,12 @@ class EpollEventManager(object): ...@@ -143,13 +143,12 @@ class EpollEventManager(object):
try: try:
conn = self.connection_dict[fd] conn = self.connection_dict[fd]
except KeyError: except KeyError:
pass continue
else: conn.lock()
conn.lock() try:
try: conn.writable()
conn.writable() finally:
finally: conn.unlock()
conn.unlock()
for fd in elist: for fd in elist:
# This can fail, if a connection is closed in previous calls to # This can fail, if a connection is closed in previous calls to
...@@ -157,13 +156,12 @@ class EpollEventManager(object): ...@@ -157,13 +156,12 @@ class EpollEventManager(object):
try: try:
conn = self.connection_dict[fd] conn = self.connection_dict[fd]
except KeyError: except KeyError:
pass continue
else: conn.lock()
conn.lock() try:
try: conn.readable()
conn.readable() finally:
finally: conn.unlock()
conn.unlock()
if conn.hasPendingMessages(): if conn.hasPendingMessages():
self._addPendingConnection(conn) self._addPendingConnection(conn)
......
...@@ -55,7 +55,7 @@ class Application(object): ...@@ -55,7 +55,7 @@ class Application(object):
self.storage_readiness = set() self.storage_readiness = set()
master_addresses, connector_name = config.getMasters() master_addresses, connector_name = config.getMasters()
self.connector_handler = getConnectorHandler(connector_name) self.connector_handler = getConnectorHandler(connector_name)
for master_address in master_addresses : for master_address in master_addresses:
self.nm.createMaster(address=master_address) self.nm.createMaster(address=master_address)
neo.lib.logging.debug('IP address is %s, port is %d', *(self.server)) neo.lib.logging.debug('IP address is %s, port is %d', *(self.server))
......
...@@ -140,7 +140,6 @@ class PartitionTable(PartitionTable): ...@@ -140,7 +140,6 @@ class PartitionTable(PartitionTable):
# update the partition table # update the partition table
cell_list = [self.setCell(offset, node, CellStates.UP_TO_DATE)] cell_list = [self.setCell(offset, node, CellStates.UP_TO_DATE)]
cell_list = [(offset, uuid, CellStates.UP_TO_DATE)]
# If the partition contains a feeding cell, drop it now. # If the partition contains a feeding cell, drop it now.
for feeding_cell in self.getCellList(offset): for feeding_cell in self.getCellList(offset):
......
...@@ -357,7 +357,7 @@ class TransactionManager(object): ...@@ -357,7 +357,7 @@ class TransactionManager(object):
# will be finished # will be finished
for txn in self._ttid_dict.itervalues(): for txn in self._ttid_dict.itervalues():
txn.registerForNotification(uuid) txn.registerForNotification(uuid)
return set(self._ttid_dict) return self._ttid_dict.keys()
def begin(self, node, tid=None): def begin(self, node, tid=None):
""" """
......
...@@ -70,7 +70,7 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -70,7 +70,7 @@ class testTransactionManager(NeoUnitTestBase):
callback = Mock() callback = Mock()
txnman = TransactionManager(on_commit=callback) txnman = TransactionManager(on_commit=callback)
self.assertFalse(txnman.hasPending()) self.assertFalse(txnman.hasPending())
self.assertEqual(txnman.registerForNotification(uuid1), set()) self.assertEqual(txnman.registerForNotification(uuid1), [])
# begin the transaction # begin the transaction
ttid = txnman.begin(node) ttid = txnman.begin(node)
self.assertTrue(ttid is not None) self.assertTrue(ttid is not None)
...@@ -79,7 +79,7 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -79,7 +79,7 @@ class testTransactionManager(NeoUnitTestBase):
# prepare the transaction # prepare the transaction
tid = txnman.prepare(ttid, 1, oid_list, uuid_list, msg_id) tid = txnman.prepare(ttid, 1, oid_list, uuid_list, msg_id)
self.assertTrue(txnman.hasPending()) self.assertTrue(txnman.hasPending())
self.assertEqual(txnman.registerForNotification(uuid1), set([ttid])) self.assertEqual(txnman.registerForNotification(uuid1), [ttid])
txn = txnman[ttid] txn = txnman[ttid]
self.assertEqual(txn.getTID(), tid) self.assertEqual(txn.getTID(), tid)
self.assertEqual(txn.getUUIDList(), list(uuid_list)) self.assertEqual(txn.getUUIDList(), list(uuid_list))
...@@ -91,7 +91,7 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -91,7 +91,7 @@ class testTransactionManager(NeoUnitTestBase):
self.assertEqual(len(callback.getNamedCalls('__call__')), 1) self.assertEqual(len(callback.getNamedCalls('__call__')), 1)
# transaction finished # transaction finished
txnman.remove(client_uuid, ttid) txnman.remove(client_uuid, ttid)
self.assertEqual(txnman.registerForNotification(uuid1), set()) self.assertEqual(txnman.registerForNotification(uuid1), [])
def testAbortFor(self): def testAbortFor(self):
oid_list = [self.makeOID(1), ] oid_list = [self.makeOID(1), ]
...@@ -100,18 +100,18 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -100,18 +100,18 @@ class testTransactionManager(NeoUnitTestBase):
client_uuid, client = self.makeNode(3) client_uuid, client = self.makeNode(3)
txnman = TransactionManager(lambda tid, txn: None) txnman = TransactionManager(lambda tid, txn: None)
# register 4 transactions made by two nodes # register 4 transactions made by two nodes
self.assertEqual(txnman.registerForNotification(storage_1_uuid), set()) self.assertEqual(txnman.registerForNotification(storage_1_uuid), [])
ttid1 = txnman.begin(client) ttid1 = txnman.begin(client)
tid1 = txnman.prepare(ttid1, 1, oid_list, [storage_1_uuid], 1) tid1 = txnman.prepare(ttid1, 1, oid_list, [storage_1_uuid], 1)
self.assertEqual(txnman.registerForNotification(storage_1_uuid), set([ttid1])) self.assertEqual(txnman.registerForNotification(storage_1_uuid), [ttid1])
# abort transactions of another node, transaction stays # abort transactions of another node, transaction stays
txnman.abortFor(node2) txnman.abortFor(node2)
self.assertEqual(txnman.registerForNotification(storage_1_uuid), set([ttid1])) self.assertEqual(txnman.registerForNotification(storage_1_uuid), [ttid1])
# abort transactions of requesting node, transaction is not removed # abort transactions of requesting node, transaction is not removed
# because the transaction is prepared and must remains until the end of # because the transaction is prepared and must remains until the end of
# the 2PC # the 2PC
txnman.abortFor(node1) txnman.abortFor(node1)
self.assertEqual(txnman.registerForNotification(storage_1_uuid), set([ttid1])) self.assertEqual(txnman.registerForNotification(storage_1_uuid), [ttid1])
self.assertTrue(txnman.hasPending()) self.assertTrue(txnman.hasPending())
# ...and the lock is available # ...and the lock is available
txnman.begin(client, self.getNextTID()) txnman.begin(client, self.getNextTID())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment