Commit 5fe1af62 by Julien Muchembled

In (t)trans, add column that will be used to recover from some errors during tpc_finish

1 parent 7301d385
......@@ -19,13 +19,13 @@ RC = Release Critical (for next release)
RC - Review XXX in the code (CODE)
RC - Review TODO in the code (CODE)
RC - Review output of pylint (CODE)
RC - tpc_finish might raise while transaction got successfully committed.
- tpc_finish might raise while transaction got successfully committed.
This can happen if it gets disconnected from primary master while waiting
for AnswerFinishTransaction after primary received it and hence will
commit transaction independently from client presence. Client could
legitimaltely think transaction is not committed, and might decide to
retry. To solve this, a TTID column must be added in storage nodes so
client can know if his TTID got successfuly committed.
retry. To solve this, client can know if its TTID got successfuly
committed by looking at currently unused '(t)trans.ttid' column.
- Keep-alive (HIGH AVAILABILITY) (implemented, to be reviewed and tested)
Consider the need to implement a keep-alive system (packets sent
automatically when there is no activity on the connection for a period
......
......@@ -5,7 +5,7 @@ The format of MySQL tables has changed in NEO 1.0 and there is no backward
compatibility or transparent migration, so you will have to use the following
SQL commands to migrate each storage from NEO 0.10.x::
-- make sure 'tobj' is empty first
-- make sure 'tobj' & 'ttrans' are empty first
CREATE TABLE new_data (id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, hash BINARY(20) NOT NULL UNIQUE, compression TINYINT UNSIGNED NULL, value LONGBLOB NULL) ENGINE = InnoDB SELECT DISTINCT obj.hash as hash, compression, value FROM obj, data WHERE obj.hash=data.hash ORDER BY serial;
DROP TABLE data;
RENAME TABLE new_data TO data;
......@@ -13,6 +13,9 @@ SQL commands to migrate each storage from NEO 0.10.x::
DROP TABLE obj;
RENAME TABLE new_obj TO obj;
ALTER TABLE tobj CHANGE serial tid BIGINT UNSIGNED NOT NULL, CHANGE hash data_id BIGINT UNSIGNED NULL, CHANGE value_serial value_tid BIGINT UNSIGNED NULL;
ALTER TABLE trans ADD COLUMN ttid BIGINT UNSIGNED NOT NULL;
UPDATE trans SET ttid=tid;
ALTER TABLE ttrans ADD COLUMN ttid BIGINT UNSIGNED NOT NULL;
NEO 0.10
========
......
......@@ -1419,6 +1419,7 @@ class AddTransaction(Packet):
PString('description'),
PString('extension'),
PBoolean('packed'),
PTID('ttid'),
PFOidList,
)
......
......@@ -182,6 +182,7 @@ class MySQLDatabaseManager(DatabaseManager):
user BLOB NOT NULL,
description BLOB NOT NULL,
ext BLOB NOT NULL,
ttid BIGINT UNSIGNED NOT NULL,
PRIMARY KEY (partition, tid)
) ENGINE = InnoDB""" + p)
......@@ -215,7 +216,8 @@ class MySQLDatabaseManager(DatabaseManager):
oids MEDIUMBLOB NOT NULL,
user BLOB NOT NULL,
description BLOB NOT NULL,
ext BLOB NOT NULL
ext BLOB NOT NULL,
ttid BIGINT UNSIGNED NOT NULL
) ENGINE = InnoDB""")
# The table "tobj" stores uncommitted object metadata.
......@@ -431,17 +433,13 @@ class MySQLDatabaseManager(DatabaseManager):
q("REPLACE INTO %s VALUES (%d, %d, %d, %s, %s)" % (obj_table,
partition, oid, tid, data_id or 'NULL', value_serial))
if transaction is not None:
oid_list, user, desc, ext, packed = transaction
packed = packed and 1 or 0
oids = e(''.join(oid_list))
user = e(user)
desc = e(desc)
ext = e(ext)
if transaction:
oid_list, user, desc, ext, packed, ttid = transaction
partition = self._getPartition(tid)
q("REPLACE INTO %s VALUES (%d, %d, %i, '%s', '%s', '%s', '%s')"
% (trans_table, partition, tid, packed, oids, user, desc,
ext))
assert packed in (0, 1)
q("REPLACE INTO %s VALUES (%d,%d,%i,'%s','%s','%s','%s',%d)" % (
trans_table, partition, tid, packed, e(''.join(oid_list)),
e(user), e(desc), e(ext), u64(ttid)))
def _pruneData(self, data_id_list):
data_id_list = set(data_id_list).difference(self._uncommitted_data)
......@@ -552,16 +550,16 @@ class MySQLDatabaseManager(DatabaseManager):
def getTransaction(self, tid, all = False):
tid = util.u64(tid)
with self as q:
r = q("SELECT oids, user, description, ext, packed FROM trans"
" WHERE partition = %d AND tid = %d"
r = q("SELECT oids, user, description, ext, packed, ttid"
" FROM trans WHERE partition = %d AND tid = %d"
% (self._getPartition(tid), tid))
if not r and all:
r = q("SELECT oids, user, description, ext, packed FROM ttrans"
" WHERE tid = %d" % tid)
r = q("SELECT oids, user, description, ext, packed, ttid"
" FROM ttrans WHERE tid = %d" % tid)
if r:
oids, user, desc, ext, packed = r[0]
oids, user, desc, ext, packed, ttid = r[0]
oid_list = splitOIDField(tid, oids)
return oid_list, user, desc, ext, bool(packed)
return oid_list, user, desc, ext, bool(packed), util.p64(ttid)
def _getObjectLength(self, oid, value_serial):
if value_serial is None:
......
......@@ -120,6 +120,7 @@ class SQLiteDatabaseManager(DatabaseManager):
user BLOB NOT NULL,
description BLOB NOT NULL,
ext BLOB NOT NULL,
ttid INTEGER NOT NULL,
PRIMARY KEY (partition, tid))
""")
......@@ -155,7 +156,8 @@ class SQLiteDatabaseManager(DatabaseManager):
oids BLOB NOT NULL,
user BLOB NOT NULL,
description BLOB NOT NULL,
ext BLOB NOT NULL)
ext BLOB NOT NULL,
ttid INTEGER NOT NULL)
""")
# The table "tobj" stores uncommitted object metadata.
......@@ -339,13 +341,13 @@ class SQLiteDatabaseManager(DatabaseManager):
continue
raise
if transaction is not None:
oid_list, user, desc, ext, packed = transaction
if transaction:
oid_list, user, desc, ext, packed, ttid = transaction
partition = self._getPartition(tid)
assert packed in (0, 1)
q("INSERT OR FAIL INTO %strans VALUES (?,?,?,?,?,?,?)" % T,
q("INSERT OR FAIL INTO %strans VALUES (?,?,?,?,?,?,?,?)" % T,
(partition, tid, packed, buffer(''.join(oid_list)),
buffer(user), buffer(desc), buffer(ext)))
buffer(user), buffer(desc), buffer(ext), u64(ttid)))
def _pruneData(self, data_id_list):
data_id_list = set(data_id_list).difference(self._uncommitted_data)
......@@ -459,16 +461,16 @@ class SQLiteDatabaseManager(DatabaseManager):
def getTransaction(self, tid, all=False):
tid = util.u64(tid)
with self as q:
r = q("SELECT oids, user, description, ext, packed FROM trans"
" WHERE partition=? AND tid=?",
r = q("SELECT oids, user, description, ext, packed, ttid"
" FROM trans WHERE partition=? AND tid=?",
(self._getPartition(tid), tid)).fetchone()
if not r and all:
r = q("SELECT oids, user, description, ext, packed FROM ttrans"
" WHERE tid=?", (tid,)).fetchone()
r = q("SELECT oids, user, description, ext, packed, ttid"
" FROM ttrans WHERE tid=?", (tid,)).fetchone()
if r:
oids, user, description, ext, packed = r
oids, user, description, ext, packed, ttid = r
return splitOIDField(tid, oids), str(user), \
str(description), str(ext), packed
str(description), str(ext), packed, util.p64(ttid)
def _getObjectLength(self, oid, value_serial):
if value_serial is None:
......
......@@ -84,10 +84,11 @@ class StorageOperationHandler(EventHandler):
self.app.replicator.fetchObjects()
@checkConnectionIsReplicatorConnection
def addTransaction(self, conn, tid, user, desc, ext, packed, oid_list):
def addTransaction(self, conn, tid, user, desc, ext, packed, ttid,
oid_list):
# Directly store the transaction.
self.app.dm.storeTransaction(tid, (),
(oid_list, user, desc, ext, packed), False)
(oid_list, user, desc, ext, packed, ttid), False)
@checkConnectionIsReplicatorConnection
def answerFetchObjects(self, conn, pack_tid, next_tid,
......@@ -185,9 +186,9 @@ class StorageOperationHandler(EventHandler):
conn.answer(Errors.ReplicationError(
"partition %u dropped" % partition))
return
oid_list, user, desc, ext, packed = t
oid_list, user, desc, ext, packed, ttid = t
conn.notify(Packets.AddTransaction(
tid, user, desc, ext, packed, oid_list))
tid, user, desc, ext, packed, ttid, oid_list))
yield
conn.answer(Packets.AnswerFetchTransactions(
pack_tid, next_tid, peer_tid_set), msg_id)
......
......@@ -95,7 +95,7 @@ class Transaction(object):
Set the transaction informations
"""
# assert self._transaction is not None
self._transaction = (oid_list, user, desc, ext, packed)
self._transaction = oid_list, user, desc, ext, packed, self._ttid
def addObject(self, oid, data_id, value_serial):
"""
......
......@@ -17,7 +17,7 @@
from binascii import a2b_hex
import unittest
from mock import Mock
from neo.lib.util import dump, p64, u64
from neo.lib.util import add64, dump, p64, u64
from neo.lib.protocol import CellStates, ZERO_HASH, ZERO_OID, ZERO_TID, MAX_TID
from .. import NeoUnitTestBase
from neo.lib.exception import DatabaseFailure
......@@ -25,6 +25,8 @@ from neo.lib.exception import DatabaseFailure
class StorageDBTests(NeoUnitTestBase):
_last_ttid = ZERO_TID
def setUp(self):
NeoUnitTestBase.setUp(self)
......@@ -141,7 +143,8 @@ class StorageDBTests(NeoUnitTestBase):
return tid_list
def getTransaction(self, oid_list):
transaction = (oid_list, 'user', 'desc', 'ext', False)
self._last_ttid = ttid = add64(self._last_ttid, 1)
transaction = oid_list, 'user', 'desc', 'ext', False, ttid
H = "0" * 20
object_list = [(oid, self.db.storeData(H, '', 1), None)
for oid in oid_list]
......@@ -329,22 +332,22 @@ class StorageDBTests(NeoUnitTestBase):
self.db.storeTransaction(tid1, objs1, txn1)
self.db.storeTransaction(tid2, objs2, txn2)
result = self.db.getTransaction(tid1, True)
self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False, p64(1)))
result = self.db.getTransaction(tid2, True)
self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False))
self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False, p64(2)))
self.assertEqual(self.db.getTransaction(tid1, False), None)
self.assertEqual(self.db.getTransaction(tid2, False), None)
# commit pending transaction
self.db.finishTransaction(tid1)
self.db.finishTransaction(tid2)
result = self.db.getTransaction(tid1, True)
self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False, p64(1)))
result = self.db.getTransaction(tid2, True)
self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False))
self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False, p64(2)))
result = self.db.getTransaction(tid1, False)
self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False, p64(1)))
result = self.db.getTransaction(tid2, False)
self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False))
self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False, p64(2)))
def test_askFinishTransaction(self):
oid1, oid2 = self.getOIDs(2)
......@@ -355,22 +358,22 @@ class StorageDBTests(NeoUnitTestBase):
self.db.storeTransaction(tid1, objs1, txn1)
self.db.storeTransaction(tid2, objs2, txn2)
result = self.db.getTransaction(tid1, True)
self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False, p64(1)))
result = self.db.getTransaction(tid2, True)
self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False))
self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False, p64(2)))
self.assertEqual(self.db.getTransaction(tid1, False), None)
self.assertEqual(self.db.getTransaction(tid2, False), None)
# stored and finished
self.db.finishTransaction(tid1)
self.db.finishTransaction(tid2)
result = self.db.getTransaction(tid1, True)
self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False, p64(1)))
result = self.db.getTransaction(tid2, True)
self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False))
self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False, p64(2)))
result = self.db.getTransaction(tid1, False)
self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False, p64(1)))
result = self.db.getTransaction(tid2, False)
self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False))
self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False, p64(2)))
def test_deleteTransaction(self):
oid1, oid2 = self.getOIDs(2)
......@@ -435,12 +438,12 @@ class StorageDBTests(NeoUnitTestBase):
self.db.storeTransaction(tid2, objs2, txn2)
self.db.finishTransaction(tid1)
result = self.db.getTransaction(tid1, True)
self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False, p64(1)))
result = self.db.getTransaction(tid2, True)
self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False))
self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False, p64(2)))
# get from non-temporary only
result = self.db.getTransaction(tid1, False)
self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False, p64(1)))
self.assertEqual(self.db.getTransaction(tid2, False), None)
def test_getObjectHistory(self):
......
......@@ -57,7 +57,8 @@ class TransactionTests(NeoUnitTestBase):
oid_list = [self.getOID(1), self.getOID(2)]
txn_info = (oid_list, 'USER', 'DESC', 'EXT', False)
txn.prepare(*txn_info)
self.assertEqual(txn.getTransactionInformations(), txn_info)
self.assertEqual(txn.getTransactionInformations(),
txn_info + (txn.getTTID(),))
def testObjects(self):
txn = Transaction(self.getNewUUID(), self.getNextTID())
......@@ -140,7 +141,7 @@ class TransactionManagerTests(NeoUnitTestBase):
self._checkTransactionStored(tid, [
(object1[0], data_id_list[0], object1[4]),
(object2[0], data_id_list[1], object2[4]),
], txn)
], txn + (ttid,))
self.manager.unlock(ttid)
self.assertFalse(ttid in self.manager)
self._checkTransactionFinished(tid)
......
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!