Commit f62f9bc9 authored by Julien Muchembled's avatar Julien Muchembled

Bump protocol version

parents fa9664ee 52db5607
...@@ -22,7 +22,7 @@ from struct import Struct ...@@ -22,7 +22,7 @@ from struct import Struct
# The protocol version must be increased whenever upgrading a node may require # The protocol version must be increased whenever upgrading a node may require
# to upgrade other nodes. It is encoded as a 4-bytes big-endian integer and # to upgrade other nodes. It is encoded as a 4-bytes big-endian integer and
# the high order byte 0 is different from TLS Handshake (0x16). # the high order byte 0 is different from TLS Handshake (0x16).
PROTOCOL_VERSION = 2 PROTOCOL_VERSION = 3
ENCODED_VERSION = Struct('!L').pack(PROTOCOL_VERSION) ENCODED_VERSION = Struct('!L').pack(PROTOCOL_VERSION)
# Avoid memory errors on corrupted data. # Avoid memory errors on corrupted data.
...@@ -122,22 +122,22 @@ def NodeStates(): ...@@ -122,22 +122,22 @@ def NodeStates():
@Enum @Enum
def CellStates(): def CellStates():
# Normal state: cell is writable/readable, and it isn't planned to drop it.
UP_TO_DATE
# Write-only cell. Last transactions are missing because storage is/was down # Write-only cell. Last transactions are missing because storage is/was down
# for a while, or because it is new for the partition. It usually becomes # for a while, or because it is new for the partition. It usually becomes
# UP_TO_DATE when replication is done. # UP_TO_DATE when replication is done.
OUT_OF_DATE OUT_OF_DATE
# Normal state: cell is writable/readable, and it isn't planned to drop it.
UP_TO_DATE
# Same as UP_TO_DATE, except that it will be discarded as soon as another # Same as UP_TO_DATE, except that it will be discarded as soon as another
# node finishes to replicate it. It means a partition is moved from 1 node # node finishes to replicate it. It means a partition is moved from 1 node
# to another. It is also discarded immediately if out-of-date. # to another. It is also discarded immediately if out-of-date.
FEEDING FEEDING
# Not really a state: only used in network packets to tell storages to drop
# partitions.
DISCARDED
# A check revealed that data differs from other replicas. Cell is neither # A check revealed that data differs from other replicas. Cell is neither
# readable nor writable. # readable nor writable.
CORRUPTED CORRUPTED
# Not really a state: only used in network packets to tell storages to drop
# partitions.
DISCARDED
# used for logging # used for logging
node_state_prefix_dict = { node_state_prefix_dict = {
...@@ -462,7 +462,7 @@ class PEnum(PStructItem): ...@@ -462,7 +462,7 @@ class PEnum(PStructItem):
""" """
Encapsulate an enumeration value Encapsulate an enumeration value
""" """
_fmt = '!l' _fmt = 'b'
def __init__(self, name, enum): def __init__(self, name, enum):
PStructItem.__init__(self, name) PStructItem.__init__(self, name)
...@@ -1535,16 +1535,16 @@ class Truncate(Packet): ...@@ -1535,16 +1535,16 @@ class Truncate(Packet):
_answer = Error _answer = Error
StaticRegistry = {} _next_code = 0
def register(request, ignore_when_closed=None): def register(request, ignore_when_closed=None):
""" Register a packet in the packet registry """ """ Register a packet in the packet registry """
code = len(StaticRegistry) global _next_code
code = _next_code
assert code < RESPONSE_MASK
_next_code = code + 1
if request is Error: if request is Error:
code |= RESPONSE_MASK code |= RESPONSE_MASK
# register the request # register the request
StaticRegistry[code] = request
if request is None:
return # None registered only to skip a code number (for compatibility)
request._code = code request._code = code
answer = request._answer answer = request._answer
if ignore_when_closed is None: if ignore_when_closed is None:
...@@ -1557,32 +1557,28 @@ def register(request, ignore_when_closed=None): ...@@ -1557,32 +1557,28 @@ def register(request, ignore_when_closed=None):
if answer in (Error, None): if answer in (Error, None):
return request return request
# build a class for the answer # build a class for the answer
answer = type('Answer%s' % (request.__name__, ), (Packet, ), {}) answer = type('Answer' + request.__name__, (Packet, ), {})
answer._fmt = request._answer answer._fmt = request._answer
answer.poll_thread = request.poll_thread answer.poll_thread = request.poll_thread
# compute the answer code
code = code | RESPONSE_MASK
answer._request = request answer._request = request
assert answer._code is None, "Answer of %s is already used" % (request, ) assert answer._code is None, "Answer of %s is already used" % (request, )
answer._code = code answer._code = code | RESPONSE_MASK
request._answer = answer request._answer = answer
# and register the answer packet return request, answer
assert code not in StaticRegistry, "Duplicate response packet code"
StaticRegistry[code] = answer
return (request, answer)
class Packets(dict): class Packets(dict):
""" """
Packet registry that checks packet code uniqueness and provides an index Packet registry that checks packet code uniqueness and provides an index
""" """
def __metaclass__(name, base, d): def __metaclass__(name, base, d):
# this builds a "singleton"
cls = type('PacketRegistry', base, d)()
for k, v in d.iteritems(): for k, v in d.iteritems():
if isinstance(v, type) and issubclass(v, Packet): if isinstance(v, type) and issubclass(v, Packet):
v.handler_method_name = k[0].lower() + k[1:] v.handler_method_name = k[0].lower() + k[1:]
# this builds a "singleton" cls[v._code] = v
return type('PacketRegistry', base, d)(StaticRegistry) return cls
# notifications
Error = register( Error = register(
Error) Error)
RequestIdentification, AcceptIdentification = register( RequestIdentification, AcceptIdentification = register(
......
...@@ -348,7 +348,7 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -348,7 +348,7 @@ class ImporterDatabaseManager(DatabaseManager):
def __init__(self, *args, **kw): def __init__(self, *args, **kw):
super(ImporterDatabaseManager, self).__init__(*args, **kw) super(ImporterDatabaseManager, self).__init__(*args, **kw)
implements(self, """_getNextTID checkSerialRange checkTIDRange implements(self, """_getNextTID checkSerialRange checkTIDRange
deleteObject deleteTransaction dropPartitions getLastTID deleteObject deleteTransaction dropPartitions _getLastTID
getReplicationObjectList _getTIDList nonempty""".split()) getReplicationObjectList _getTIDList nonempty""".split())
_getPartition = property(lambda self: self.db._getPartition) _getPartition = property(lambda self: self.db._getPartition)
...@@ -379,9 +379,9 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -379,9 +379,9 @@ class ImporterDatabaseManager(DatabaseManager):
db = self.db = buildDatabaseManager(conf['adapter'], db = self.db = buildDatabaseManager(conf['adapter'],
(conf['database'], conf.get('engine'), conf['wait'])) (conf['database'], conf.get('engine'), conf['wait']))
for x in """getConfiguration _setConfiguration setNumPartitions for x in """getConfiguration _setConfiguration setNumPartitions
query erase getPartitionTable query erase getPartitionTable _iterAssignedCells
getUnfinishedTIDDict dropUnfinishedData abortTransaction updateCellTID getUnfinishedTIDDict dropUnfinishedData
storeTransaction lockTransaction abortTransaction storeTransaction lockTransaction
loadData storeData getOrphanList _pruneData deferCommit loadData storeData getOrphanList _pruneData deferCommit
dropPartitionsTemporary dropPartitionsTemporary
""".split(): """.split():
...@@ -552,8 +552,8 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -552,8 +552,8 @@ class ImporterDatabaseManager(DatabaseManager):
return zodb, oid - zodb.shift_oid return zodb, oid - zodb.shift_oid
def getLastIDs(self): def getLastIDs(self):
tid, _, _, oid = self.db.getLastIDs() tid, oid = self.db.getLastIDs()
return (max(tid, util.p64(self.zodb_ltid)), None, None, return (max(tid, util.p64(self.zodb_ltid)),
max(oid, util.p64(self.zodb_loid))) max(oid, util.p64(self.zodb_loid)))
def getObject(self, oid, tid=None, before_tid=None): def getObject(self, oid, tid=None, before_tid=None):
...@@ -623,7 +623,7 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -623,7 +623,7 @@ class ImporterDatabaseManager(DatabaseManager):
def _deleteRange(self, partition, min_tid=None, max_tid=None): def _deleteRange(self, partition, min_tid=None, max_tid=None):
# Even if everything is imported, we can't truncate below # Even if everything is imported, we can't truncate below
# because it would import again if we restart with this backend. # because it would import again if we restart with this backend.
if u64(min_tid) < self.zodb_ltid: if min_tid < self.zodb_ltid:
raise NotImplementedError raise NotImplementedError
self.db._deleteRange(partition, min_tid, max_tid) self.db._deleteRange(partition, min_tid, max_tid)
......
...@@ -21,9 +21,11 @@ from copy import copy ...@@ -21,9 +21,11 @@ from copy import copy
from functools import wraps from functools import wraps
from neo.lib import logging, util from neo.lib import logging, util
from neo.lib.interfaces import abstract, requires from neo.lib.interfaces import abstract, requires
from neo.lib.protocol import CellStates, NonReadableCell, ZERO_TID from neo.lib.protocol import CellStates, NonReadableCell, MAX_TID, ZERO_TID
from . import DatabaseFailure from . import DatabaseFailure
READABLE = CellStates.UP_TO_DATE, CellStates.FEEDING
def lazymethod(func): def lazymethod(func):
def getter(self): def getter(self):
cls = self.__class__ cls = self.__class__
...@@ -348,61 +350,47 @@ class DatabaseManager(object): ...@@ -348,61 +350,47 @@ class DatabaseManager(object):
except TypeError: except TypeError:
return -1 return -1
@abstract # XXX: Consider splitting getLastIDs/_getLastIDs because
def getPartitionTable(self, *nid): # sometimes the last oid is not wanted.
"""Return a whole partition table as a sequence of rows. Each row
is again a tuple of an offset (row ID), the NID of a storage
node, and a cell state."""
def _getAssignedPartitionList(self, *states): def _getLastTID(self, partition, max_tid=None):
nid = self.getUUID() """Return tid of last transaction <= 'max_tid' in given 'partition'
if nid is None:
return ()
if states:
return [nid for nid, state in self.getPartitionTable(nid)
if state in states]
return [x[0] for x in self.getPartitionTable(nid)]
@abstract tids are in unpacked format.
def getLastTID(self, max_tid): """
"""Return greatest tid in trans table that is <= given 'max_tid'
Required only to import a DB using Importer backend.
max_tid must be in unpacked format.
Data from unassigned partitions must be ignored. @requires(_getLastTID)
This is important because there may remain data from cells that have def getLastTID(self, max_tid=None):
been discarded, either due to --disable-drop-partitions option, """Return tid of last transaction <= 'max_tid'
or in the future when dropping partitions is done in background
(because this is an expensive operation).
XXX: Given the TODO comment in getLastIDs, getting ids tids are in unpacked format.
from readable partitions should be enough.
""" """
if self.getNumPartitions():
return max(map(self._getLastTID, self._readable_set))
def _getLastIDs(self): def _getLastIDs(self, partition):
"""Return (trans, obj, max(oid)) where """Return max(tid) & max(oid) for objects of given partition
both 'trans' and 'obj' are {partition: max(tid)}
Same as in getLastTID: data from unassigned partitions must be ignored. Results are in unpacked format
""" """
@requires(_getLastIDs) @requires(_getLastIDs)
def getLastIDs(self): def getLastIDs(self):
trans, obj, oid = self._getLastIDs() """Return max(tid) & max(oid) for readable data
if trans:
tid = max(trans.itervalues()) It is important to ignore unassigned partitions because there may
if obj: remain data from cells that have been discarded, either due to
tid = max(tid, max(obj.itervalues())) --disable-drop-partitions option, or in the future when dropping
else: partitions is done in background (as it is an expensive operation).
tid = max(obj.itervalues()) if obj else None """
# TODO: Replication can't be resumed from the tids in 'trans' and 'obj' x = self._readable_set
# because outdated cells are writable and may contain recently if x:
# committed data. We must save somewhere where replication was tid, oid = zip(*map(self._getLastIDs, x))
# interrupted and return this information. For the moment, we tid = max(self.getLastTID(None), max(tid))
# tell the replicator to resume from the beginning. oid = max(oid)
trans = obj = {} return (None if tid is None else util.p64(tid),
return tid, trans, obj, oid None if oid is None else util.p64(oid))
return None, None
def _getUnfinishedTIDDict(self): def _getUnfinishedTIDDict(self):
"""""" """"""
...@@ -514,6 +502,22 @@ class DatabaseManager(object): ...@@ -514,6 +502,22 @@ class DatabaseManager(object):
return (util.p64(serial), compression, checksum, data, return (util.p64(serial), compression, checksum, data,
None if data_serial is None else util.p64(data_serial)) None if data_serial is None else util.p64(data_serial))
def _getPartitionTable(self):
"""Return a whole partition table as a sequence of rows. Each row
is again a tuple of an offset (row ID), the NID of a storage
node, and a cell state."""
@requires(_getPartitionTable)
def _iterAssignedCells(self):
my_nid = self.getUUID()
return ((offset, tid) for offset, nid, tid in self._getPartitionTable()
if my_nid == nid)
@requires(_getPartitionTable)
def getPartitionTable(self):
return [(offset, nid, max(0, -state))
for offset, nid, state in self._getPartitionTable()]
@contextmanager @contextmanager
def replicated(self, offset): def replicated(self, offset):
readable_set = self._readable_set readable_set = self._readable_set
...@@ -557,16 +561,78 @@ class DatabaseManager(object): ...@@ -557,16 +561,78 @@ class DatabaseManager(object):
d.append(p << 48 if i is None else i + 1) d.append(p << 48 if i is None else i + 1)
else: else:
readable_set.clear() readable_set.clear()
readable_set.update(self._getAssignedPartitionList( readable_set.update(x[0] for x in self._iterAssignedCells()
CellStates.UP_TO_DATE, CellStates.FEEDING)) if -x[1] in READABLE)
@requires(_changePartitionTable) @requires(_changePartitionTable, _getLastIDs, _getLastTID)
def changePartitionTable(self, ptid, cell_list, reset=False): def changePartitionTable(self, ptid, cell_list, reset=False):
my_nid = self.getUUID()
pt = dict(self._iterAssignedCells())
# In backup mode, the last transactions of a readable cell may be
# incomplete.
backup_tid = self.getBackupTID()
if backup_tid:
backup_tid = util.u64(backup_tid)
def outofdate_tid(offset):
tid = pt.get(offset, 0)
if tid >= 0:
return tid
return -tid in READABLE and (backup_tid or
max(self._getLastIDs(offset)[0],
self._getLastTID(offset))) or 0
cell_list = [(offset, nid, (
None if state == CellStates.DISCARDED else
-state if nid != my_nid or state != CellStates.OUT_OF_DATE else
outofdate_tid(offset)))
for offset, nid, state in cell_list]
self._changePartitionTable(cell_list, reset) self._changePartitionTable(cell_list, reset)
self._updateReadable() self._updateReadable()
assert isinstance(ptid, (int, long)), ptid assert isinstance(ptid, (int, long)), ptid
self._setConfiguration('ptid', str(ptid)) self._setConfiguration('ptid', str(ptid))
@requires(_changePartitionTable)
def updateCellTID(self, partition, tid):
t, = (t for p, t in self._iterAssignedCells() if p == partition)
if t < 0:
return
tid = util.u64(tid)
# Replicator doesn't optimize when there's no new data
# since the node went down.
if t == tid:
return
# In a backup cluster, when a storage node gets down just after
# being the first to replicate fully new transactions from upstream,
# we may end up in a special situation where an OUT_OF_DATE cell
# is actually more up-to-date than an UP_TO_DATE one.
assert t < tid or self.getBackupTID()
self._changePartitionTable([(partition, self.getUUID(), tid)])
def iterCellNextTIDs(self):
p64 = util.p64
backup_tid = self.getBackupTID()
if backup_tid:
next_tid = util.u64(backup_tid)
if next_tid:
next_tid += 1
for offset, tid in self._iterAssignedCells():
if tid >= 0: # OUT_OF_DATE
yield offset, p64(tid and tid + 1)
elif -tid in READABLE:
if backup_tid:
# An UP_TO_DATE cell does not have holes so it's fine to
# resume from the last found records.
tid = self._getLastTID(offset)
yield offset, (
# For trans, a transaction can't be partially
# replicated, so replication can resume from the next
# possible tid.
p64(max(next_tid, tid + 1) if tid else next_tid),
# For obj, the last transaction may be partially
# replicated so it must be checked again (i.e. no +1).
p64(max(next_tid, self._getLastIDs(offset)[0])))
else:
yield offset, None
@abstract @abstract
def dropPartitions(self, offset_list): def dropPartitions(self, offset_list):
"""Delete all data for specified partitions""" """Delete all data for specified partitions"""
...@@ -786,9 +852,16 @@ class DatabaseManager(object): ...@@ -786,9 +852,16 @@ class DatabaseManager(object):
def truncate(self): def truncate(self):
tid = self.getTruncateTID() tid = self.getTruncateTID()
if tid: if tid:
assert tid != ZERO_TID, tid tid = util.u64(tid)
for partition in xrange(self.getNumPartitions()): assert tid, tid
cell_list = []
my_nid = self.getUUID()
for partition, state in self._iterAssignedCells():
if state > tid:
cell_list.append((partition, my_nid, tid))
self._deleteRange(partition, tid) self._deleteRange(partition, tid)
if cell_list:
self._changePartitionTable(cell_list)
self._setTruncateTID(None) self._setTruncateTID(None)
self.commit() self.commit()
......
...@@ -38,7 +38,7 @@ from . import LOG_QUERIES, DatabaseFailure ...@@ -38,7 +38,7 @@ from . import LOG_QUERIES, DatabaseFailure
from .manager import DatabaseManager, splitOIDField from .manager import DatabaseManager, splitOIDField
from neo.lib import logging, util from neo.lib import logging, util
from neo.lib.interfaces import implements from neo.lib.interfaces import implements
from neo.lib.protocol import CellStates, ZERO_OID, ZERO_TID, ZERO_HASH from neo.lib.protocol import ZERO_OID, ZERO_TID, ZERO_HASH
class MysqlError(DatabaseFailure): class MysqlError(DatabaseFailure):
...@@ -90,7 +90,7 @@ def auto_reconnect(wrapped): ...@@ -90,7 +90,7 @@ def auto_reconnect(wrapped):
class MySQLDatabaseManager(DatabaseManager): class MySQLDatabaseManager(DatabaseManager):
"""This class manages a database on MySQL.""" """This class manages a database on MySQL."""
VERSION = 2 VERSION = 3
ENGINES = "InnoDB", "RocksDB", "TokuDB" ENGINES = "InnoDB", "RocksDB", "TokuDB"
_engine = ENGINES[0] # default engine _engine = ENGINES[0] # default engine
...@@ -257,6 +257,14 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -257,6 +257,14 @@ class MySQLDatabaseManager(DatabaseManager):
def _migrate2(self, schema_dict): def _migrate2(self, schema_dict):
self._alterTable(schema_dict, 'obj') self._alterTable(schema_dict, 'obj')
def _migrate3(self, schema_dict):
self._alterTable(schema_dict, 'pt', "rid as `partition`, nid,"
" CASE state"
" WHEN 0 THEN -1" # UP_TO_DATE
" WHEN 2 THEN -2" # FEEDING
" ELSE 1-state"
" END as tid")
def _setup(self, dedup=False): def _setup(self, dedup=False):
self._config.clear() self._config.clear()
q = self.query q = self.query
...@@ -272,10 +280,10 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -272,10 +280,10 @@ class MySQLDatabaseManager(DatabaseManager):
# The table "pt" stores a partition table. # The table "pt" stores a partition table.
schema_dict['pt'] = """CREATE TABLE %s ( schema_dict['pt'] = """CREATE TABLE %s (
rid INT UNSIGNED NOT NULL, `partition` SMALLINT UNSIGNED NOT NULL,
nid INT NOT NULL, nid INT NOT NULL,
state TINYINT UNSIGNED NOT NULL, tid BIGINT NOT NULL,
PRIMARY KEY (rid, nid) PRIMARY KEY (`partition`, nid)
) ENGINE=""" + engine ) ENGINE=""" + engine
if self._use_partition: if self._use_partition:
...@@ -394,36 +402,23 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -394,36 +402,23 @@ class MySQLDatabaseManager(DatabaseManager):
q("ALTER TABLE config MODIFY value VARBINARY(%s) NULL" % len(value)) q("ALTER TABLE config MODIFY value VARBINARY(%s) NULL" % len(value))
q(sql) q(sql)
def getPartitionTable(self, *nid): def _getPartitionTable(self):
if nid:
return self.query("SELECT rid, state FROM pt WHERE nid=%u" % nid)
return self.query("SELECT * FROM pt") return self.query("SELECT * FROM pt")
def _sqlmax(self, sql, arg_list): def _getLastTID(self, partition, max_tid=None):
q = self.query x = "WHERE `partition`=%s" % partition
x = [x for x in arg_list for x, in q(sql % x) if x is not None] if max_tid:
if x: return max(x) x += " AND tid<=%s" % max_tid
(tid,), = self.query(
def getLastTID(self, max_tid): "SELECT MAX(tid) as t FROM trans FORCE INDEX (PRIMARY)" + x)
return self._sqlmax( return tid
"SELECT MAX(tid) as t FROM trans FORCE INDEX (PRIMARY)"
" WHERE tid<=%s and `partition`=%%s" % max_tid,
self._getAssignedPartitionList())
def _getLastIDs(self): def _getLastIDs(self, partition):
offset_list = self._getAssignedPartitionList()
p64 = util.p64
q = self.query q = self.query
sql = "SELECT MAX(tid) FROM %s WHERE `partition`=%s" x = "WHERE `partition`=%s" % partition
trans, obj = ({partition: p64(tid) (oid,), = q("SELECT MAX(oid) FROM obj FORCE INDEX (PRIMARY)" + x)
for partition in offset_list (tid,), = q("SELECT MAX(tid) FROM obj FORCE INDEX (tid)" + x)
for tid, in q(sql % (t, partition)) return tid, oid
if tid is not None}
for t in ('trans FORCE INDEX (PRIMARY)', 'obj FORCE INDEX (tid)'))
oid = self._sqlmax(
"SELECT MAX(oid) FROM obj FORCE INDEX (PRIMARY)"
" WHERE `partition`=%s", offset_list)
return trans, obj, None if oid is None else p64(oid)
def _getDataLastId(self, partition): def _getDataLastId(self, partition):
return self.query("SELECT MAX(id) FROM data WHERE %s <= id AND id < %s" return self.query("SELECT MAX(id) FROM data WHERE %s <= id AND id < %s"
...@@ -489,17 +484,17 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -489,17 +484,17 @@ class MySQLDatabaseManager(DatabaseManager):
q = self.query q = self.query
if reset: if reset:
q("DELETE FROM pt") q("DELETE FROM pt")
for offset, nid, state in cell_list: for offset, nid, tid in cell_list:
# TODO: this logic should move out of database manager # TODO: this logic should move out of database manager
# add 'dropCells(cell_list)' to API and use one query # add 'dropCells(cell_list)' to API and use one query
if state == CellStates.DISCARDED: if tid is None:
q("DELETE FROM pt WHERE rid = %d AND nid = %d" q("DELETE FROM pt WHERE `partition` = %d AND nid = %d"
% (offset, nid)) % (offset, nid))
else: else:
offset_list.append(offset) offset_list.append(offset)
q("INSERT INTO pt VALUES (%d, %d, %d)" q("INSERT INTO pt VALUES (%d, %d, %d)"
" ON DUPLICATE KEY UPDATE state = %d" " ON DUPLICATE KEY UPDATE tid = %d"
% (offset, nid, state, state)) % (offset, nid, tid, tid))
if self._use_partition: if self._use_partition:
for offset in offset_list: for offset in offset_list:
add = """ALTER TABLE %%s ADD PARTITION ( add = """ALTER TABLE %%s ADD PARTITION (
...@@ -750,10 +745,10 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -750,10 +745,10 @@ class MySQLDatabaseManager(DatabaseManager):
def _deleteRange(self, partition, min_tid=None, max_tid=None): def _deleteRange(self, partition, min_tid=None, max_tid=None):
sql = " WHERE `partition`=%d" % partition sql = " WHERE `partition`=%d" % partition
if min_tid: if min_tid is not None:
sql += " AND %d < tid" % util.u64(min_tid) sql += " AND %d < tid" % min_tid
if max_tid: if max_tid is not None:
sql += " AND tid <= %d" % util.u64(max_tid) sql += " AND tid <= %d" % max_tid
q = self.query q = self.query
q("DELETE FROM trans" + sql) q("DELETE FROM trans" + sql)
sql = " FROM obj" + sql sql = " FROM obj" + sql
......
...@@ -25,7 +25,7 @@ from . import LOG_QUERIES ...@@ -25,7 +25,7 @@ from . import LOG_QUERIES
from .manager import DatabaseManager, splitOIDField from .manager import DatabaseManager, splitOIDField
from neo.lib import logging, util from neo.lib import logging, util
from neo.lib.interfaces import implements from neo.lib.interfaces import implements
from neo.lib.protocol import CellStates, ZERO_OID, ZERO_TID, ZERO_HASH from neo.lib.protocol import ZERO_OID, ZERO_TID, ZERO_HASH
def unique_constraint_message(table, *columns): def unique_constraint_message(table, *columns):
c = sqlite3.connect(":memory:") c = sqlite3.connect(":memory:")
...@@ -68,7 +68,7 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -68,7 +68,7 @@ class SQLiteDatabaseManager(DatabaseManager):
never be used for small requests. never be used for small requests.
""" """
VERSION = 2 VERSION = 3
def _parse(self, database): def _parse(self, database):
self.db = os.path.expanduser(database) self.db = os.path.expanduser(database)
...@@ -135,6 +135,12 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -135,6 +135,12 @@ class SQLiteDatabaseManager(DatabaseManager):
def _migrate2(self, schema_dict, index_dict): def _migrate2(self, schema_dict, index_dict):
self._alterTable(schema_dict, 'obj') self._alterTable(schema_dict, 'obj')
def _migrate3(self, schema_dict, index_dict):
self._alterTable(schema_dict, 'pt', "rid, nid, CASE state"
" WHEN 0 THEN -1" # UP_TO_DATE
" WHEN 2 THEN -2" # FEEDING
" ELSE 1-state END")
def _setup(self, dedup=False): def _setup(self, dedup=False):
# BBB: SQLite has transactional DDL but before Python 3.6, # BBB: SQLite has transactional DDL but before Python 3.6,
# the binding automatically commits between such statements. # the binding automatically commits between such statements.
...@@ -154,10 +160,10 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -154,10 +160,10 @@ class SQLiteDatabaseManager(DatabaseManager):
# The table "pt" stores a partition table. # The table "pt" stores a partition table.
schema_dict['pt'] = """CREATE TABLE %s ( schema_dict['pt'] = """CREATE TABLE %s (
rid INTEGER NOT NULL, partition INTEGER NOT NULL,
nid INTEGER NOT NULL, nid INTEGER NOT NULL,
state INTEGER NOT NULL, tid INTEGER NOT NULL,
PRIMARY KEY (rid, nid)) PRIMARY KEY (partition, nid))
""" """
# The table "trans" stores information on committed transactions. # The table "trans" stores information on committed transactions.
...@@ -254,42 +260,23 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -254,42 +260,23 @@ class SQLiteDatabaseManager(DatabaseManager):
else: else:
q("REPLACE INTO config VALUES (?,?)", (key, str(value))) q("REPLACE INTO config VALUES (?,?)", (key, str(value)))
def getPartitionTable(self, *nid): def _getPartitionTable(self):
if nid:
return self.query("SELECT rid, state FROM pt WHERE nid=?", nid)
return self.query("SELECT * FROM pt") return self.query("SELECT * FROM pt")
# A test with a table of 20 million lines and SQLite 3.8.7.1 shows that def _getLastTID(self, partition, max_tid=None):
# it's not worth changing getLastTID: x = self.query
# - It already returns the result in less than 2 seconds, without reading if max_tid is None:
# the whole table (this is 4-7 times faster than MySQL). x = x("SELECT MAX(tid) FROM trans WHERE partition=?", (partition,))
# - Strangely, a "GROUP BY partition" clause makes SQLite almost twice else:
# slower. x = x("SELECT MAX(tid) FROM trans WHERE partition=? AND tid<=?",
# - Getting MAX(tid) is immediate with a "AND partition=?" condition so one (partition, max_tid))
# way to speed up the following 2 methods is to repeat the queries for return x.next()[0]
# each partition (and finish in Python with max() for getLastTID).
def _getLastIDs(self, *args):
def getLastTID(self, max_tid):
return self.query(
"SELECT MAX(tid) FROM pt, trans"
" WHERE nid=? AND rid=partition AND tid<=?",
(self.getUUID(), max_tid,)).next()[0]
def _getLastIDs(self):
p64 = util.p64
q = self.query q = self.query
args = self.getUUID(), (oid,), = q("SELECT MAX(oid) FROM obj WHERE `partition`=?", args)
trans = {partition: p64(tid) (tid,), = q("SELECT MAX(tid) FROM obj WHERE `partition`=?", args)
for partition, tid in q( return tid, oid
"SELECT partition, MAX(tid) FROM pt, trans"
" WHERE nid=? AND rid=partition GROUP BY partition", args)}
obj = {partition: p64(tid)
for partition, tid in q(
"SELECT partition, MAX(tid) FROM pt, obj"
" WHERE nid=? AND rid=partition GROUP BY partition", args)}
oid = q("SELECT MAX(oid) oid FROM pt, obj"
" WHERE nid=? AND rid=partition", args).next()[0]
return trans, obj, None if oid is None else p64(oid)
def _getDataLastId(self, partition): def _getDataLastId(self, partition):
return self.query("SELECT MAX(id) FROM data WHERE %s <= id AND id < %s" return self.query("SELECT MAX(id) FROM data WHERE %s <= id AND id < %s"
...@@ -357,8 +344,8 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -357,8 +344,8 @@ class SQLiteDatabaseManager(DatabaseManager):
# whereas we try to replace only 1 value ? # whereas we try to replace only 1 value ?
# We don't want to remove the 'NOT NULL' constraint # We don't want to remove the 'NOT NULL' constraint
# so we must simulate a "REPLACE OR FAIL". # so we must simulate a "REPLACE OR FAIL".
q("DELETE FROM pt WHERE rid=? AND nid=?", (offset, nid)) q("DELETE FROM pt WHERE partition=? AND nid=?", (offset, nid))
if state != CellStates.DISCARDED: if state is not None:
q("INSERT OR FAIL INTO pt VALUES (?,?,?)", q("INSERT OR FAIL INTO pt VALUES (?,?,?)",
(offset, nid, int(state))) (offset, nid, int(state)))
...@@ -525,12 +512,12 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -525,12 +512,12 @@ class SQLiteDatabaseManager(DatabaseManager):
def _deleteRange(self, partition, min_tid=None, max_tid=None): def _deleteRange(self, partition, min_tid=None, max_tid=None):
sql = " WHERE partition=?" sql = " WHERE partition=?"
args = [partition] args = [partition]
if min_tid: if min_tid is not None:
sql += " AND ? < tid" sql += " AND ? < tid"
args.append(util.u64(min_tid)) args.append(min_tid)
if max_tid: if max_tid is not None:
sql += " AND tid <= ?" sql += " AND tid <= ?"
args.append(util.u64(max_tid)) args.append(max_tid)
q = self.query q = self.query
q("DELETE FROM trans" + sql, args) q("DELETE FROM trans" + sql, args)
sql = " FROM obj" + sql sql = " FROM obj" + sql
......
...@@ -63,7 +63,7 @@ class InitializationHandler(BaseMasterHandler): ...@@ -63,7 +63,7 @@ class InitializationHandler(BaseMasterHandler):
def askLastIDs(self, conn): def askLastIDs(self, conn):
dm = self.app.dm dm = self.app.dm
dm.truncate() dm.truncate()
ltid, _, _, loid = dm.getLastIDs() ltid, loid = dm.getLastIDs()
conn.answer(Packets.AnswerLastIDs(loid, ltid)) conn.answer(Packets.AnswerLastIDs(loid, ltid))
def askPartitionTable(self, conn): def askPartitionTable(self, conn):
...@@ -81,14 +81,6 @@ class InitializationHandler(BaseMasterHandler): ...@@ -81,14 +81,6 @@ class InitializationHandler(BaseMasterHandler):
dm.commit() dm.commit()
def startOperation(self, conn, backup): def startOperation(self, conn, backup):
self.app.operational = True
# XXX: see comment in protocol # XXX: see comment in protocol
dm = self.app.dm self.app.operational = True
if backup: self.app.replicator.startOperation(backup)
if dm.getBackupTID():
return
tid = dm.getLastIDs()[0] or ZERO_TID
else:
tid = None
dm._setBackupTID(tid)
dm.commit()
...@@ -26,10 +26,7 @@ class MasterOperationHandler(BaseMasterHandler): ...@@ -26,10 +26,7 @@ class MasterOperationHandler(BaseMasterHandler):
def startOperation(self, conn, backup): def startOperation(self, conn, backup):
# XXX: see comment in protocol # XXX: see comment in protocol
assert self.app.operational and backup assert self.app.operational and backup
dm = self.app.dm self.app.replicator.startOperation(backup)
if not dm.getBackupTID():
dm._setBackupTID(dm.getLastIDs()[0] or ZERO_TID)
dm.commit()
def askLockInformation(self, conn, ttid, tid): def askLockInformation(self, conn, ttid, tid):
self.app.tm.lock(ttid, tid) self.app.tm.lock(ttid, tid)
......
...@@ -97,15 +97,13 @@ class StorageOperationHandler(EventHandler): ...@@ -97,15 +97,13 @@ class StorageOperationHandler(EventHandler):
for serial, oid_list in object_dict.iteritems(): for serial, oid_list in object_dict.iteritems():
for oid in oid_list: for oid in oid_list:
deleteObject(oid, serial) deleteObject(oid, serial)
# XXX: It should be possible not to commit here if it was the last
# chunk, because we'll either commit again when updating
# 'backup_tid' or the partition table.
self.app.dm.commit()
assert not pack_tid, "TODO" assert not pack_tid, "TODO"
if next_tid: if next_tid:
self.app.dm.commit() # like in answerFetchTransactions
# TODO also provide feedback to master about current replication state (tid) # TODO also provide feedback to master about current replication state (tid)
self.app.replicator.fetchObjects(next_tid, next_oid) self.app.replicator.fetchObjects(next_tid, next_oid)
else: else:
# This will also commit.
self.app.replicator.finish() self.app.replicator.finish()
@checkConnectionIsReplicatorConnection @checkConnectionIsReplicatorConnection
......
...@@ -93,7 +93,7 @@ from neo.lib import logging ...@@ -93,7 +93,7 @@ from neo.lib import logging
from neo.lib.protocol import CellStates, NodeTypes, NodeStates, \ from neo.lib.protocol import CellStates, NodeTypes, NodeStates, \
Packets, INVALID_TID, ZERO_TID, ZERO_OID Packets, INVALID_TID, ZERO_TID, ZERO_OID
from neo.lib.connection import ClientConnection, ConnectionClosed from neo.lib.connection import ClientConnection, ConnectionClosed
from neo.lib.util import add64, dump from neo.lib.util import add64, dump, p64
from .handlers.storage import StorageOperationHandler from .handlers.storage import StorageOperationHandler
FETCH_COUNT = 1000 FETCH_COUNT = 1000
...@@ -190,42 +190,51 @@ class Replicator(object): ...@@ -190,42 +190,51 @@ class Replicator(object):
return add64(tid, -1) return add64(tid, -1)
return ZERO_TID return ZERO_TID
def updateBackupTID(self): def updateBackupTID(self, commit=False):
dm = self.app.dm dm = self.app.dm
tid = dm.getBackupTID() tid = dm.getBackupTID()
if tid: if tid:
new_tid = self.getBackupTID() new_tid = self.getBackupTID()
if tid != new_tid: if tid != new_tid:
dm._setBackupTID(new_tid) dm._setBackupTID(new_tid)
dm.commit() if commit:
dm.commit()
def startOperation(self, backup):
dm = self.app.dm
if backup:
if dm.getBackupTID():
assert not hasattr(self, 'partition_dict'), self.partition_dict
return
tid = dm.getLastIDs()[0] or ZERO_TID
else:
tid = None
dm._setBackupTID(tid)
dm.commit()
try:
partition_dict = self.partition_dict
except AttributeError:
return
for offset, next_tid in dm.iterCellNextTIDs():
if type(next_tid) is not bytes: # readable
p = partition_dict[offset]
p.next_trans, p.next_obj = next_tid
def populate(self): def populate(self):
app = self.app
pt = app.pt
uuid = app.uuid
self.partition_dict = {} self.partition_dict = {}
self.replicate_dict = {} self.replicate_dict = {}
self.source_dict = {} self.source_dict = {}
self.ttid_set = set() self.ttid_set = set()
last_tid, last_trans_dict, last_obj_dict, _ = app.dm.getLastIDs()
next_tid = app.dm.getBackupTID() or last_tid
next_tid = add64(next_tid, 1) if next_tid else ZERO_TID
outdated_list = [] outdated_list = []
for offset in xrange(pt.getPartitions()): for offset, next_tid in self.app.dm.iterCellNextTIDs():
for cell in pt.getCellList(offset): self.partition_dict[offset] = p = Partition()
if cell.getUUID() == uuid and not cell.isCorrupted(): if type(next_tid) is bytes: # OUT_OF_DATE
self.partition_dict[offset] = p = Partition() outdated_list.append(offset)
if cell.isOutOfDate(): p.next_trans = p.next_obj = next_tid
outdated_list.append(offset) p.max_ttid = INVALID_TID
try: else: # readable
p.next_trans = add64(last_trans_dict[offset], 1) p.next_trans, p.next_obj = next_tid or (None, None)
except KeyError: p.max_ttid = None
p.next_trans = ZERO_TID
p.next_obj = last_obj_dict.get(offset, ZERO_TID)
p.max_ttid = INVALID_TID
else:
p.next_trans = p.next_obj = next_tid
p.max_ttid = None
if outdated_list: if outdated_list:
self.app.tm.replicating(outdated_list) self.app.tm.replicating(outdated_list)
...@@ -236,7 +245,6 @@ class Replicator(object): ...@@ -236,7 +245,6 @@ class Replicator(object):
discarded_list = [] discarded_list = []
readable_list = [] readable_list = []
app = self.app app = self.app
last_tid, last_trans_dict, last_obj_dict, _ = app.dm.getLastIDs()
for offset, uuid, state in cell_list: for offset, uuid, state in cell_list:
if uuid == app.uuid: if uuid == app.uuid:
if state in (CellStates.DISCARDED, CellStates.CORRUPTED): if state in (CellStates.DISCARDED, CellStates.CORRUPTED):
...@@ -251,11 +259,9 @@ class Replicator(object): ...@@ -251,11 +259,9 @@ class Replicator(object):
elif state == CellStates.OUT_OF_DATE: elif state == CellStates.OUT_OF_DATE:
assert offset not in self.partition_dict assert offset not in self.partition_dict
self.partition_dict[offset] = p = Partition() self.partition_dict[offset] = p = Partition()
try: # New cell. 0 is also what should be stored by the backend.
p.next_trans = add64(last_trans_dict[offset], 1) # Nothing to optimize.
except KeyError: p.next_trans = p.next_obj = ZERO_TID
p.next_trans = ZERO_TID
p.next_obj = last_obj_dict.get(offset, ZERO_TID)
p.max_ttid = INVALID_TID p.max_ttid = INVALID_TID
added_list.append(offset) added_list.append(offset)
else: else:
...@@ -289,7 +295,7 @@ class Replicator(object): ...@@ -289,7 +295,7 @@ class Replicator(object):
next_tid = add64(tid, 1) next_tid = add64(tid, 1)
p.next_trans = p.next_obj = next_tid p.next_trans = p.next_obj = next_tid
if next_tid: if next_tid:
self.updateBackupTID() self.updateBackupTID(True)
self._nextPartition() self._nextPartition()
def _nextPartitionSortKey(self, offset): def _nextPartitionSortKey(self, offset):
...@@ -406,11 +412,14 @@ class Replicator(object): ...@@ -406,11 +412,14 @@ class Replicator(object):
p = self.partition_dict[offset] p = self.partition_dict[offset]
p.next_obj = add64(tid, 1) p.next_obj = add64(tid, 1)
self.updateBackupTID() self.updateBackupTID()
app = self.app
app.dm.updateCellTID(offset, tid)
app.dm.commit()
if p.max_ttid or offset in self.replicate_dict and \ if p.max_ttid or offset in self.replicate_dict and \
offset not in self.source_dict: offset not in self.source_dict:
logging.debug("unfinished transactions: %r", self.ttid_set) logging.debug("unfinished transactions: %r", self.ttid_set)
else: else:
self.app.tm.replicated(offset, tid) app.tm.replicated(offset, tid)
logging.debug("partition %u replicated up to %s from %r", logging.debug("partition %u replicated up to %s from %r",
offset, dump(tid), self.current_node) offset, dump(tid), self.current_node)
self.getCurrentConnection().setReconnectionNoDelay() self.getCurrentConnection().setReconnectionNoDelay()
......
...@@ -254,7 +254,7 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -254,7 +254,7 @@ class StorageDBTests(NeoUnitTestBase):
txn1, objs1 = self.getTransaction([oid1]) txn1, objs1 = self.getTransaction([oid1])
txn2, objs2 = self.getTransaction([oid2]) txn2, objs2 = self.getTransaction([oid2])
# nothing in database # nothing in database
self.assertEqual(self.db.getLastIDs(), (None, {}, {}, None)) self.assertEqual(self.db.getLastIDs(), (None, None))
self.assertEqual(self.db.getUnfinishedTIDDict(), {}) self.assertEqual(self.db.getUnfinishedTIDDict(), {})
self.assertEqual(self.db.getObject(oid1), None) self.assertEqual(self.db.getObject(oid1), None)
self.assertEqual(self.db.getObject(oid2), None) self.assertEqual(self.db.getObject(oid2), None)
...@@ -320,13 +320,17 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -320,13 +320,17 @@ class StorageDBTests(NeoUnitTestBase):
expected = [(t, oid_list[offset+i]) for t in tids for i in (0, np)] expected = [(t, oid_list[offset+i]) for t in tids for i in (0, np)]
self.assertEqual(self.db.getReplicationObjectList(ZERO_TID, self.assertEqual(self.db.getReplicationObjectList(ZERO_TID,
MAX_TID, len(expected) + 1, offset, ZERO_OID), expected) MAX_TID, len(expected) + 1, offset, ZERO_OID), expected)
self.db._deleteRange(0, MAX_TID) def deleteRange(partition, min_tid=None, max_tid=None):
self.db._deleteRange(0, max_tid=ZERO_TID) self.db._deleteRange(partition,
None if min_tid is None else u64(min_tid),
None if max_tid is None else u64(max_tid))
deleteRange(0, MAX_TID)
deleteRange(0, max_tid=ZERO_TID)
check(0, [], t1, t2, t3) check(0, [], t1, t2, t3)
self.db._deleteRange(0); check(0, []) deleteRange(0); check(0, [])
self.db._deleteRange(1, t2); check(1, [t1], t1, t2) deleteRange(1, t2); check(1, [t1], t1, t2)
self.db._deleteRange(2, max_tid=t2); check(2, [], t3) deleteRange(2, max_tid=t2); check(2, [], t3)
self.db._deleteRange(3, t1, t2); check(3, [t3], t1, t3) deleteRange(3, t1, t2); check(3, [t3], t1, t3)
def test_getTransaction(self): def test_getTransaction(self):
oid1, oid2 = self.getOIDs(2) oid1, oid2 = self.getOIDs(2)
......
...@@ -718,8 +718,9 @@ class Test(NEOThreadedTest): ...@@ -718,8 +718,9 @@ class Test(NEOThreadedTest):
@with_cluster() @with_cluster()
def testStorageUpgrade1(self, cluster): def testStorageUpgrade1(self, cluster):
if 1: storage = cluster.storage
storage = cluster.storage # Disable migration steps that aren't idempotent.
with Patch(storage.dm.__class__, _migrate3=lambda *_: None):
t, c = cluster.getTransaction() t, c = cluster.getTransaction()
storage.dm.setConfiguration("version", None) storage.dm.setConfiguration("version", None)
c.root()._p_changed = 1 c.root()._p_changed = 1
...@@ -1289,7 +1290,7 @@ class Test(NEOThreadedTest): ...@@ -1289,7 +1290,7 @@ class Test(NEOThreadedTest):
s1.resetNode() s1.resetNode()
with Patch(s1.dm, truncate=dieFirst(1)): with Patch(s1.dm, truncate=dieFirst(1)):
s1.start() s1.start()
self.assertEqual(s0.dm.getLastIDs()[0], truncate_tid) self.assertFalse(s0.dm.getLastIDs()[0])
self.assertEqual(s1.dm.getLastIDs()[0], r._p_serial) self.assertEqual(s1.dm.getLastIDs()[0], r._p_serial)
self.tic() self.tic()
self.assertEqual(calls, [1, 2]) self.assertEqual(calls, [1, 2])
...@@ -2351,7 +2352,7 @@ class Test(NEOThreadedTest): ...@@ -2351,7 +2352,7 @@ class Test(NEOThreadedTest):
oid, tid = big_id_list[i] oid, tid = big_id_list[i]
for j, expected in ( for j, expected in (
(1 - i, (dm.getLastTID(u64(MAX_TID)), dm.getLastIDs())), (1 - i, (dm.getLastTID(u64(MAX_TID)), dm.getLastIDs())),
(i, (u64(tid), (tid, {}, {}, oid)))): (i, (u64(tid), (tid, oid)))):
oid, tid = big_id_list[j] oid, tid = big_id_list[j]
# Somehow we abuse 'storeTransaction' because we ask it to # Somehow we abuse 'storeTransaction' because we ask it to
# write data for unassigned partitions. This is not checked # write data for unassigned partitions. This is not checked
......
...@@ -24,12 +24,12 @@ from neo.lib import logging ...@@ -24,12 +24,12 @@ from neo.lib import logging
from neo.client.exception import NEOStorageError from neo.client.exception import NEOStorageError
from neo.master.handlers.backup import BackupHandler from neo.master.handlers.backup import BackupHandler
from neo.storage.checker import CHECK_COUNT from neo.storage.checker import CHECK_COUNT
from neo.storage.replicator import Replicator from neo.storage import replicator
from neo.lib.connector import SocketConnector from neo.lib.connector import SocketConnector
from neo.lib.connection import ClientConnection from neo.lib.connection import ClientConnection
from neo.lib.protocol import CellStates, ClusterStates, Packets, \ from neo.lib.protocol import CellStates, ClusterStates, Packets, \
ZERO_OID, ZERO_TID, MAX_TID, uuid_str ZERO_OID, ZERO_TID, MAX_TID, uuid_str
from neo.lib.util import p64, u64 from neo.lib.util import add64, p64, u64
from .. import expectedFailure, Patch, TransactionalResource from .. import expectedFailure, Patch, TransactionalResource
from . import ConnectionFilter, NEOCluster, NEOThreadedTest, \ from . import ConnectionFilter, NEOCluster, NEOThreadedTest, \
predictable_random, with_cluster predictable_random, with_cluster
...@@ -39,9 +39,9 @@ from .test import PCounter, PCounterWithResolution # XXX ...@@ -39,9 +39,9 @@ from .test import PCounter, PCounterWithResolution # XXX
def backup_test(partitions=1, upstream_kw={}, backup_kw={}): def backup_test(partitions=1, upstream_kw={}, backup_kw={}):
def decorator(wrapped): def decorator(wrapped):
def wrapper(self): def wrapper(self):
with NEOCluster(partitions, **upstream_kw) as upstream: with NEOCluster(partitions=partitions, **upstream_kw) as upstream:
upstream.start() upstream.start()
with NEOCluster(partitions, upstream=upstream, with NEOCluster(partitions=partitions, upstream=upstream,
**backup_kw) as backup: **backup_kw) as backup:
backup.start() backup.start()
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP) backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
...@@ -659,33 +659,158 @@ class ReplicationTests(NEOThreadedTest): ...@@ -659,33 +659,158 @@ class ReplicationTests(NEOThreadedTest):
self.assertEqual(2, s0.sqlCount('obj')) self.assertEqual(2, s0.sqlCount('obj'))
expectedFailure(self.assertEqual)(2, count) expectedFailure(self.assertEqual)(2, count)
@with_cluster(start_cluster=0, replicas=1) @with_cluster(replicas=1)
def testResumingReplication(self, cluster): def testResumingReplication(self, cluster):
if 1: """
s0, s1 = cluster.storage_list Check from where replication resumes for an OUT_OF_DATE cell that has
cluster.start(storage_list=(s0,)) a hole, which is possible because OUT_OF_DATE cells are writable.
t, c = cluster.getTransaction() """
r = c.root() ask = []
def logReplication(conn, packet):
if isinstance(packet, (Packets.AskFetchTransactions,
Packets.AskFetchObjects)):
ask.append(packet.decode()[2:])
def getTIDList():
return [t.tid for t in c.db().storage.iterator()]
s0, s1 = cluster.storage_list
t, c = cluster.getTransaction()
r = c.root()
# s1 is UP_TO_DATE and it has the initial transaction.
# Let's outdate it: replication will have to resume just after this
# transaction, regardless of future written transactions.
# To make sure, we get a hole in the cell, we block replication.
s1.stop()
cluster.join((s1,))
r._p_changed = 1
t.commit()
s1.resetNode()
with Patch(replicator.Replicator, connected=lambda *_: None):
s1.start()
self.tic()
r._p_changed = 1 r._p_changed = 1
t.commit() t.commit()
self.tic()
s1.stop()
cluster.join((s1,))
tids = getTIDList()
s1.resetNode()
# Initialization done. Now we check that replication is correct
# and efficient.
with ConnectionFilter() as f:
f.add(logReplication)
s1.start() s1.start()
self.tic() self.tic()
with Patch(Replicator, connected=lambda *_: None): self.assertEqual([], cluster.getOutdatedCells())
cluster.enableStorageList((s1,)) s0.stop()
cluster.neoctl.tweakPartitionTable() cluster.join((s0,))
r._p_changed = 1 self.assertEqual(tids, getTIDList())
t0_next = add64(tids[0], 1)
self.assertEqual(ask, [
(t0_next, tids[2], tids[2:]),
(t0_next, tids[2], ZERO_OID, {tids[2]: [ZERO_OID]}),
])
@backup_test(2, backup_kw=dict(replicas=1))
def testResumingBackupReplication(self, backup):
upstream = backup.upstream
t, c = upstream.getTransaction()
r = c.root()
r[1] = PCounter()
t.commit()
r[2] = ob = PCounter()
tids = []
def newTransaction():
r._p_changed = ob._p_changed = 1
with upstream.moduloTID(0):
t.commit() t.commit()
self.tic()
s1.stop()
cluster.join((s1,))
t0, t1, t2 = c.db().storage.iterator()
s1.resetNode()
s1.start()
self.tic() self.tic()
self.assertEqual([], cluster.getOutdatedCells()) tids.append(r._p_serial)
s0.stop() def getTIDList(storage):
cluster.join((s0,)) return storage.dm.getReplicationTIDList(tids[0], MAX_TID, 9, 0)
t0, t1, t2 = c.db().storage.iterator() newTransaction()
self.assertEqual(u64(ob._p_oid), 2)
getBackupTid = backup.master.pt.getBackupTid
# Check when an OUT_OF_DATE cell has more data than an UP_TO_DATE one.
primary = backup.master.backup_app.primary_partition_dict[0]._uuid
slave, primary = sorted(backup.storage_list,
key=lambda x: x.uuid == primary)
with ConnectionFilter() as f:
@f.delayAnswerFetchTransactions
def delay(conn, x={None: 0, primary.uuid: 0}):
return x.pop(conn.getUUID(), 1)
newTransaction()
self.assertEqual(getBackupTid(), tids[1])
primary.stop()
backup.join((primary,))
primary.resetNode()
primary.start()
self.tic()
primary, slave = slave, primary
self.assertEqual(tids, getTIDList(slave))
self.assertEqual(tids[:1], getTIDList(primary))
self.assertEqual(getBackupTid(), add64(tids[1], -1))
self.assertEqual(f.filtered_count, 3)
self.tic()
self.assertEqual(4, self.checkBackup(backup))
self.assertEqual(getBackupTid(min), tids[1])
# Check that replication resumes from the maximum possible tid
# (for UP_TO_DATE cells of a backup cluster). More precisely:
# - cells are handled independently (done here by blocking replication
# of partition 1 to keep the backup TID low)
# - trans and obj are also handled independently (with FETCH_COUNT=1,
# we interrupt replication of obj in the middle of a transaction)
slave.stop()
backup.join((slave,))
ask = []
def delayReplicate(conn, packet):
if isinstance(packet, Packets.AskFetchObjects):
if len(ask) == 6:
return True
elif not isinstance(packet, Packets.AskFetchTransactions):
return
ask.append(packet.decode())
conn, = upstream.master.getConnectionList(backup.master)
with ConnectionFilter() as f, Patch(replicator.Replicator,
_nextPartitionSortKey=lambda orig, self, offset: offset):
f.add(delayReplicate)
delayReconnect = f.delayAskLastTransaction()
conn.close()
newTransaction()
newTransaction()
newTransaction()
self.assertFalse(ask)
self.assertEqual(f.filtered_count, 1)
with Patch(replicator, FETCH_COUNT=1):
f.remove(delayReconnect)
self.tic()
t1_next = add64(tids[1], 1)
self.assertEqual(ask, [
# trans
(0, 1, t1_next, tids[4], []),
(0, 1, tids[3], tids[4], []),
(0, 1, tids[4], tids[4], []),
# obj
(0, 1, t1_next, tids[4], ZERO_OID, {}),
(0, 1, tids[2], tids[4], p64(2), {}),
(0, 1, tids[3], tids[4], ZERO_OID, {}),
])
del ask[:]
max_ask = None
backup.stop()
newTransaction()
backup.start((primary,))
n = replicator.FETCH_COUNT
t4_next = add64(tids[4], 1)
self.assertEqual(ask, [
(0, n, t4_next, tids[5], []),
(0, n, tids[3], tids[5], ZERO_OID, {tids[3]: [ZERO_OID]}),
(1, n, t1_next, tids[5], []),
(1, n, t1_next, tids[5], ZERO_OID, {}),
])
self.tic()
self.assertEqual(2, self.checkBackup(backup))
@with_cluster(start_cluster=0, replicas=1, partitions=2) @with_cluster(start_cluster=0, replicas=1, partitions=2)
def testReplicationBlockedByUnfinished1(self, cluster, def testReplicationBlockedByUnfinished1(self, cluster,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment