Commit 51d49fad authored by Vincent Pelletier's avatar Vincent Pelletier

Split methods listing transaction history for client and storage.

This fixes undo support in client without changing the way replication is
implemented.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1860 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent e0296439
...@@ -269,6 +269,13 @@ class DatabaseManager(object): ...@@ -269,6 +269,13 @@ class DatabaseManager(object):
raise NotImplementedError raise NotImplementedError
def getTIDList(self, offset, length, num_partitions, partition_list): def getTIDList(self, offset, length, num_partitions, partition_list):
"""Return a list of TIDs in ascending order from an offset,
at most the specified length. The list of partitions are passed
to filter out non-applicable TIDs."""
raise NotImplementedError
def getReplicationTIDList(self, offset, length, num_partitions,
partition_list):
"""Return a list of TIDs in descending order from an offset, """Return a list of TIDs in descending order from an offset,
at most the specified length. The list of partitions are passed at most the specified length. The list of partitions are passed
to filter out non-applicable TIDs.""" to filter out non-applicable TIDs."""
......
...@@ -452,6 +452,15 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -452,6 +452,15 @@ class MySQLDatabaseManager(DatabaseManager):
return None return None
def getTIDList(self, offset, length, num_partitions, partition_list): def getTIDList(self, offset, length, num_partitions, partition_list):
q = self.query
r = q("""SELECT tid FROM trans WHERE MOD(tid, %d) in (%s)
ORDER BY tid DESC LIMIT %d,%d""" \
% (num_partitions,
','.join([str(p) for p in partition_list]),
offset, length))
return [util.p64(t[0]) for t in r]
def getReplicationTIDList(self, offset, length, num_partitions, partition_list):
q = self.query q = self.query
r = q("""SELECT tid FROM trans WHERE MOD(tid, %d) in (%s) r = q("""SELECT tid FROM trans WHERE MOD(tid, %d) in (%s)
ORDER BY tid ASC LIMIT %d,%d""" \ ORDER BY tid ASC LIMIT %d,%d""" \
......
...@@ -62,22 +62,6 @@ class BaseMasterHandler(EventHandler): ...@@ -62,22 +62,6 @@ class BaseMasterHandler(EventHandler):
class BaseClientAndStorageOperationHandler(EventHandler): class BaseClientAndStorageOperationHandler(EventHandler):
""" Accept requests common to client and storage nodes """ """ Accept requests common to client and storage nodes """
def askTIDs(self, conn, first, last, partition):
# This method is complicated, because I must return TIDs only
# about usable partitions assigned to me.
if first >= last:
raise protocol.ProtocolError('invalid offsets')
app = self.app
if partition == protocol.INVALID_PARTITION:
partition_list = app.pt.getAssignedPartitionList(app.uuid)
else:
partition_list = [partition]
tid_list = app.dm.getTIDList(first, last - first,
app.pt.getPartitions(), partition_list)
conn.answer(Packets.AnswerTIDs(tid_list))
def askObjectHistory(self, conn, oid, first, last): def askObjectHistory(self, conn, oid, first, last):
if first >= last: if first >= last:
raise protocol.ProtocolError( 'invalid offsets') raise protocol.ProtocolError( 'invalid offsets')
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
# along with this program; if not, write to the Free Software # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo import protocol
from neo.protocol import Packets from neo.protocol import Packets
from neo.storage.handlers import BaseClientAndStorageOperationHandler from neo.storage.handlers import BaseClientAndStorageOperationHandler
from neo.storage.transactions import ConflictError, DelayedError from neo.storage.transactions import ConflictError, DelayedError
...@@ -59,3 +60,19 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler): ...@@ -59,3 +60,19 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
self.app.queueEvent(self.askStoreObject, conn, oid, serial, self.app.queueEvent(self.askStoreObject, conn, oid, serial,
compression, checksum, data, tid) compression, checksum, data, tid)
def askTIDs(self, conn, first, last, partition):
# This method is complicated, because I must return TIDs only
# about usable partitions assigned to me.
if first >= last:
raise protocol.ProtocolError('invalid offsets')
app = self.app
if partition == protocol.INVALID_PARTITION:
partition_list = app.pt.getAssignedPartitionList(app.uuid)
else:
partition_list = [partition]
tid_list = app.dm.getTIDList(first, last - first,
app.pt.getPartitions(), partition_list)
conn.answer(Packets.AnswerTIDs(tid_list))
...@@ -44,3 +44,19 @@ class StorageOperationHandler(BaseClientAndStorageOperationHandler): ...@@ -44,3 +44,19 @@ class StorageOperationHandler(BaseClientAndStorageOperationHandler):
app.pt.getPartitions(), partition_list) app.pt.getPartitions(), partition_list)
conn.answer(Packets.AnswerOIDs(oid_list)) conn.answer(Packets.AnswerOIDs(oid_list))
def askTIDs(self, conn, first, last, partition):
# This method is complicated, because I must return TIDs only
# about usable partitions assigned to me.
if first >= last:
raise protocol.ProtocolError('invalid offsets')
app = self.app
if partition == protocol.INVALID_PARTITION:
partition_list = app.pt.getAssignedPartitionList(app.uuid)
else:
partition_list = [partition]
tid_list = app.dm.getReplicationTIDList(first, last - first,
app.pt.getPartitions(), partition_list)
conn.answer(Packets.AnswerTIDs(tid_list))
...@@ -114,15 +114,15 @@ class StorageStorageHandlerTests(NeoTestBase): ...@@ -114,15 +114,15 @@ class StorageStorageHandlerTests(NeoTestBase):
conn = Mock({}) conn = Mock({})
self.checkProtocolErrorRaised(self.operation.askTIDs, conn, 1, 1, None) self.checkProtocolErrorRaised(self.operation.askTIDs, conn, 1, 1, None)
self.assertEquals(len(app.pt.mockGetNamedCalls('getCellList')), 0) self.assertEquals(len(app.pt.mockGetNamedCalls('getCellList')), 0)
self.assertEquals(len(app.dm.mockGetNamedCalls('getTIDList')), 0) self.assertEquals(len(app.dm.mockGetNamedCalls('getReplicationTIDList')), 0)
def test_25_askTIDs2(self): def test_25_askTIDs2(self):
# well case => answer # well case => answer
conn = Mock({}) conn = Mock({})
self.app.dm = Mock({'getTIDList': (INVALID_TID, )}) self.app.dm = Mock({'getReplicationTIDList': (INVALID_TID, )})
self.app.pt = Mock({'getPartitions': 1}) self.app.pt = Mock({'getPartitions': 1})
self.operation.askTIDs(conn, 1, 2, 1) self.operation.askTIDs(conn, 1, 2, 1)
calls = self.app.dm.mockGetNamedCalls('getTIDList') calls = self.app.dm.mockGetNamedCalls('getReplicationTIDList')
self.assertEquals(len(calls), 1) self.assertEquals(len(calls), 1)
calls[0].checkArgs(1, 1, 1, [1, ]) calls[0].checkArgs(1, 1, 1, [1, ])
self.checkAnswerTids(conn) self.checkAnswerTids(conn)
...@@ -131,7 +131,7 @@ class StorageStorageHandlerTests(NeoTestBase): ...@@ -131,7 +131,7 @@ class StorageStorageHandlerTests(NeoTestBase):
# invalid partition => answer usable partitions # invalid partition => answer usable partitions
conn = Mock({}) conn = Mock({})
cell = Mock({'getUUID':self.app.uuid}) cell = Mock({'getUUID':self.app.uuid})
self.app.dm = Mock({'getTIDList': (INVALID_TID, )}) self.app.dm = Mock({'getReplicationTIDList': (INVALID_TID, )})
self.app.pt = Mock({ self.app.pt = Mock({
'getCellList': (cell, ), 'getCellList': (cell, ),
'getPartitions': 1, 'getPartitions': 1,
...@@ -139,7 +139,7 @@ class StorageStorageHandlerTests(NeoTestBase): ...@@ -139,7 +139,7 @@ class StorageStorageHandlerTests(NeoTestBase):
}) })
self.operation.askTIDs(conn, 1, 2, INVALID_PARTITION) self.operation.askTIDs(conn, 1, 2, INVALID_PARTITION)
self.assertEquals(len(self.app.pt.mockGetNamedCalls('getAssignedPartitionList')), 1) self.assertEquals(len(self.app.pt.mockGetNamedCalls('getAssignedPartitionList')), 1)
calls = self.app.dm.mockGetNamedCalls('getTIDList') calls = self.app.dm.mockGetNamedCalls('getReplicationTIDList')
self.assertEquals(len(calls), 1) self.assertEquals(len(calls), 1)
calls[0].checkArgs(1, 1, 1, [0, ]) calls[0].checkArgs(1, 1, 1, [0, ])
self.checkAnswerTids(conn) self.checkAnswerTids(conn)
......
...@@ -620,16 +620,16 @@ class StorageMySQSLdbTests(NeoTestBase): ...@@ -620,16 +620,16 @@ class StorageMySQSLdbTests(NeoTestBase):
False)""" % (u64(tid))) False)""" % (u64(tid)))
# get all tids for all partitions # get all tids for all partitions
result = self.db.getTIDList(0, 4, 2, (0, 1)) result = self.db.getTIDList(0, 4, 2, (0, 1))
self.assertEquals(result, [tid1, tid2, tid3, tid4]) self.assertEquals(result, [tid4, tid3, tid2, tid1])
# get all tids but from the second with a limit a two # get all tids but from the second with a limit a two
result = self.db.getTIDList(1, 2, 2, (0, 1)) result = self.db.getTIDList(1, 2, 2, (0, 1))
self.assertEquals(result, [tid2, tid3]) self.assertEquals(result, [tid3, tid2])
# get all tids for the first partition # get all tids for the first partition
result = self.db.getTIDList(0, 2, 2, (0, )) result = self.db.getTIDList(0, 2, 2, (0, ))
self.assertEquals(result, [tid1, tid3]) self.assertEquals(result, [tid3, tid1])
# get all tids for the second partition with a limit of one # get all tids for the second partition with a limit of one
result = self.db.getTIDList(0, 1, 2, (1, )) result = self.db.getTIDList(0, 1, 2, (1, ))
self.assertEquals(result, [tid2]) self.assertEquals(result, [tid4])
# get all tids for the second partition with an offset of 3 > nothing # get all tids for the second partition with an offset of 3 > nothing
result = self.db.getTIDList(3, 2, 2, (1, )) result = self.db.getTIDList(3, 2, 2, (1, ))
self.assertEquals(result, []) self.assertEquals(result, [])
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment