Commit 34d797e2 authored by Julien Muchembled's avatar Julien Muchembled

storage: fix commit activity when cells are discarded or when they become readable

This is a follow up of commit 64afd7d2,
which focused on read accesses when there is no transaction activity.

This commit also includes a test to check a simpler scenario that the one
described in the previous commit.
parent 6a75a654
...@@ -49,13 +49,6 @@ ...@@ -49,13 +49,6 @@
committed by future transactions. committed by future transactions.
- Add a 'devid' storage configuration so that master do not distribute - Add a 'devid' storage configuration so that master do not distribute
replicated partitions on storages with same 'devid'. replicated partitions on storages with same 'devid'.
- Fix race conditions between client-to-storage operations and moved
partitions. Currently, reads succeed because feeding nodes don't delete
anything while the cluster is operational, for performance reasons:
deletion of dropped partitions must be reimplemented in a scalable way.
The same thing happens for writes: storage nodes must discard
stores/checks of dropped partitions (in lockObject, that can be done by
raising ConflictError(None)). (HIGH AVAILABILITY)
Storage Storage
- Use libmysqld instead of a stand-alone MySQL server. - Use libmysqld instead of a stand-alone MySQL server.
......
...@@ -305,6 +305,7 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -305,6 +305,7 @@ class ImporterDatabaseManager(DatabaseManager):
getUnfinishedTIDDict dropUnfinishedData abortTransaction getUnfinishedTIDDict dropUnfinishedData abortTransaction
storeTransaction lockTransaction unlockTransaction storeTransaction lockTransaction unlockTransaction
loadData storeData getOrphanList _pruneData deferCommit loadData storeData getOrphanList _pruneData deferCommit
dropPartitionsTemporary
""".split(): """.split():
setattr(self, x, getattr(self.db, x)) setattr(self, x, getattr(self.db, x))
......
...@@ -257,15 +257,6 @@ class DatabaseManager(object): ...@@ -257,15 +257,6 @@ class DatabaseManager(object):
if ptid is not None: if ptid is not None:
return int(ptid) return int(ptid)
def setPTID(self, ptid):
"""
Store a Partition Table ID into a database.
"""
if ptid is not None:
assert isinstance(ptid, (int, long)), ptid
ptid = str(ptid)
self.setConfiguration('ptid', ptid)
def getBackupTID(self): def getBackupTID(self):
return util.bin(self.getConfiguration('backup_tid')) return util.bin(self.getConfiguration('backup_tid'))
...@@ -442,15 +433,27 @@ class DatabaseManager(object): ...@@ -442,15 +433,27 @@ class DatabaseManager(object):
else: else:
readable_set.add(offset) readable_set.add(offset)
self._changePartitionTable(cell_list, reset) self._changePartitionTable(cell_list, reset)
self.setPTID(ptid) assert isinstance(ptid, (int, long)), ptid
self._setConfiguration('ptid', str(ptid))
@abstract @abstract
def dropPartitions(self, offset_list): def dropPartitions(self, offset_list):
"""Delete all data for specified partitions""" """Delete all data for specified partitions"""
@abstract def _getUnfinishedDataIdList(self):
"""Drop any unfinished data from a database."""
@requires(_getUnfinishedDataIdList)
def dropUnfinishedData(self): def dropUnfinishedData(self):
"""Drop any unfinished data from a database.""" """Drop any unfinished data from a database."""
data_id_list = self._getUnfinishedDataIdList()
self.dropPartitionsTemporary()
self.releaseData(data_id_list, True)
self.commit()
@abstract
def dropPartitionsTemporary(self, offset_list=None):
"""Drop partitions from temporary tables"""
@abstract @abstract
def storeTransaction(self, tid, object_list, transaction, temporary = True): def storeTransaction(self, tid, object_list, transaction, temporary = True):
...@@ -531,8 +534,7 @@ class DatabaseManager(object): ...@@ -531,8 +534,7 @@ class DatabaseManager(object):
else: else:
del refcount[data_id] del refcount[data_id]
if prune: if prune:
self._pruneData(data_id_list) return self._pruneData(data_id_list)
self.commit()
@fallback @fallback
def _getDataTID(self, oid, tid=None, before_tid=None): def _getDataTID(self, oid, tid=None, before_tid=None):
......
...@@ -426,12 +426,15 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -426,12 +426,15 @@ class MySQLDatabaseManager(DatabaseManager):
if e.args[0] != DROP_LAST_PARTITION: if e.args[0] != DROP_LAST_PARTITION:
raise raise
def dropUnfinishedData(self): def _getUnfinishedDataIdList(self):
return [x for x, in self.query("SELECT data_id FROM tobj") if x]
def dropPartitionsTemporary(self, offset_list=None):
where = "" if offset_list is None else \
" WHERE `partition` IN (%s)" % ','.join(map(str, offset_list))
q = self.query q = self.query
data_id_list = [x for x, in q("SELECT data_id FROM tobj") if x] q("DELETE FROM tobj" + where)
q("DELETE FROM tobj") q("DELETE FROM ttrans" + where)
q("DELETE FROM ttrans")
self.releaseData(data_id_list, True)
def storeTransaction(self, tid, object_list, transaction, temporary = True): def storeTransaction(self, tid, object_list, transaction, temporary = True):
e = self.escape e = self.escape
......
...@@ -329,12 +329,15 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -329,12 +329,15 @@ class SQLiteDatabaseManager(DatabaseManager):
q("DELETE FROM trans" + where, args) q("DELETE FROM trans" + where, args)
self._pruneData(data_id_list) self._pruneData(data_id_list)
def dropUnfinishedData(self): def _getUnfinishedDataIdList(self):
return [x for x, in self.query("SELECT data_id FROM tobj") if x]
def dropPartitionsTemporary(self, offset_list=None):
where = "" if offset_list is None else \
" WHERE `partition` IN (%s)" % ','.join(map(str, offset_list))
q = self.query q = self.query
data_id_list = [x for x, in q("SELECT data_id FROM tobj") if x] q("DELETE FROM tobj" + where)
q("DELETE FROM tobj") q("DELETE FROM ttrans" + where)
q("DELETE FROM ttrans")
self.releaseData(data_id_list, True)
def storeTransaction(self, tid, object_list, transaction, temporary=True): def storeTransaction(self, tid, object_list, transaction, temporary=True):
u64 = util.u64 u64 = util.u64
......
...@@ -75,6 +75,9 @@ class BaseMasterHandler(BaseHandler): ...@@ -75,6 +75,9 @@ class BaseMasterHandler(BaseHandler):
raise ProtocolError('wrong partition table id') raise ProtocolError('wrong partition table id')
app.pt.update(ptid, cell_list, app.nm) app.pt.update(ptid, cell_list, app.nm)
app.dm.changePartitionTable(ptid, cell_list) app.dm.changePartitionTable(ptid, cell_list)
if app.operational:
app.replicator.notifyPartitionChanges(cell_list)
app.dm.commit()
def askFinalTID(self, conn, ttid): def askFinalTID(self, conn, ttid):
conn.answer(Packets.AnswerFinalTID(self.app.dm.getFinalTID(ttid))) conn.answer(Packets.AnswerFinalTID(self.app.dm.getFinalTID(ttid)))
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
from neo.lib import logging from neo.lib import logging
from neo.lib.handler import DelayEvent from neo.lib.handler import DelayEvent
from neo.lib.util import dump, makeChecksum, add64 from neo.lib.util import dump, makeChecksum, add64
from neo.lib.protocol import Packets, Errors, ProtocolError, \ from neo.lib.protocol import Packets, Errors, NonReadableCell, ProtocolError, \
ZERO_HASH, INVALID_PARTITION ZERO_HASH, INVALID_PARTITION
from ..transactions import ConflictError, NotRegisteredError from ..transactions import ConflictError, NotRegisteredError
from . import BaseHandler from . import BaseHandler
...@@ -77,19 +77,21 @@ class ClientOperationHandler(BaseHandler): ...@@ -77,19 +77,21 @@ class ClientOperationHandler(BaseHandler):
except ConflictError, err: except ConflictError, err:
# resolvable or not # resolvable or not
conn.answer(Packets.AnswerStoreObject(err.tid)) conn.answer(Packets.AnswerStoreObject(err.tid))
return
except NonReadableCell:
logging.info('Ignore store of %s:%s by %s: unassigned partition',
dump(oid), dump(serial), dump(ttid))
except NotRegisteredError: except NotRegisteredError:
# transaction was aborted, cancel this event # transaction was aborted, cancel this event
logging.info('Forget store of %s:%s by %s delayed by %s', logging.info('Forget store of %s:%s by %s delayed by %s',
dump(oid), dump(serial), dump(ttid), dump(oid), dump(serial), dump(ttid),
dump(self.app.tm.getLockingTID(oid))) dump(self.app.tm.getLockingTID(oid)))
# send an answer as the client side is waiting for it
conn.answer(Packets.AnswerStoreObject(None))
else: else:
if request_time and SLOW_STORE is not None: if request_time and SLOW_STORE is not None:
duration = time.time() - request_time duration = time.time() - request_time
if duration > SLOW_STORE: if duration > SLOW_STORE:
logging.info('StoreObject delay: %.02fs', duration) logging.info('StoreObject delay: %.02fs', duration)
conn.answer(Packets.AnswerStoreObject(None)) conn.answer(Packets.AnswerStoreObject(None))
def askStoreObject(self, conn, oid, serial, def askStoreObject(self, conn, oid, serial,
compression, checksum, data, data_serial, ttid): compression, checksum, data, data_serial, ttid):
...@@ -200,19 +202,21 @@ class ClientOperationHandler(BaseHandler): ...@@ -200,19 +202,21 @@ class ClientOperationHandler(BaseHandler):
except ConflictError, err: except ConflictError, err:
# resolvable or not # resolvable or not
conn.answer(Packets.AnswerCheckCurrentSerial(err.tid)) conn.answer(Packets.AnswerCheckCurrentSerial(err.tid))
return
except NonReadableCell:
logging.info('Ignore check of %s:%s by %s: unassigned partition',
dump(oid), dump(serial), dump(ttid))
except NotRegisteredError: except NotRegisteredError:
# transaction was aborted, cancel this event # transaction was aborted, cancel this event
logging.info('Forget serial check of %s:%s by %s delayed by %s', logging.info('Forget serial check of %s:%s by %s delayed by %s',
dump(oid), dump(serial), dump(ttid), dump(oid), dump(serial), dump(ttid),
dump(self.app.tm.getLockingTID(oid))) dump(self.app.tm.getLockingTID(oid)))
# send an answer as the client side is waiting for it
conn.answer(Packets.AnswerCheckCurrentSerial(None))
else: else:
if request_time and SLOW_STORE is not None: if request_time and SLOW_STORE is not None:
duration = time.time() - request_time duration = time.time() - request_time
if duration > SLOW_STORE: if duration > SLOW_STORE:
logging.info('CheckCurrentSerial delay: %.02fs', duration) logging.info('CheckCurrentSerial delay: %.02fs', duration)
conn.answer(Packets.AnswerCheckCurrentSerial(None)) conn.answer(Packets.AnswerCheckCurrentSerial(None))
# like ClientOperationHandler but read-only & only for tid <= backup_tid # like ClientOperationHandler but read-only & only for tid <= backup_tid
......
...@@ -28,19 +28,21 @@ class InitializationHandler(BaseMasterHandler): ...@@ -28,19 +28,21 @@ class InitializationHandler(BaseMasterHandler):
raise ProtocolError('Partial partition table received') raise ProtocolError('Partial partition table received')
# Install the partition table into the database for persistence. # Install the partition table into the database for persistence.
cell_list = [] cell_list = []
num_partitions = pt.getPartitions() offset_list = xrange(pt.getPartitions())
unassigned_set = set(xrange(num_partitions)) unassigned_set = set(offset_list)
for offset in xrange(num_partitions): for offset in offset_list:
for cell in pt.getCellList(offset): for cell in pt.getCellList(offset):
cell_list.append((offset, cell.getUUID(), cell.getState())) cell_list.append((offset, cell.getUUID(), cell.getState()))
if cell.getUUID() == app.uuid: if cell.getUUID() == app.uuid:
unassigned_set.remove(offset) unassigned_set.remove(offset)
# delete objects database # delete objects database
dm = app.dm
if unassigned_set: if unassigned_set:
logging.debug('drop data for partitions %r', unassigned_set) logging.debug('drop data for partitions %r', unassigned_set)
app.dm.dropPartitions(unassigned_set) dm.dropPartitions(unassigned_set)
app.dm.changePartitionTable(ptid, cell_list, reset=True) dm.changePartitionTable(ptid, cell_list, reset=True)
dm.commit()
def truncate(self, conn, tid): def truncate(self, conn, tid):
dm = self.app.dm dm = self.app.dm
......
...@@ -31,11 +31,6 @@ class MasterOperationHandler(BaseMasterHandler): ...@@ -31,11 +31,6 @@ class MasterOperationHandler(BaseMasterHandler):
dm._setBackupTID(dm.getLastIDs()[0] or ZERO_TID) dm._setBackupTID(dm.getLastIDs()[0] or ZERO_TID)
dm.commit() dm.commit()
def notifyPartitionChanges(self, conn, ptid, cell_list):
super(MasterOperationHandler, self).notifyPartitionChanges(
conn, ptid, cell_list)
self.app.replicator.notifyPartitionChanges(cell_list)
def askLockInformation(self, conn, ttid, tid): def askLockInformation(self, conn, ttid, tid):
self.app.tm.lock(ttid, tid) self.app.tm.lock(ttid, tid)
conn.answer(Packets.AnswerInformationLocked(ttid)) conn.answer(Packets.AnswerInformationLocked(ttid))
......
...@@ -233,6 +233,8 @@ class Replicator(object): ...@@ -233,6 +233,8 @@ class Replicator(object):
"""This is a callback from MasterOperationHandler.""" """This is a callback from MasterOperationHandler."""
abort = False abort = False
added_list = [] added_list = []
discarded_list = []
readable_list = []
app = self.app app = self.app
last_tid, last_trans_dict, last_obj_dict, _ = app.dm.getLastIDs() last_tid, last_trans_dict, last_obj_dict, _ = app.dm.getLastIDs()
for offset, uuid, state in cell_list: for offset, uuid, state in cell_list:
...@@ -245,6 +247,7 @@ class Replicator(object): ...@@ -245,6 +247,7 @@ class Replicator(object):
self.replicate_dict.pop(offset, None) self.replicate_dict.pop(offset, None)
self.source_dict.pop(offset, None) self.source_dict.pop(offset, None)
abort = abort or self.current_partition == offset abort = abort or self.current_partition == offset
discarded_list.append(offset)
elif state == CellStates.OUT_OF_DATE: elif state == CellStates.OUT_OF_DATE:
assert offset not in self.partition_dict assert offset not in self.partition_dict
self.partition_dict[offset] = p = Partition() self.partition_dict[offset] = p = Partition()
...@@ -255,8 +258,17 @@ class Replicator(object): ...@@ -255,8 +258,17 @@ class Replicator(object):
p.next_obj = last_obj_dict.get(offset, ZERO_TID) p.next_obj = last_obj_dict.get(offset, ZERO_TID)
p.max_ttid = INVALID_TID p.max_ttid = INVALID_TID
added_list.append(offset) added_list.append(offset)
else:
assert state in (CellStates.UP_TO_DATE,
CellStates.FEEDING), state
readable_list.append(offset)
tm = app.tm
if added_list: if added_list:
self.app.tm.replicating(added_list) tm.replicating(added_list)
if discarded_list:
tm.discarded(discarded_list)
if readable_list:
tm.readable(readable_list)
if abort: if abort:
self.abort() self.abort()
......
...@@ -100,10 +100,40 @@ class TransactionManager(EventQueue): ...@@ -100,10 +100,40 @@ class TransactionManager(EventQueue):
np = app.pt.getPartitions() np = app.pt.getPartitions()
self.getPartition = lambda oid: u64(oid) % np self.getPartition = lambda oid: u64(oid) % np
def discarded(self, offset_list):
self._replicating.difference_update(offset_list)
for offset in offset_list:
self._replicated.pop(offset, None)
getPartition = self.getPartition
for oid_dict in self._load_lock_dict, self._store_lock_dict:
for oid in oid_dict.keys():
if getPartition(oid) in offset_list:
del oid_dict[oid]
data_id_list = []
for transaction in self._transaction_dict.itervalues():
serial_dict = transaction.serial_dict
oid_list = [oid for oid in serial_dict
if getPartition(oid) in offset_list]
for oid in oid_list:
del serial_dict[oid]
try:
data_id_list.append(transaction.store_dict.pop(oid)[1])
except KeyError:
pass
transaction.lockless.difference_update(oid_list)
self._app.dm.dropPartitionsTemporary(offset_list)
self._app.dm.releaseData(data_id_list, True)
# notifyPartitionChanges will commit
self.executeQueuedEvents()
self.read_queue.executeQueuedEvents()
def readable(self, offset_list):
for offset in offset_list:
tid = self._replicated.pop(offset, None)
assert tid is None, (offset, tid)
def replicating(self, offset_list): def replicating(self, offset_list):
self._replicating.update(offset_list) self._replicating.update(offset_list)
# TODO: The following assertions will fail if a replicated partition is
# dropped and this partition is added again.
isdisjoint = set(offset_list).isdisjoint isdisjoint = set(offset_list).isdisjoint
assert isdisjoint(self._replicated), (offset_list, self._replicated) assert isdisjoint(self._replicated), (offset_list, self._replicated)
assert isdisjoint(map(self.getPartition, self._store_lock_dict)), ( assert isdisjoint(map(self.getPartition, self._store_lock_dict)), (
...@@ -121,7 +151,7 @@ class TransactionManager(EventQueue): ...@@ -121,7 +151,7 @@ class TransactionManager(EventQueue):
getPartition = self.getPartition getPartition = self.getPartition
store_lock_dict = self._store_lock_dict store_lock_dict = self._store_lock_dict
replicated = self._replicated replicated = self._replicated
notify = set(replicated) notify = {x[0] for x in replicated.iteritems() if x[1]}
# We sort transactions so that in case of muliple stores/checks for the # We sort transactions so that in case of muliple stores/checks for the
# same oid, the lock is taken by the highest locking ttid, which will # same oid, the lock is taken by the highest locking ttid, which will
# delay new transactions. # delay new transactions.
...@@ -133,7 +163,7 @@ class TransactionManager(EventQueue): ...@@ -133,7 +163,7 @@ class TransactionManager(EventQueue):
txn.lockless, txn.serial_dict) txn.lockless, txn.serial_dict)
for oid in txn.lockless: for oid in txn.lockless:
partition = getPartition(oid) partition = getPartition(oid)
if partition in replicated: if replicated.get(partition):
if store_lock_dict.get(oid, ttid) != ttid: if store_lock_dict.get(oid, ttid) != ttid:
# We have a "multi-lock" store, i.e. an # We have a "multi-lock" store, i.e. an
# initially-lockless store to a partition that became # initially-lockless store to a partition that became
...@@ -146,7 +176,8 @@ class TransactionManager(EventQueue): ...@@ -146,7 +176,8 @@ class TransactionManager(EventQueue):
# readable cells to check locks: we're really up-to-date. # readable cells to check locks: we're really up-to-date.
for partition in notify: for partition in notify:
self._app.master_conn.send(Packets.NotifyReplicationDone( self._app.master_conn.send(Packets.NotifyReplicationDone(
partition, replicated.pop(partition))) partition, replicated[partition]))
replicated[partition] = None
for oid, ttid in store_lock_dict.iteritems(): for oid, ttid in store_lock_dict.iteritems():
if getPartition(oid) in notify: if getPartition(oid) in notify:
# Use 'discard' instead of 'remove', for oids that were # Use 'discard' instead of 'remove', for oids that were
...@@ -393,6 +424,13 @@ class TransactionManager(EventQueue): ...@@ -393,6 +424,13 @@ class TransactionManager(EventQueue):
except NonReadableCell: except NonReadableCell:
partition = self.getPartition(oid) partition = self.getPartition(oid)
if partition not in self._replicated: if partition not in self._replicated:
# Either the partition is discarded or we haven't yet
# received the notification from the master that the
# partition is assigned to us. In the latter case, we're
# not expected to have the partition in full.
# We'll return a successful answer to the client, which
# is fine because there's at least one other cell that is
# readable for this oid.
raise raise
with self._app.dm.replicated(partition): with self._app.dm.replicated(partition):
previous_serial = self._app.dm.getLastObjectTID(oid) previous_serial = self._app.dm.getLastObjectTID(oid)
...@@ -448,6 +486,7 @@ class TransactionManager(EventQueue): ...@@ -448,6 +486,7 @@ class TransactionManager(EventQueue):
# There was a previous rebase for this oid, it was still delayed # There was a previous rebase for this oid, it was still delayed
# during the second RebaseTransaction, and then a conflict was # during the second RebaseTransaction, and then a conflict was
# reported when another transaction was committed. # reported when another transaction was committed.
# This can also happen when a partition is dropped.
logging.info("no oid %s to rebase for transaction %s", logging.info("no oid %s to rebase for transaction %s",
dump(oid), dump(ttid)) dump(oid), dump(ttid))
return return
...@@ -494,6 +533,7 @@ class TransactionManager(EventQueue): ...@@ -494,6 +533,7 @@ class TransactionManager(EventQueue):
dm.abortTransaction(ttid) dm.abortTransaction(ttid)
dm.releaseData([x[1] for x in transaction.store_dict.itervalues()], dm.releaseData([x[1] for x in transaction.store_dict.itervalues()],
True) True)
dm.commit()
# unlock any object # unlock any object
for oid in transaction.serial_dict: for oid in transaction.serial_dict:
if locked: if locked:
...@@ -513,8 +553,8 @@ class TransactionManager(EventQueue): ...@@ -513,8 +553,8 @@ class TransactionManager(EventQueue):
x = (oid, ttid, write_locking_tid, x = (oid, ttid, write_locking_tid,
self._replicated, transaction.lockless) self._replicated, transaction.lockless)
lockless = oid in transaction.lockless lockless = oid in transaction.lockless
assert oid in other.serial_dict and lockless == ( assert oid in other.serial_dict and lockless == bool(
self.getPartition(oid) in self._replicated), x self._replicated.get(self.getPartition(oid))), x
if not lockless: if not lockless:
assert not locked, x assert not locked, x
continue # unresolved deadlock continue # unresolved deadlock
......
...@@ -397,6 +397,12 @@ class TransactionalResource(object): ...@@ -397,6 +397,12 @@ class TransactionalResource(object):
self.__dict__.update(kw) self.__dict__.update(kw)
txn.get().join(self) txn.get().join(self)
def __call__(self, func):
name = func.__name__
assert callable(IDataManager.get(name)), name
setattr(self, name, func)
return func
def __getattr__(self, attr): def __getattr__(self, attr):
if callable(IDataManager.get(attr)): if callable(IDataManager.get(attr)):
return lambda *_: None return lambda *_: None
......
...@@ -72,6 +72,7 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -72,6 +72,7 @@ class StorageDBTests(NeoUnitTestBase):
db.changePartitionTable(1, db.changePartitionTable(1,
[(i, uuid, CellStates.UP_TO_DATE) for i in xrange(num_partitions)], [(i, uuid, CellStates.UP_TO_DATE) for i in xrange(num_partitions)],
reset=True) reset=True)
db.commit()
def checkConfigEntry(self, get_call, set_call, value): def checkConfigEntry(self, get_call, set_call, value):
# generic test for all configuration entries accessors # generic test for all configuration entries accessors
...@@ -101,10 +102,6 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -101,10 +102,6 @@ class StorageDBTests(NeoUnitTestBase):
db = self.getDB() db = self.getDB()
self.checkConfigEntry(db.getName, db.setName, 'TEST_NAME') self.checkConfigEntry(db.getName, db.setName, 'TEST_NAME')
def test_15_PTID(self):
db = self.getDB()
self.checkConfigEntry(db.getPTID, db.setPTID, 1)
def test_getPartitionTable(self): def test_getPartitionTable(self):
db = self.getDB() db = self.getDB()
uuid1, uuid2 = self.getStorageUUID(), self.getStorageUUID() uuid1, uuid2 = self.getStorageUUID(), self.getStorageUUID()
......
...@@ -14,9 +14,7 @@ ...@@ -14,9 +14,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import random import random, sys, threading, time
import sys
import time
import transaction import transaction
from ZODB.POSException import ReadOnlyError, POSKeyError from ZODB.POSException import ReadOnlyError, POSKeyError
import unittest import unittest
...@@ -33,10 +31,10 @@ from neo.lib.event import EventManager ...@@ -33,10 +31,10 @@ from neo.lib.event import EventManager
from neo.lib.protocol import CellStates, ClusterStates, Packets, \ from neo.lib.protocol import CellStates, ClusterStates, Packets, \
ZERO_OID, ZERO_TID, MAX_TID, uuid_str ZERO_OID, ZERO_TID, MAX_TID, uuid_str
from neo.lib.util import p64, u64 from neo.lib.util import p64, u64
from .. import expectedFailure, Patch from .. import expectedFailure, Patch, TransactionalResource
from . import ConnectionFilter, NEOCluster, NEOThreadedTest, \ from . import ConnectionFilter, NEOCluster, NEOThreadedTest, \
predictable_random, with_cluster predictable_random, with_cluster
from .test import PCounter # XXX from .test import PCounter, PCounterWithResolution # XXX
def backup_test(partitions=1, upstream_kw={}, backup_kw={}): def backup_test(partitions=1, upstream_kw={}, backup_kw={}):
...@@ -458,6 +456,116 @@ class ReplicationTests(NEOThreadedTest): ...@@ -458,6 +456,116 @@ class ReplicationTests(NEOThreadedTest):
self.assertTrue(s.is_alive()) self.assertTrue(s.is_alive())
self.checkReplicas(cluster) self.checkReplicas(cluster)
@with_cluster(start_cluster=0, replicas=1, storage_count=4, partitions=2)
def testTweakVsReplication(self, cluster, done=False):
S = cluster.storage_list
cluster.start(S[:1])
t, c = cluster.getTransaction()
ob = c.root()[''] = PCounterWithResolution()
t.commit()
self.assertEqual(1, u64(ob._p_oid))
for s in S[1:]:
s.start()
self.tic()
def tweak():
self.tic()
self.assertFalse(delay_list)
self.assertPartitionTable(cluster, 'UU|UO')
f.delayAskFetchObjects()
cluster.enableStorageList(S[2:])
cluster.neoctl.tweakPartitionTable()
self.tic()
self.assertPartitionTable(cluster, 'UU..|F.OO')
with ConnectionFilter() as f, cluster.moduloTID(1), \
Patch(S[1].replicator,
_nextPartitionSortKey=lambda orig, offset: offset):
delay_list = [1, 0]
delay = (f.delayNotifyReplicationDone if done else
f.delayAnswerFetchObjects)(lambda _: delay_list.pop())
cluster.enableStorageList((S[1],))
cluster.neoctl.tweakPartitionTable()
ob._p_changed = 1
if done:
tweak()
t.commit()
else:
t2, c2 = cluster.getTransaction()
c2.root()['']._p_changed = 1
l = threading.Lock(); l.acquire()
TransactionalResource(t2, 0, tpc_vote=lambda _: l.release())
t2 = self.newPausedThread(t2.commit)
self.tic()
@TransactionalResource(t, 0)
def tpc_vote(_):
t2.start()
l.acquire()
f.remove(delay)
tweak()
t.commit()
t2.join()
cluster.neoctl.dropNode(S[2].uuid)
cluster.neoctl.dropNode(S[3].uuid)
cluster.neoctl.tweakPartitionTable()
if done:
f.remove(delay)
self.tic()
self.assertPartitionTable(cluster, 'UU|UO')
self.tic()
self.assertPartitionTable(cluster, 'UU|UU')
self.checkReplicas(cluster)
def testTweakVsReplicationDone(self):
self.testTweakVsReplication(True)
@with_cluster(start_cluster=0, storage_count=2, partitions=2)
def testCommitVsDiscardedCell(self, cluster):
s0, s1 = cluster.storage_list
cluster.start((s0,))
t, c = cluster.getTransaction()
ob = c.root()[''] = PCounterWithResolution()
t.commit()
self.assertEqual(1, u64(ob._p_oid))
s1.start()
self.tic()
nonlocal_ = []
with ConnectionFilter() as f:
delay = f.delayNotifyReplicationDone()
cluster.enableStorageList((s1,))
cluster.neoctl.tweakPartitionTable()
self.tic()
self.assertPartitionTable(cluster, 'U.|FO')
t2, c2 = cluster.getTransaction()
c2.root()[''].value += 3
l = threading.Lock(); l.acquire()
@TransactionalResource(t2, 0)
def tpc_vote(_):
self.tic()
l.release()
t2 = self.newPausedThread(t2.commit)
@TransactionalResource(t, 0, tpc_finish=lambda _:
f.remove(nonlocal_.pop(0)))
def tpc_vote(_):
t2.start()
l.acquire()
nonlocal_.append(f.delayNotifyPartitionChanges())
f.remove(delay)
self.tic()
self.assertPartitionTable(cluster, 'U.|.U', cluster.master)
nonlocal_.append(cluster.master.pt.getID())
ob.value += 2
t.commit()
t2.join()
self.tic()
self.assertPartitionTable(cluster, 'U.|.U')
self.assertEqual(cluster.master.pt.getID(), nonlocal_.pop())
t.begin()
self.assertEqual(ob.value, 5)
# get the second to last tid (for which ob=2)
tid2 = s1.dm.getObject(ob._p_oid, None, ob._p_serial)[0]
# s0 must not have committed anything for partition 1
with s0.dm.replicated(1):
self.assertFalse(s0.dm.getObject(ob._p_oid, tid2))
@with_cluster(start_cluster=0, replicas=1) @with_cluster(start_cluster=0, replicas=1)
def testResumingReplication(self, cluster): def testResumingReplication(self, cluster):
if 1: if 1:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment