Commit 507604be authored by Vincent Pelletier's avatar Vincent Pelletier

Add support for "packed" transaction property.

This does not add pack support, but merely prepares a place on transaction
description to tell whether it has been packed, along with corresponding
transport (packed definition, handlers, etc).

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1833 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 87461c53
...@@ -82,7 +82,7 @@ class StorageAnswersHandler(AnswerBaseHandler): ...@@ -82,7 +82,7 @@ class StorageAnswersHandler(AnswerBaseHandler):
self.app.setTransactionVoted() self.app.setTransactionVoted()
def answerTransactionInformation(self, conn, tid, def answerTransactionInformation(self, conn, tid,
user, desc, ext, oid_list): user, desc, ext, packed, oid_list):
# transaction information are returned as a dict # transaction information are returned as a dict
info = {} info = {}
info['time'] = TimeStamp(tid).timeTime() info['time'] = TimeStamp(tid).timeTime()
...@@ -90,6 +90,7 @@ class StorageAnswersHandler(AnswerBaseHandler): ...@@ -90,6 +90,7 @@ class StorageAnswersHandler(AnswerBaseHandler):
info['description'] = desc info['description'] = desc
info['id'] = tid info['id'] = tid
info['oids'] = oid_list info['oids'] = oid_list
info['packed'] = packed
self.app.local_var.txn_info = info self.app.local_var.txn_info = info
def answerObjectHistory(self, conn, oid, history_list): def answerObjectHistory(self, conn, oid, history_list):
......
...@@ -262,7 +262,7 @@ class EventHandler(object): ...@@ -262,7 +262,7 @@ class EventHandler(object):
raise UnexpectedPacketError raise UnexpectedPacketError
def answerTransactionInformation(self, conn, tid, def answerTransactionInformation(self, conn, tid,
user, desc, ext, oid_list): user, desc, ext, packed, oid_list):
raise UnexpectedPacketError raise UnexpectedPacketError
def askObjectHistory(self, conn, oid, first, last): def askObjectHistory(self, conn, oid, first, last):
......
...@@ -191,7 +191,7 @@ class PacketLogger(EventHandler): ...@@ -191,7 +191,7 @@ class PacketLogger(EventHandler):
pass pass
def answerTransactionInformation(self, conn, tid, def answerTransactionInformation(self, conn, tid,
user, desc, ext, oid_list): user, desc, ext, packed, oid_list):
pass pass
def askObjectHistory(self, conn, oid, first, last): def askObjectHistory(self, conn, oid, first, last):
......
...@@ -201,7 +201,7 @@ class VerificationManager(BaseServiceHandler): ...@@ -201,7 +201,7 @@ class VerificationManager(BaseServiceHandler):
self._uuid_dict[uuid] = True self._uuid_dict[uuid] = True
def answerTransactionInformation(self, conn, tid, def answerTransactionInformation(self, conn, tid,
user, desc, ext, oid_list): user, desc, ext, packed, oid_list):
uuid = conn.getUUID() uuid = conn.getUUID()
app = self.app app = self.app
if self._uuid_dict.get(uuid, True): if self._uuid_dict.get(uuid, True):
......
...@@ -21,7 +21,7 @@ from socket import inet_ntoa, inet_aton ...@@ -21,7 +21,7 @@ from socket import inet_ntoa, inet_aton
from neo.util import Enum from neo.util import Enum
# The protocol version (major, minor). # The protocol version (major, minor).
PROTOCOL_VERSION = (4, 0) PROTOCOL_VERSION = (4, 1)
# Size restrictions. # Size restrictions.
MIN_PACKET_SIZE = 10 MIN_PACKET_SIZE = 10
...@@ -905,9 +905,10 @@ class AnswerTransactionInformation(Packet): ...@@ -905,9 +905,10 @@ class AnswerTransactionInformation(Packet):
""" """
Answer information (user, description) about a transaction. S -> Any. Answer information (user, description) about a transaction. S -> Any.
""" """
def _encode(self, tid, user, desc, ext, oid_list): def _encode(self, tid, user, desc, ext, packed, oid_list):
body = [pack('!8sHHHL', tid, len(user), len(desc), len(ext), packed = packed and 1 or 0
len(oid_list))] body = [pack('!8sHHHBL', tid, len(user), len(desc), len(ext),
packed, len(oid_list))]
body.append(user) body.append(user)
body.append(desc) body.append(desc)
body.append(ext) body.append(ext)
...@@ -915,8 +916,9 @@ class AnswerTransactionInformation(Packet): ...@@ -915,8 +916,9 @@ class AnswerTransactionInformation(Packet):
return ''.join(body) return ''.join(body)
def _decode(self, body): def _decode(self, body):
r = unpack('!8sHHHL', body[:18]) r = unpack('!8sHHHBL', body[:18])
tid, user_len, desc_len, ext_len, oid_len = r tid, user_len, desc_len, ext_len, packed, oid_len = r
packed = bool(packed)
body = body[18:] body = body[18:]
user = body[:user_len] user = body[:user_len]
body = body[user_len:] body = body[user_len:]
...@@ -929,7 +931,7 @@ class AnswerTransactionInformation(Packet): ...@@ -929,7 +931,7 @@ class AnswerTransactionInformation(Packet):
(oid, ) = unpack('8s', body[:8]) (oid, ) = unpack('8s', body[:8])
body = body[8:] body = body[8:]
oid_list.append(oid) oid_list.append(oid)
return (tid, user, desc, ext, oid_list) return (tid, user, desc, ext, packed, oid_list)
class AskObjectHistory(Packet): class AskObjectHistory(Packet):
""" """
......
...@@ -232,7 +232,8 @@ class DatabaseManager(object): ...@@ -232,7 +232,8 @@ class DatabaseManager(object):
contains tuples, each of which consists of an object ID, contains tuples, each of which consists of an object ID,
a compression specification, a checksum and object data. a compression specification, a checksum and object data.
The transaction is either None or a tuple of the list of OIDs, The transaction is either None or a tuple of the list of OIDs,
user information, a description and extension information.""" user information, a description, extension information and transaction
pack state (True for packed)."""
raise NotImplementedError raise NotImplementedError
def finishTransaction(self, tid): def finishTransaction(self, tid):
......
...@@ -132,6 +132,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -132,6 +132,7 @@ class MySQLDatabaseManager(DatabaseManager):
# The table "trans" stores information on committed transactions. # The table "trans" stores information on committed transactions.
q("""CREATE TABLE IF NOT EXISTS trans ( q("""CREATE TABLE IF NOT EXISTS trans (
tid BIGINT UNSIGNED NOT NULL PRIMARY KEY, tid BIGINT UNSIGNED NOT NULL PRIMARY KEY,
packed BOOLEAN NOT NULL,
oids MEDIUMBLOB NOT NULL, oids MEDIUMBLOB NOT NULL,
user BLOB NOT NULL, user BLOB NOT NULL,
description BLOB NOT NULL, description BLOB NOT NULL,
...@@ -151,6 +152,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -151,6 +152,7 @@ class MySQLDatabaseManager(DatabaseManager):
# The table "ttrans" stores information on uncommitted transactions. # The table "ttrans" stores information on uncommitted transactions.
q("""CREATE TABLE IF NOT EXISTS ttrans ( q("""CREATE TABLE IF NOT EXISTS ttrans (
tid BIGINT UNSIGNED NOT NULL, tid BIGINT UNSIGNED NOT NULL,
packed BOOLEAN NOT NULL,
oids MEDIUMBLOB NOT NULL, oids MEDIUMBLOB NOT NULL,
user BLOB NOT NULL, user BLOB NOT NULL,
description BLOB NOT NULL, description BLOB NOT NULL,
...@@ -364,13 +366,14 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -364,13 +366,14 @@ class MySQLDatabaseManager(DatabaseManager):
q("""REPLACE INTO %s VALUES (%d, %d, %d, %d, '%s')""" \ q("""REPLACE INTO %s VALUES (%d, %d, %d, %d, '%s')""" \
% (obj_table, oid, tid, compression, checksum, data)) % (obj_table, oid, tid, compression, checksum, data))
if transaction is not None: if transaction is not None:
oid_list, user, desc, ext = transaction oid_list, user, desc, ext, packed = transaction
packed = packed and 1 or 0
oids = e(''.join(oid_list)) oids = e(''.join(oid_list))
user = e(user) user = e(user)
desc = e(desc) desc = e(desc)
ext = e(ext) ext = e(ext)
q("""REPLACE INTO %s VALUES (%d, '%s', '%s', '%s', '%s')""" \ q("""REPLACE INTO %s VALUES (%d, %i, '%s', '%s', '%s', '%s')""" \
% (trans_table, tid, oids, user, desc, ext)) % (trans_table, tid, packed, oids, user, desc, ext))
except: except:
self.rollback() self.rollback()
raise raise
...@@ -412,22 +415,22 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -412,22 +415,22 @@ class MySQLDatabaseManager(DatabaseManager):
q = self.query q = self.query
tid = util.u64(tid) tid = util.u64(tid)
self.begin() self.begin()
r = q("""SELECT oids, user, description, ext FROM trans r = q("""SELECT oids, user, description, ext, packed FROM trans
WHERE tid = %d""" \ WHERE tid = %d""" \
% tid) % tid)
if not r and all: if not r and all:
r = q("""SELECT oids, user, description, ext FROM ttrans r = q("""SELECT oids, user, description, ext, packed FROM ttrans
WHERE tid = %d""" \ WHERE tid = %d""" \
% tid) % tid)
self.commit() self.commit()
if r: if r:
oids, user, desc, ext = r[0] oids, user, desc, exti, packed = r[0]
if (len(oids) % 8) != 0 or len(oids) == 0: if (len(oids) % 8) != 0 or len(oids) == 0:
raise DatabaseFailure('invalid oids for tid %x' % tid) raise DatabaseFailure('invalid oids for tid %x' % tid)
oid_list = [] oid_list = []
for i in xrange(0, len(oids), 8): for i in xrange(0, len(oids), 8):
oid_list.append(oids[i:i+8]) oid_list.append(oids[i:i+8])
return oid_list, user, desc, ext return oid_list, user, desc, ext, bool(packed)
return None return None
def getOIDList(self, offset, length, num_partitions, partition_list): def getOIDList(self, offset, length, num_partitions, partition_list):
......
...@@ -95,7 +95,7 @@ class BaseClientAndStorageOperationHandler(EventHandler): ...@@ -95,7 +95,7 @@ class BaseClientAndStorageOperationHandler(EventHandler):
p = Errors.TidNotFound('%s does not exist' % dump(tid)) p = Errors.TidNotFound('%s does not exist' % dump(tid))
else: else:
p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3], p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3],
t[0]) t[4], t[0])
conn.answer(p) conn.answer(p)
def askObject(self, conn, oid, serial, tid): def askObject(self, conn, oid, serial, tid):
......
...@@ -39,7 +39,8 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler): ...@@ -39,7 +39,8 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
def askStoreTransaction(self, conn, tid, user, desc, def askStoreTransaction(self, conn, tid, user, desc,
ext, oid_list): ext, oid_list):
uuid = conn.getUUID() uuid = conn.getUUID()
self.app.tm.storeTransaction(uuid, tid, oid_list, user, desc, ext) self.app.tm.storeTransaction(uuid, tid, oid_list, user, desc, ext,
False)
conn.answer(Packets.AnswerStoreTransaction(tid)) conn.answer(Packets.AnswerStoreTransaction(tid))
def askStoreObject(self, conn, oid, serial, def askStoreObject(self, conn, oid, serial,
......
...@@ -69,13 +69,14 @@ class ReplicationHandler(EventHandler): ...@@ -69,13 +69,14 @@ class ReplicationHandler(EventHandler):
app.replicator.oid_offset = 0 app.replicator.oid_offset = 0
def answerTransactionInformation(self, conn, tid, def answerTransactionInformation(self, conn, tid,
user, desc, ext, oid_list): user, desc, ext, packed, oid_list):
app = self.app app = self.app
if app.replicator.current_connection is not conn: if app.replicator.current_connection is not conn:
return return
# Directly store the transaction. # Directly store the transaction.
app.dm.storeTransaction(tid, (), (oid_list, user, desc, ext), False) app.dm.storeTransaction(tid, (), (oid_list, user, desc, ext, packed),
False)
def answerOIDs(self, conn, oid_list): def answerOIDs(self, conn, oid_list):
app = self.app app = self.app
......
...@@ -75,7 +75,7 @@ class VerificationHandler(BaseMasterHandler): ...@@ -75,7 +75,7 @@ class VerificationHandler(BaseMasterHandler):
p = Errors.TidNotFound('%s does not exist' % dump(tid)) p = Errors.TidNotFound('%s does not exist' % dump(tid))
else: else:
p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3], p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3],
t[0]) t[4], t[0])
conn.answer(p) conn.answer(p)
def askObjectPresent(self, conn, oid, tid): def askObjectPresent(self, conn, oid, tid):
......
...@@ -63,12 +63,12 @@ class Transaction(object): ...@@ -63,12 +63,12 @@ class Transaction(object):
def isLocked(self): def isLocked(self):
return self._locked return self._locked
def prepare(self, oid_list, user, desc, ext): def prepare(self, oid_list, user, desc, ext, packed):
""" """
Set the transaction informations Set the transaction informations
""" """
# assert self._transaction is not None # assert self._transaction is not None
self._transaction = (oid_list, user, desc, ext) self._transaction = (oid_list, user, desc, ext, packed)
def addObject(self, oid, compression, checksum, data): def addObject(self, oid, compression, checksum, data):
""" """
...@@ -160,12 +160,12 @@ class TransactionManager(object): ...@@ -160,12 +160,12 @@ class TransactionManager(object):
self._loid = self._loid_seen self._loid = self._loid_seen
self._app.dm.setLastOID(self._loid) self._app.dm.setLastOID(self._loid)
def storeTransaction(self, uuid, tid, oid_list, user, desc, ext): def storeTransaction(self, uuid, tid, oid_list, user, desc, ext, packed):
""" """
Store transaction information received from client node Store transaction information received from client node
""" """
transaction = self._getTransaction(tid, uuid) transaction = self._getTransaction(tid, uuid)
transaction.prepare(oid_list, user, desc, ext) transaction.prepare(oid_list, user, desc, ext, packed)
def storeObject(self, uuid, tid, serial, oid, compression, checksum, data): def storeObject(self, uuid, tid, serial, oid, compression, checksum, data):
""" """
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment