Commit 624c40b7 authored by Grégory Wisniewski's avatar Grégory Wisniewski

The master's transaction manager handle the last TID.

Instead of let the ltid in the master app module, the transaction manager keep it and compute the next each time a transaction begins.
The getNextTID() method has moved from util to transactions module.
Last TID collected during recovery is also handled by the transaction manager through setLastTID() method.
Any reference to app.ltid are updated,

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1455 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 8ff0bc01
......@@ -82,8 +82,6 @@ class Application(object):
# The last OID.
self.loid = None
# The last TID.
self.ltid = None
# The target node's uuid to request next.
self.target_uuid = None
......@@ -370,7 +368,6 @@ class Application(object):
self.broadcastNodesInformation(node_list)
# resert IDs generators
self.loid = '\0' * 8
self.ltid = '\0' * 8
# build the partition with this node
pt.setID(pack('!Q', 1))
pt.make(node_list)
......@@ -388,7 +385,6 @@ class Application(object):
em = self.em
self.loid = None
self.ltid = None
self.pt.setID(None)
self.target_uuid = None
......
......@@ -20,7 +20,7 @@ from neo import logging
from neo import protocol
from neo.protocol import NodeStates, Packets, UnexpectedPacketError
from neo.master.handlers import BaseServiceHandler
from neo.util import dump, getNextTID
from neo.util import dump
class ClientServiceHandler(BaseServiceHandler):
......@@ -41,17 +41,8 @@ class ClientServiceHandler(BaseServiceHandler):
logging.warn('aborting transaction %s does not exist', dump(tid))
def askBeginTransaction(self, conn, packet, tid):
app = self.app
if tid is not None and tid < app.ltid:
# supplied TID is in the past
raise protocol.ProtocolError('invalid TID requested')
if tid is None:
# give a new transaction ID
tid = getNextTID(app.ltid)
# TODO: transaction manager should handle last TID
app.ltid = tid
node = app.nm.getByUUID(conn.getUUID())
app.tm.begin(node, tid)
node = self.app.nm.getByUUID(conn.getUUID())
tid = self.app.tm.begin(node, tid)
conn.answer(Packets.AnswerBeginTransaction(tid), packet.getId())
def askNewOIDs(self, conn, packet, num_oids):
......@@ -62,7 +53,7 @@ class ClientServiceHandler(BaseServiceHandler):
app = self.app
# If the given transaction ID is later than the last TID, the peer
# is crazy.
if app.ltid < tid:
if tid > self.app.tm.getLastTID():
raise UnexpectedPacketError
# Collect partitions related to this transaction.
......
......@@ -34,7 +34,7 @@ class RecoveryHandler(MasterHandler):
# Get max values.
app.loid = max(loid, app.loid)
app.tid = max(ltid, app.ltid)
self.app.tm.setLastTID(ltid)
if lptid > pt.getID():
# something newer
app.target_uuid = conn.getUUID()
......
......@@ -44,8 +44,8 @@ class StorageServiceHandler(BaseServiceHandler):
def askLastIDs(self, conn, packet):
app = self.app
conn.answer(Packets.AnswerLastIDs(app.loid, app.ltid, app.pt.getID()),
packet.getId())
conn.answer(Packets.AnswerLastIDs(app.loid, app.tm.getLastTID(),
app.pt.getID()), packet.getId())
def askUnfinishedTransactions(self, conn, packet):
p = Packets.AnswerUnfinishedTransactions(self.app.tm.getPendingList())
......@@ -58,7 +58,7 @@ class StorageServiceHandler(BaseServiceHandler):
# If the given transaction ID is later than the last TID, the peer
# is crazy.
if app.ltid < tid:
if tid > self.app.tm.getLastTID():
raise UnexpectedPacketError
try:
......
......@@ -34,7 +34,8 @@ class VerificationHandler(BaseServiceHandler):
def answerLastIDs(self, conn, packet, loid, ltid, lptid):
app = self.app
# If I get a bigger value here, it is dangerous.
if app.loid < loid or app.ltid < ltid or app.pt.getID() < lptid:
if app.loid < loid or ltid > app.tm.getLastTID() \
or app.pt.getID() < lptid:
logging.critical('got later information in verification')
raise VerificationFailure
......
......@@ -16,6 +16,8 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo import logging
from time import time, gmtime
from struct import pack, unpack
class Transaction(object):
"""
......@@ -96,6 +98,7 @@ class TransactionManager(object):
self._tid_dict = {}
# node -> transactions mapping
self._node_dict = {}
self._last_tid = None
def __getitem__(self, tid):
"""
......@@ -109,9 +112,47 @@ class TransactionManager(object):
"""
return tid in self._tid_dict
def _nextTID(self):
""" Compute the next TID based on the current time and check collisions """
tm = time()
gmt = gmtime(tm)
upper = ((((gmt.tm_year - 1900) * 12 + gmt.tm_mon - 1) * 31 \
+ gmt.tm_mday - 1) * 24 + gmt.tm_hour) * 60 + gmt.tm_min
lower = int((gmt.tm_sec % 60 + (tm - int(tm))) / (60.0 / 65536.0 / 65536.0))
tid = pack('!LL', upper, lower)
if self._last_tid is not None and tid <= self._last_tid:
upper, lower = unpack('!LL', self._last_tid)
if lower == 0xffffffff:
# This should not happen usually.
from datetime import timedelta, datetime
d = datetime(gmt.tm_year, gmt.tm_mon, gmt.tm_mday,
gmt.tm_hour, gmt.tm_min) \
+ timedelta(0, 60)
upper = ((((d.year - 1900) * 12 + d.month - 1) * 31 \
+ d.day - 1) * 24 + d.hour) * 60 + d.minute
lower = 0
else:
lower += 1
tid = pack('!LL', upper, lower)
self._last_tid = tid
return self._last_tid
def getLastTID(self):
"""
Returns the last TID used
"""
return self._last_tid
def setLastTID(self, tid):
"""
Set the last TID, keep the previous if lower
"""
self._last_tid = max(self._last_tid, tid)
def reset(self):
"""
Discard all manager content
This doesn't reset the last TID.
"""
self._tid_dict = {}
self._node_dict = {}
......@@ -128,16 +169,22 @@ class TransactionManager(object):
"""
return self._tid_dict.keys()
# TODO: manager should generate the tid itself
def begin(self, node, tid):
"""
Begin a new transaction
"""
assert node is not None
if tid is not None and tid < self._last_tid:
# supplied TID is in the past
raise protocol.ProtocolError('Invalid TID requested')
if tid is None:
# give a TID
tid = self._nextTID()
txn = Transaction(node, tid)
self._tid_dict[tid] = txn
# XXX: check the data structure
self._node_dict.setdefault(node, {})[tid] = txn
return tid
def prepare(self, tid, oid_list, uuid_list, msg_id):
"""
......
......@@ -19,7 +19,6 @@
import re
from zlib import adler32
from struct import pack, unpack
from time import time, gmtime
def u64(s):
return unpack('!Q', s)[0]
......@@ -55,30 +54,6 @@ def makeChecksum(s):
"""Return a 4-byte integer checksum against a string."""
return adler32(s) & 0xffffffff
def getNextTID(ltid):
""" Compute the next TID based on the current time and check collisions """
tm = time()
gmt = gmtime(tm)
upper = ((((gmt.tm_year - 1900) * 12 + gmt.tm_mon - 1) * 31 \
+ gmt.tm_mday - 1) * 24 + gmt.tm_hour) * 60 + gmt.tm_min
lower = int((gmt.tm_sec % 60 + (tm - int(tm))) / (60.0 / 65536.0 / 65536.0))
tid = pack('!LL', upper, lower)
if tid <= ltid:
upper, lower = unpack('!LL', ltid)
if lower == 0xffffffff:
# This should not happen usually.
from datetime import timedelta, datetime
d = datetime(gmt.tm_year, gmt.tm_mon, gmt.tm_mday,
gmt.tm_hour, gmt.tm_min) \
+ timedelta(0, 60)
upper = ((((d.year - 1900) * 12 + d.month - 1) * 31 \
+ d.day - 1) * 24 + d.hour) * 60 + d.minute
lower = 0
else:
lower += 1
tid = pack('!LL', upper, lower)
return tid
def parseMasterList(masters, except_node=None):
if not masters:
return []
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment