Commit 6486a6cb authored by Vincent Pelletier's avatar Vincent Pelletier

Implement lastTransaction.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2478 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 0baab33c
......@@ -124,6 +124,7 @@ class ThreadContext(object):
'undo_object_tid_dict': {},
'involved_nodes': set(),
'barrier_done': False,
'last_transaction': None,
}
......@@ -1205,9 +1206,8 @@ class Application(object):
return Iterator(self, start, stop)
def lastTransaction(self):
# XXX: this doesn't consider transactions created by other clients,
# should ask the primary master
return self.local_var.tid
self._askPrimary(Packets.AskLastCommittedTID())
return self.local_var.last_transaction
def abortVersion(self, src, transaction):
if transaction is not self.local_var.txn:
......
......@@ -175,3 +175,6 @@ class PrimaryAnswersHandler(AnswerBaseHandler):
if not status:
raise NEOStorageError('Already packing')
def answerLastTransaction(self, conn, ltid):
self.app.local_var.last_transaction = ltid
......@@ -324,7 +324,7 @@ class EventHandler(object):
def notifyReplicationDone(self, conn, offset):
raise UnexpectedPacketError
def askObjectUndoSerial(self, conn, tid, undone_tid, oid_list):
def askObjectUndoSerial(self, conn, tid, ltid, undone_tid, oid_list):
raise UnexpectedPacketError
def answerObjectUndoSerial(self, conn, object_tid_dict):
......@@ -366,6 +366,12 @@ class EventHandler(object):
def notifyReady(self, conn):
raise UnexpectedPacketError
def askLastTransaction(self, conn):
raise UnexpectedPacketError
def answerLastTransaction(self, conn, tid):
raise UnexpectedPacketError
# Error packet handlers.
def error(self, conn, code, message):
......@@ -484,6 +490,8 @@ class EventHandler(object):
d[Packets.AskCheckSerialRange] = self.askCheckSerialRange
d[Packets.AnswerCheckSerialRange] = self.answerCheckSerialRange
d[Packets.NotifyReady] = self.notifyReady
d[Packets.AskLastTransaction] = self.askLastTransaction
d[Packets.AnswerLastTransaction] = self.answerLastTransaction
return d
......
......@@ -20,7 +20,7 @@ import os, sys
from time import time
from neo import protocol
from neo.protocol import UUID_NAMESPACES
from neo.protocol import UUID_NAMESPACES, ZERO_TID
from neo.protocol import ClusterStates, NodeStates, NodeTypes, Packets
from neo.node import NodeManager
from neo.event import EventManager
......@@ -41,6 +41,8 @@ from neo.live_debug import register as registerLiveDebugger
class Application(object):
"""The master node application."""
packing = None
# Latest completely commited TID
last_transaction = ZERO_TID
def __init__(self, config):
......@@ -559,6 +561,14 @@ class Application(object):
neo.logging.info('Accept a storage %s (%s)' % (dump(uuid), state))
return (uuid, node, state, handler, node_ctor)
def getLastTransaction(self):
return self.last_transaction
def setLastTransaction(self, tid):
ltid = self.last_transaction
assert tid >= ltid, (tid, ltid)
self.last_transaction = tid
def setStorageNotReady(self, uuid):
self.storage_readiness.discard(uuid)
......
......@@ -104,3 +104,7 @@ class ClientServiceHandler(MasterHandler):
else:
conn.answer(Packets.AnswerPack(False))
def askLastTransaction(self, conn):
conn.answer(Packets.AnswerLastTransaction(
self.app.getLastTransaction()))
......@@ -106,6 +106,7 @@ class StorageServiceHandler(BaseServiceHandler):
# remove transaction from manager
tm.remove(tid)
app.setLastTransaction(tid)
def notifyReplicationDone(self, conn, offset):
node = self.app.nm.getByUUID(conn.getUUID())
......
......@@ -80,6 +80,7 @@ class RecoveryManager(MasterHandler):
node.setPending()
self.app.broadcastNodesInformation(refused_node_set)
self.app.setLastTransaction(self.app.tm.getLastTID())
neo.logging.debug('cluster starts with loid=%s and this partition ' \
'table :', dump(self.app.tm.getLastOID()))
self.app.pt.log()
......
......@@ -1695,6 +1695,24 @@ class AnswerCheckSerialRange(Packet):
# serial_checksum, max_serial
return unpack(self._header_format, body)
class AskLastTransaction(Packet):
"""
Ask last committed TID.
C -> M
"""
pass
class AnswerLastTransaction(Packet):
"""
Answer last committed TID.
M -> C
"""
def _encode(self, tid):
return tid
def _decode(self, body):
return (body, )
class NotifyReady(Packet):
"""
Notify that node is ready to serve requests.
......@@ -1970,6 +1988,11 @@ class PacketRegistry(dict):
AnswerCheckSerialRange,
)
NotifyReady = register(0x003B, NotifyReady)
AskLastTransaction, AnswerLastTransaction = register(
0x003C,
AskLastTransaction,
AnswerLastTransaction,
)
# build a "singleton"
Packets = PacketRegistry()
......
......@@ -693,6 +693,15 @@ class ProtocolTests(NeoUnitTestBase):
p = Packets.NotifyReady()
self.assertEqual(tuple(), p.decode())
def test_AskLastTransaction(self):
Packets.AskLastTransaction()
def test_AnswerLastTransaction(self):
tid = self.getNextTID()
p = Packets.AnswerLastTransaction(tid)
ptid = p.decode()[0]
self.assertEqual(ptid, tid)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment