Commit 6aa372d9 authored by Julien Muchembled's avatar Julien Muchembled

client: many fixes to 'transactionLog'

- Do not fetch data from outdated/discarded cells.
- Do not return more than transactions than requested by 'limit' parameter.
  Anyway, all results above this 'limit' could contain holes.
parent d9adfcbc
......@@ -19,6 +19,7 @@ from cPickle import dumps, loads
from zlib import compress as real_compress, decompress
from neo.lib.locking import Empty
from random import shuffle
import heapq
import time
import os
......@@ -946,30 +947,28 @@ class Application(object):
return undo_info
def transactionLog(self, start, stop, limit):
node_map = self.pt.getNodeMap()
node_list = node_map.keys()
node_list.sort(key=self.cp.getCellSortKey)
partition_set = set(range(self.pt.getPartitions()))
queue = self._getThreadQueue()
tid_set = set()
tid_list = []
# request a tid list for each partition
for node in node_list:
conn = self.cp.getConnForNode(node)
request_set = set(node_map[node]) & partition_set
if conn is None or not request_set:
continue
partition_set -= set(request_set)
packet = Packets.AskTIDsFrom(start, stop, limit, request_set)
conn.ask(packet, queue=queue, tid_set=tid_set)
if not partition_set:
for offset in xrange(self.pt.getPartitions()):
p = Packets.AskTIDsFrom(start, stop, limit, [offset])
for node, conn in self.cp.iterateForObject(offset, readable=True):
try:
r = self._askStorage(conn, p)
break
assert not partition_set
self.waitResponses(queue)
except ConnectionClosed:
pass
else:
raise NEOStorageError('transactionLog failed')
if r:
tid_list = list(heapq.merge(tid_list, r))
if len(tid_list) >= limit:
del tid_list[limit:]
stop = tid_list[-1]
# request transactions informations
txn_list = []
append = txn_list.append
tid = None
for tid in sorted(tid_set):
for tid in tid_list:
(txn_info, txn_ext) = self._getTransactionInformation(tid)
txn_info['ext'] = loads(txn_ext)
append(txn_info)
......
......@@ -100,10 +100,9 @@ class StorageAnswersHandler(AnswerBaseHandler):
def answerStoreTransaction(self, conn, _):
pass
def answerTIDsFrom(self, conn, tid_list, tid_set):
def answerTIDsFrom(self, conn, tid_list):
neo.lib.logging.debug('Get %d TIDs from %r', len(tid_list), conn)
assert not tid_set.intersection(tid_list)
tid_set.update(tid_list)
self.app.setHandlerData(tid_list)
def answerTransactionInformation(self, conn, tid,
user, desc, ext, packed, oid_list):
......
......@@ -126,7 +126,9 @@ class ConnectionPool(object):
def iterateForObject(self, object_id, readable=False, writable=False):
""" Iterate over nodes managing an object """
pt = self.app.getPartitionTable()
cell_list = pt.getCellListForOID(object_id, readable, writable)
if type(object_id) is str:
object_id = pt.getPartition(object_id)
cell_list = pt.getCellList(object_id, readable, writable)
if not cell_list:
raise NEOStorageError('no storage available')
getConnForNode = self.getConnForNode
......
......@@ -140,12 +140,6 @@ class PartitionTable(object):
except (TypeError, KeyError):
return []
def getCellListForTID(self, tid, readable=False, writable=False):
return self.getCellList(self.getPartition(tid), readable, writable)
def getCellListForOID(self, oid, readable=False, writable=False):
return self.getCellList(self.getPartition(oid), readable, writable)
def getPartition(self, oid_or_tid):
return u64(oid_or_tid) % self.getPartitions()
......@@ -326,14 +320,6 @@ class PartitionTable(object):
getRow = self.getRow
return [(x, getRow(x)) for x in xrange(self.np)]
def getNodeMap(self):
""" Return a list of 2-tuple: (uuid, partition_list) """
uuid_map = {}
for index, row in enumerate(self.partition_list):
for cell in row:
uuid_map.setdefault(cell.getNode(), []).append(index)
return uuid_map
def thread_safe(method):
def wrapper(self, *args, **kwargs):
self.lock()
......@@ -358,14 +344,6 @@ class MTPartitionTable(PartitionTable):
def unlock(self):
self._lock.release()
@thread_safe
def getCellListForTID(self, *args, **kwargs):
return PartitionTable.getCellListForTID(self, *args, **kwargs)
@thread_safe
def getCellListForOID(self, *args, **kwargs):
return PartitionTable.getCellListForOID(self, *args, **kwargs)
@thread_safe
def setCell(self, *args, **kwargs):
return PartitionTable.setCell(self, *args, **kwargs)
......@@ -381,8 +359,3 @@ class MTPartitionTable(PartitionTable):
@thread_safe
def getNodeList(self, *args, **kwargs):
return PartitionTable.getNodeList(self, *args, **kwargs)
@thread_safe
def getNodeMap(self, *args, **kwargs):
return PartitionTable.getNodeMap(self, *args, **kwargs)
......@@ -41,10 +41,7 @@ def _getMasterConnection(self):
self.uuid = 'C' * 16
self.num_partitions = 10
self.num_replicas = 1
self.pt = Mock({
'getCellListForOID': (),
'getCellListForTID': (),
})
self.pt = Mock({'getCellList': ()})
self.master_conn = Mock()
return self.master_conn
......@@ -293,13 +290,12 @@ class ClientApplicationTests(NeoUnitTestBase):
None, txn)
# check partition_id and an empty cell list -> NEOStorageError
self._begin(app, txn, self.makeTID())
app.pt = Mock({ 'getCellListForOID': (), })
app.pt = Mock({'getCellList': ()})
app.num_partitions = 2
self.assertRaises(NEOStorageError, app.store, oid, tid, '', None,
txn)
calls = app.pt.mockGetNamedCalls('getCellListForOID')
calls = app.pt.mockGetNamedCalls('getCellList')
self.assertEqual(len(calls), 1)
self.assertEqual(calls[0].getParam(0), oid) # oid=11
def test_store2(self):
app = self.getApp()
......@@ -312,7 +308,7 @@ class ClientApplicationTests(NeoUnitTestBase):
packet.setId(0)
storage_address = ('127.0.0.1', 10020)
node, cell, conn = self.getNodeCellConn(address=storage_address)
app.pt = Mock({ 'getCellListForOID': (cell, cell)})
app.pt = Mock()
app.cp = self.getConnectionPool([(node, conn)])
app.dispatcher = Dispatcher()
app.nm.createStorage(address=storage_address)
......@@ -341,7 +337,7 @@ class ClientApplicationTests(NeoUnitTestBase):
node, cell, conn = self.getNodeCellConn(address=storage_address,
uuid=uuid)
app.cp = self.getConnectionPool([(node, conn)])
app.pt = Mock({ 'getCellListForOID': (cell, cell, ) })
app.pt = Mock()
app.dispatcher = Dispatcher()
app.nm.createStorage(address=storage_address)
app.store(oid, tid, 'DATA', None, txn)
......@@ -391,7 +387,6 @@ class ClientApplicationTests(NeoUnitTestBase):
app.master_conn = Mock()
conn = Mock()
cell = Mock()
app.pt = Mock({'getCellListForTID': (cell, cell)})
app.cp = Mock({'getConnForCell': ReturnValues(None, cell)})
app.tpc_abort(txn)
# no packet sent
......@@ -456,14 +451,7 @@ class ClientApplicationTests(NeoUnitTestBase):
node1 = Mock({'__repr__': 'node1', '__hash__': 1, 'getConnection': conn1})
node2 = Mock({'__repr__': 'node2', '__hash__': 2, 'getConnection': conn2})
node3 = Mock({'__repr__': 'node3', '__hash__': 3, 'getConnection': conn3})
cell1 = Mock({ 'getNode': node1, '__hash__': 1, 'getConnection': conn1})
cell2 = Mock({ 'getNode': node2, '__hash__': 2, 'getConnection': conn2})
cell3 = Mock({ 'getNode': node3, '__hash__': 3, 'getConnection': conn3})
# fake environment
app.pt = Mock({
'getCellListForTID': [cell1],
'getCellListForOID': ReturnValues([cell2], [cell3]),
})
app.cp = Mock({'getConnForCell': ReturnValues(conn2, conn3, conn1)})
app.cp = Mock({
'getConnForNode': ReturnValues(conn2, conn3, conn1),
......@@ -544,11 +532,7 @@ class ClientApplicationTests(NeoUnitTestBase):
'getAddress': 'FakeServer',
'getState': 'FakeState',
})
app.pt = Mock({
'getCellListForTID': [cell, ],
'getCellListForOID': [cell, ],
'getCellList': [cell, ],
})
app.pt = Mock({'getCellList': [cell]})
transaction_info = Packets.AnswerTransactionInformation(tid1, '', '',
'', False, (oid0, ))
transaction_info.setId(1)
......@@ -752,7 +736,6 @@ class ClientApplicationTests(NeoUnitTestBase):
app.dispatcher = Dispatcher()
app.pt = Mock({
'getNodeList': (Mock(), Mock()),
'getCellListForTID': ReturnValues([Mock()], [Mock()]),
})
app.cp = Mock({
'getConnForNode': ReturnValues(answerTIDs(p1), answerTIDs(p2)),
......
......@@ -72,7 +72,7 @@ class ConnectionPoolTests(NeoUnitTestBase):
def test_iterateForObject_noStorageAvailable(self):
# no node available
oid = self.getOID(1)
pt = Mock({'getCellListForOID': []})
pt = Mock({'getCellList': []})
app = Mock({'getPartitionTable': pt})
pool = ConnectionPool(app)
self.assertRaises(NEOStorageError, pool.iterateForObject(oid).next)
......@@ -83,7 +83,7 @@ class ConnectionPoolTests(NeoUnitTestBase):
node = Mock({'__repr__': 'node', 'isRunning': True})
cell = Mock({'__repr__': 'cell', 'getNode': node})
conn = Mock({'__repr__': 'conn'})
pt = Mock({'getCellListForOID': [cell]})
pt = Mock({'getCellList': [cell]})
app = Mock({'getPartitionTable': pt})
pool = ConnectionPool(app)
pool.getConnForNode = Mock({'__call__': ReturnValues(None, conn)})
......@@ -95,7 +95,7 @@ class ConnectionPoolTests(NeoUnitTestBase):
node = Mock({'__repr__': 'node', 'isRunning': True})
cell = Mock({'__repr__': 'cell', 'getNode': node})
conn = Mock({'__repr__': 'conn'})
pt = Mock({'getCellListForOID': [cell]})
pt = Mock({'getCellList': [cell]})
app = Mock({'getPartitionTable': pt})
pool = ConnectionPool(app)
pool.getConnForNode = Mock({'__call__': conn})
......
......@@ -421,35 +421,6 @@ class PartitionTableTests(NeoUnitTestBase):
# unknwon row
self.assertRaises(IndexError, pt.getRow, 5)
def test_getNodeMap(self):
num_partitions = 5
num_replicas = 2
pt = PartitionTable(num_partitions, num_replicas)
uuid1 = self.getNewUUID()
uuid2 = self.getNewUUID()
uuid3 = self.getNewUUID()
sn1 = StorageNode(Mock(),("127.0.0.1", 19001) , uuid1)
pt.setCell(0, sn1, CellStates.UP_TO_DATE)
pt.setCell(1, sn1, CellStates.UP_TO_DATE)
pt.setCell(2, sn1, CellStates.UP_TO_DATE)
self.assertEqual(pt.getNodeMap(), {
sn1: [0, 1, 2],
})
sn2 = StorageNode(Mock(), ("127.0.0.2", 19001), uuid2)
pt.setCell(0, sn2, CellStates.UP_TO_DATE)
pt.setCell(1, sn2, CellStates.UP_TO_DATE)
self.assertEqual(pt.getNodeMap(), {
sn1: [0, 1, 2],
sn2: [0, 1],
})
sn3 = StorageNode(Mock(), ("127.0.0.3", 19001), uuid3)
pt.setCell(0, sn3, CellStates.UP_TO_DATE)
self.assertEqual(pt.getNodeMap(), {
sn1: [0, 1, 2],
sn2: [0, 1],
sn3: [0],
})
if __name__ == '__main__':
unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment