Commit a0ceb810 authored by Julien Muchembled's avatar Julien Muchembled

wip

parent 5fa8a9d1
...@@ -349,7 +349,7 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -349,7 +349,7 @@ class ImporterDatabaseManager(DatabaseManager):
def __init__(self, *args, **kw): def __init__(self, *args, **kw):
super(ImporterDatabaseManager, self).__init__(*args, **kw) super(ImporterDatabaseManager, self).__init__(*args, **kw)
implements(self, """_getNextTID checkSerialRange checkTIDRange implements(self, """_getNextTID checkSerialRange checkTIDRange
deleteObject deleteTransaction dropPartitions _getLastTID deleteObject deleteTransaction _dropPartition _getLastTID
getReplicationObjectList _getTIDList nonempty""".split()) getReplicationObjectList _getTIDList nonempty""".split())
_getPartition = property(lambda self: self.db._getPartition) _getPartition = property(lambda self: self.db._getPartition)
......
...@@ -19,6 +19,7 @@ from collections import defaultdict ...@@ -19,6 +19,7 @@ from collections import defaultdict
from contextlib import contextmanager from contextlib import contextmanager
from copy import copy from copy import copy
from functools import wraps from functools import wraps
from time import time
from neo.lib import logging, util from neo.lib import logging, util
from neo.lib.interfaces import abstract, requires from neo.lib.interfaces import abstract, requires
from neo.lib.protocol import CellStates, NonReadableCell, MAX_TID, ZERO_TID from neo.lib.protocol import CellStates, NonReadableCell, MAX_TID, ZERO_TID
...@@ -53,6 +54,8 @@ class DatabaseManager(object): ...@@ -53,6 +54,8 @@ class DatabaseManager(object):
LOCKED = "error: database is locked" LOCKED = "error: database is locked"
_deferred = 0 _deferred = 0
_drop_stats = 0, 0
_dropping = None
_repairing = None _repairing = None
def __init__(self, database, engine=None, wait=None): def __init__(self, database, engine=None, wait=None):
...@@ -212,7 +215,8 @@ class DatabaseManager(object): ...@@ -212,7 +215,8 @@ class DatabaseManager(object):
self.setConfiguration("version", version) self.setConfiguration("version", version)
def doOperation(self, app): def doOperation(self, app):
pass if self._dropping:
self._dropPartitions(app)
def _close(self): def _close(self):
"""Backend-specific code to close the database""" """Backend-specific code to close the database"""
...@@ -559,7 +563,8 @@ class DatabaseManager(object): ...@@ -559,7 +563,8 @@ class DatabaseManager(object):
if -x[1] in READABLE) if -x[1] in READABLE)
@requires(_changePartitionTable, _getLastIDs, _getLastTID) @requires(_changePartitionTable, _getLastIDs, _getLastTID)
def changePartitionTable(self, ptid, num_replicas, cell_list, reset=False): def changePartitionTable(self, app, ptid, num_replicas, cell_list,
reset=False):
my_nid = self.getUUID() my_nid = self.getUUID()
pt = dict(self.iterAssignedCells()) pt = dict(self.iterAssignedCells())
# In backup mode, the last transactions of a readable cell may be # In backup mode, the last transactions of a readable cell may be
...@@ -567,23 +572,41 @@ class DatabaseManager(object): ...@@ -567,23 +572,41 @@ class DatabaseManager(object):
backup_tid = self.getBackupTID() backup_tid = self.getBackupTID()
if backup_tid: if backup_tid:
backup_tid = util.u64(backup_tid) backup_tid = util.u64(backup_tid)
def outofdate_tid(offset): max_offset = -1
tid = pt.get(offset, 0) dropping = self._dropping or set()
if tid >= 0: assigned = []
return tid cells = []
return -tid in READABLE and (backup_tid or for offset, nid, state in cell_list:
max(self._getLastIDs(offset)[0], if max_offset < offset:
self._getLastTID(offset))) or 0 max_offset = offset
cell_list = [(offset, nid, ( if state == CellStates.DISCARDED:
None if state == CellStates.DISCARDED else if nid == my_nid:
-state if nid != my_nid or state != CellStates.OUT_OF_DATE else dropping.add(offset)
outofdate_tid(offset))) tid = None
for offset, nid, state in cell_list] else:
self._changePartitionTable(cell_list, reset) if nid == my_nid:
assigned.append(offset)
if nid != my_nid or state != CellStates.OUT_OF_DATE:
tid = -state
else:
tid = pt.get(offset, 0)
if tid < 0:
tid = -tid in READABLE and (backup_tid or
max(self._getLastIDs(offset)[0],
self._getLastTID(offset))) or 0
cells.append((offset, nid, tid))
if reset:
dropping.update(xrange(max_offset + 1))
dropping.difference_update(assigned)
self._changePartitionTable(cells, reset)
self._updateReadable(reset) self._updateReadable(reset)
assert isinstance(ptid, (int, long)), ptid assert isinstance(ptid, (int, long)), ptid
self._setConfiguration('ptid', str(ptid)) self._setConfiguration('ptid', str(ptid))
self._setConfiguration('replicas', str(num_replicas)) self._setConfiguration('replicas', str(num_replicas))
if dropping and not self._dropping:
self._dropping = dropping
if app.operational:
self._dropPartitions(app)
@requires(_changePartitionTable) @requires(_changePartitionTable)
def updateCellTID(self, partition, tid): def updateCellTID(self, partition, tid):
...@@ -628,9 +651,70 @@ class DatabaseManager(object): ...@@ -628,9 +651,70 @@ class DatabaseManager(object):
else: else:
yield offset, None yield offset, None
def _dropPartitions(self, app):
if app.disable_drop_partitions:
logging.info("don't drop data for partitions %r", self._dropping)
return
def dropPartitions():
dropping = self._dropping
before = drop_count, drop_time = self._drop_stats
commit = dropped = 0
while dropping:
offset = next(iter(dropping))
log = dropped
while True:
yield 1
if offset not in dropping:
break
start = time()
if 0 < commit < start:
self.commit()
logging.debug('drop: committed')
commit = 0
continue
data_id_list = self._dropPartition(offset,
# The efficiency drops when the number of lines to
# delete is too small so do not delete too few.
max(100, int(.1 * drop_count / drop_time))
if drop_time else 1000)
if data_id_list:
if not commit:
commit = time() + 1
if log == dropped:
dropped += 1
logging.info("dropping partition %s...", offset)
if type(data_id_list) is list:
try:
data_id_list.remove(None)
pass # XXX: not covered
except ValueError:
pass
logging.debug('drop: pruneData(%s)',
len(data_id_list))
drop_count += self._pruneData(data_id_list)
drop_time += time() - start
self._drop_stats = drop_count, drop_time
continue
dropping.remove(offset)
break
if dropped:
if commit:
self.commit()
logging.info("%s partition(s) dropped"
" (stats: count: %s/%s, time: %.4s/%.4s)",
dropped, drop_count - before[0], drop_count,
round(drop_time - before[1], 3), round(drop_time, 3))
app.newTask(dropPartitions())
@abstract @abstract
def dropPartitions(self, offset_list): def _dropPartition(self, offset, count):
"""Delete all data for specified partitions""" """Delete rows for given partition
Delete at most 'count' rows of from obj:
- if there's no line to delete, purge trans and return
a boolean indicating if any row was deleted (from trans)
- else return data ids of deleted rows
"""
def _getUnfinishedDataIdList(self): def _getUnfinishedDataIdList(self):
"""Drop any unfinished data from a database.""" """Drop any unfinished data from a database."""
......
...@@ -515,19 +515,20 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -515,19 +515,20 @@ class MySQLDatabaseManager(DatabaseManager):
" ON DUPLICATE KEY UPDATE tid = %d" " ON DUPLICATE KEY UPDATE tid = %d"
% (offset, nid, tid, tid)) % (offset, nid, tid, tid))
def dropPartitions(self, offset_list): def _dropPartition(self, offset, count):
q = self.query q = self.query
# XXX: these queries are inefficient (execution time increase with where = " WHERE `partition`=%s ORDER BY tid, oid LIMIT %s" % (
# row count, although we use indexes) when there are rows to offset, count)
# delete. It should be done as an idle task, by chunks. logging.debug("drop: select(%s)", count)
for partition in offset_list: x = q("SELECT DISTINCT data_id FROM obj FORCE INDEX(tid)" + where)
where = " WHERE `partition`=%d" % partition if x:
data_id_list = [x for x, in logging.debug("drop: obj")
q("SELECT DISTINCT data_id FROM obj FORCE INDEX(tid)"
"%s AND data_id IS NOT NULL" % where)]
q("DELETE FROM obj" + where) q("DELETE FROM obj" + where)
q("DELETE FROM trans" + where) return [x for x, in x]
self._pruneData(data_id_list) logging.debug("drop: trans")
q("DELETE FROM trans WHERE `partition`=%s" % offset)
(x,), = q('SELECT ROW_COUNT()')
return x
def _getUnfinishedDataIdList(self): def _getUnfinishedDataIdList(self):
return [x for x, in self.query( return [x for x, in self.query(
......
...@@ -364,17 +364,14 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -364,17 +364,14 @@ class SQLiteDatabaseManager(DatabaseManager):
q("INSERT OR FAIL INTO pt VALUES (?,?,?)", q("INSERT OR FAIL INTO pt VALUES (?,?,?)",
(offset, nid, int(state))) (offset, nid, int(state)))
def dropPartitions(self, offset_list): def _dropPartition(self, *args):
where = " WHERE partition=?"
q = self.query q = self.query
for partition in offset_list: where = " FROM obj WHERE partition=? ORDER BY tid, oid LIMIT ?"
args = partition, x = q("SELECT data_id" + where, args).fetchall()
data_id_list = [x for x, in q( if x:
"SELECT DISTINCT data_id FROM obj%s AND data_id IS NOT NULL" q("DELETE" + where, args)
% where, args)] return [x for x, in x]
q("DELETE FROM obj" + where, args) return q("DELETE FROM trans WHERE partition=?", args[:1]).rowcount
q("DELETE FROM trans" + where, args)
self._pruneData(data_id_list)
def _getUnfinishedDataIdList(self): def _getUnfinishedDataIdList(self):
return [x for x, in self.query( return [x for x, in self.query(
......
...@@ -72,7 +72,7 @@ class BaseMasterHandler(BaseHandler): ...@@ -72,7 +72,7 @@ class BaseMasterHandler(BaseHandler):
if ptid != 1 + app.pt.getID(): if ptid != 1 + app.pt.getID():
raise ProtocolError('wrong partition table id') raise ProtocolError('wrong partition table id')
app.pt.update(ptid, num_replicas, cell_list, app.nm) app.pt.update(ptid, num_replicas, cell_list, app.nm)
app.dm.changePartitionTable(ptid, num_replicas, cell_list) app.dm.changePartitionTable(app, ptid, num_replicas, cell_list)
if app.operational: if app.operational:
app.replicator.notifyPartitionChanges(cell_list) app.replicator.notifyPartitionChanges(cell_list)
app.dm.commit() app.dm.commit()
......
...@@ -26,25 +26,11 @@ class InitializationHandler(BaseMasterHandler): ...@@ -26,25 +26,11 @@ class InitializationHandler(BaseMasterHandler):
pt.load(ptid, num_replicas, row_list, app.nm) pt.load(ptid, num_replicas, row_list, app.nm)
if not pt.filled(): if not pt.filled():
raise ProtocolError('Partial partition table received') raise ProtocolError('Partial partition table received')
# Install the partition table into the database for persistence. cell_list = [(offset, cell.getUUID(), cell.getState())
cell_list = [] for offset in xrange(pt.getPartitions())
unassigned = range(pt.getPartitions()) for cell in pt.getCellList(offset)]
for offset in reversed(unassigned):
for cell in pt.getCellList(offset):
cell_list.append((offset, cell.getUUID(), cell.getState()))
if cell.getUUID() == app.uuid:
unassigned.remove(offset)
# delete objects database
dm = app.dm dm = app.dm
if unassigned: dm.changePartitionTable(app, ptid, num_replicas, cell_list, reset=True)
if app.disable_drop_partitions:
logging.info('partitions %r are discarded but actual deletion'
' of data is disabled', unassigned)
else:
logging.debug('drop data for partitions %r', unassigned)
dm.dropPartitions(unassigned)
dm.changePartitionTable(ptid, num_replicas, cell_list, reset=True)
dm.commit() dm.commit()
def truncate(self, conn, tid): def truncate(self, conn, tid):
......
...@@ -90,7 +90,7 @@ class StorageMasterHandlerTests(NeoUnitTestBase): ...@@ -90,7 +90,7 @@ class StorageMasterHandlerTests(NeoUnitTestBase):
# dm call # dm call
calls = self.app.dm.mockGetNamedCalls('changePartitionTable') calls = self.app.dm.mockGetNamedCalls('changePartitionTable')
self.assertEqual(len(calls), 1) self.assertEqual(len(calls), 1)
calls[0].checkArgs(ptid, 1, cells) calls[0].checkArgs(app, ptid, 1, cells)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
...@@ -53,7 +53,7 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -53,7 +53,7 @@ class StorageDBTests(NeoUnitTestBase):
uuid = self.getStorageUUID() uuid = self.getStorageUUID()
db.setUUID(uuid) db.setUUID(uuid)
self.assertEqual(uuid, db.getUUID()) self.assertEqual(uuid, db.getUUID())
db.changePartitionTable(1, 0, db.changePartitionTable(None, 1, 0,
[(i, uuid, CellStates.UP_TO_DATE) for i in xrange(num_partitions)], [(i, uuid, CellStates.UP_TO_DATE) for i in xrange(num_partitions)],
reset=True) reset=True)
self.assertEqual(num_partitions, 1 + db._getMaxPartition()) self.assertEqual(num_partitions, 1 + db._getMaxPartition())
......
...@@ -477,13 +477,10 @@ class ReplicationTests(NEOThreadedTest): ...@@ -477,13 +477,10 @@ class ReplicationTests(NEOThreadedTest):
return isinstance(packet, delayed) and \ return isinstance(packet, delayed) and \
packet._args[0] == offset and \ packet._args[0] == offset and \
conn in s1.getConnectionList(s0) conn in s1.getConnectionList(s0)
def changePartitionTable(orig, ptid, num_replicas, cell_list): def changePartitionTable(orig, app, ptid, num_replicas, cell_list):
if (offset, s0.uuid, CellStates.DISCARDED) in cell_list: if (offset, s0.uuid, CellStates.DISCARDED) in cell_list:
connection_filter.remove(delayAskFetch) connection_filter.remove(delayAskFetch)
# XXX: this is currently not done by return orig(app, ptid, num_replicas, cell_list)
# default for performance reason
orig.im_self.dropPartitions((offset,))
return orig(ptid, num_replicas, cell_list)
np = cluster.num_partitions np = cluster.num_partitions
s0, s1, s2 = cluster.storage_list s0, s1, s2 = cluster.storage_list
for delayed in Packets.AskFetchTransactions, Packets.AskFetchObjects: for delayed in Packets.AskFetchTransactions, Packets.AskFetchObjects:
...@@ -708,17 +705,7 @@ class ReplicationTests(NEOThreadedTest): ...@@ -708,17 +705,7 @@ class ReplicationTests(NEOThreadedTest):
cluster.neoctl.tweakPartitionTable() cluster.neoctl.tweakPartitionTable()
self.tic() self.tic()
self.assertEqual(1, s1.sqlCount('obj')) self.assertEqual(1, s1.sqlCount('obj'))
# Deletion should start as soon as the cell is discarded, as a
# background task, instead of doing it during initialization.
count = s0.sqlCount('obj')
s0.stop()
cluster.join((s0,))
s0.resetNode()
s0.start()
self.tic()
self.assertEqual(2, s0.sqlCount('obj')) self.assertEqual(2, s0.sqlCount('obj'))
with self.expectedFailure(): \
self.assertEqual(2, count)
@with_cluster(replicas=1) @with_cluster(replicas=1)
def testResumingReplication(self, cluster): def testResumingReplication(self, cluster):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment