...
  View open merge request
Commits (2)
......@@ -349,7 +349,7 @@ class ImporterDatabaseManager(DatabaseManager):
def __init__(self, *args, **kw):
super(ImporterDatabaseManager, self).__init__(*args, **kw)
implements(self, """_getNextTID checkSerialRange checkTIDRange
deleteObject deleteTransaction dropPartitions _getLastTID
deleteObject deleteTransaction _dropPartition _getLastTID
getReplicationObjectList _getTIDList nonempty""".split())
_getPartition = property(lambda self: self.db._getPartition)
......
......@@ -19,6 +19,7 @@ from collections import defaultdict
from contextlib import contextmanager
from copy import copy
from functools import wraps
from time import time
from neo.lib import logging, util
from neo.lib.interfaces import abstract, requires
from neo.lib.protocol import CellStates, NonReadableCell, MAX_TID, ZERO_TID
......@@ -53,6 +54,8 @@ class DatabaseManager(object):
LOCKED = "error: database is locked"
_deferred = 0
_drop_stats = 0, 0
_dropping = None
_repairing = None
def __init__(self, database, engine=None, wait=None):
......@@ -212,7 +215,8 @@ class DatabaseManager(object):
self.setConfiguration("version", version)
def doOperation(self, app):
pass
if self._dropping:
self._dropPartitions(app)
def _close(self):
"""Backend-specific code to close the database"""
......@@ -559,7 +563,8 @@ class DatabaseManager(object):
if -x[1] in READABLE)
@requires(_changePartitionTable, _getLastIDs, _getLastTID)
def changePartitionTable(self, ptid, num_replicas, cell_list, reset=False):
def changePartitionTable(self, app, ptid, num_replicas, cell_list,
reset=False):
my_nid = self.getUUID()
pt = dict(self.iterAssignedCells())
# In backup mode, the last transactions of a readable cell may be
......@@ -567,23 +572,41 @@ class DatabaseManager(object):
backup_tid = self.getBackupTID()
if backup_tid:
backup_tid = util.u64(backup_tid)
def outofdate_tid(offset):
tid = pt.get(offset, 0)
if tid >= 0:
return tid
return -tid in READABLE and (backup_tid or
max(self._getLastIDs(offset)[0],
self._getLastTID(offset))) or 0
cell_list = [(offset, nid, (
None if state == CellStates.DISCARDED else
-state if nid != my_nid or state != CellStates.OUT_OF_DATE else
outofdate_tid(offset)))
for offset, nid, state in cell_list]
self._changePartitionTable(cell_list, reset)
max_offset = -1
dropping = self._dropping or set()
assigned = []
cells = []
for offset, nid, state in cell_list:
if max_offset < offset:
max_offset = offset
if state == CellStates.DISCARDED:
if nid == my_nid:
dropping.add(offset)
tid = None
else:
if nid == my_nid:
assigned.append(offset)
if nid != my_nid or state != CellStates.OUT_OF_DATE:
tid = -state
else:
tid = pt.get(offset, 0)
if tid < 0:
tid = -tid in READABLE and (backup_tid or
max(self._getLastIDs(offset)[0],
self._getLastTID(offset))) or 0
cells.append((offset, nid, tid))
if reset:
dropping.update(xrange(max_offset + 1))
dropping.difference_update(assigned)
self._changePartitionTable(cells, reset)
self._updateReadable(reset)
assert isinstance(ptid, (int, long)), ptid
self._setConfiguration('ptid', str(ptid))
self._setConfiguration('replicas', str(num_replicas))
if dropping and not self._dropping:
self._dropping = dropping
if app.operational:
self._dropPartitions(app)
@requires(_changePartitionTable)
def updateCellTID(self, partition, tid):
......@@ -628,9 +651,70 @@ class DatabaseManager(object):
else:
yield offset, None
def _dropPartitions(self, app):
if app.disable_drop_partitions:
logging.info("don't drop data for partitions %r", self._dropping)
return
def dropPartitions():
dropping = self._dropping
before = drop_count, drop_time = self._drop_stats
commit = dropped = 0
while dropping:
offset = next(iter(dropping))
log = dropped
while True:
yield 1
if offset not in dropping:
break
start = time()
if 0 < commit < start:
self.commit()
logging.debug('drop: committed')
commit = 0
continue
data_id_list = self._dropPartition(offset,
# The efficiency drops when the number of lines to
# delete is too small so do not delete too few.
max(100, int(.1 * drop_count / drop_time))
if drop_time else 1000)
if data_id_list:
if not commit:
commit = time() + 1
if log == dropped:
dropped += 1
logging.info("dropping partition %s...", offset)
if type(data_id_list) is list:
try:
data_id_list.remove(None)
pass # XXX: not covered
except ValueError:
pass
logging.debug('drop: pruneData(%s)',
len(data_id_list))
drop_count += self._pruneData(data_id_list)
drop_time += time() - start
self._drop_stats = drop_count, drop_time
continue
dropping.remove(offset)
break
if dropped:
if commit:
self.commit()
logging.info("%s partition(s) dropped"
" (stats: count: %s/%s, time: %.4s/%.4s)",
dropped, drop_count - before[0], drop_count,
round(drop_time - before[1], 3), round(drop_time, 3))
app.newTask(dropPartitions())
@abstract
def dropPartitions(self, offset_list):
"""Delete all data for specified partitions"""
def _dropPartition(self, offset, count):
"""Delete rows for given partition
Delete at most 'count' rows of from obj:
- if there's no line to delete, purge trans and return
a boolean indicating if any row was deleted (from trans)
- else return data ids of deleted rows
"""
def _getUnfinishedDataIdList(self):
"""Drop any unfinished data from a database."""
......
......@@ -22,9 +22,6 @@ from MySQLdb import DataError, IntegrityError, \
OperationalError, ProgrammingError
from MySQLdb.constants.CR import SERVER_GONE_ERROR, SERVER_LOST
from MySQLdb.constants.ER import DATA_TOO_LONG, DUP_ENTRY, NO_SUCH_TABLE
# BBB: the following 2 constants were added to mysqlclient 1.3.8
DROP_LAST_PARTITION = 1508
SAME_NAME_PARTITION = 1517
from array import array
from hashlib import sha1
import os
......@@ -94,8 +91,6 @@ class MySQLDatabaseManager(DatabaseManager):
ENGINES = "InnoDB", "RocksDB", "TokuDB"
_engine = ENGINES[0] # default engine
_use_partition = False
_max_allowed_packet = 32769 * 1024
def _parse(self, database):
......@@ -300,10 +295,6 @@ class MySQLDatabaseManager(DatabaseManager):
PRIMARY KEY (`partition`, nid)
) ENGINE=""" + engine
if self._use_partition:
p += """ PARTITION BY LIST (`partition`) (
PARTITION dummy VALUES IN (NULL))"""
if engine == "RocksDB":
cf = lambda name, rev=False: " COMMENT '%scf_neo_%s'" % (
'rev:' if rev else '', name)
......@@ -523,40 +514,21 @@ class MySQLDatabaseManager(DatabaseManager):
q("INSERT INTO pt VALUES (%d, %d, %d)"
" ON DUPLICATE KEY UPDATE tid = %d"
% (offset, nid, tid, tid))
if self._use_partition:
for offset in offset_list:
add = """ALTER TABLE %%s ADD PARTITION (
PARTITION p%u VALUES IN (%u))""" % (offset, offset)
for table in 'trans', 'obj':
try:
self.query(add % table)
except MysqlError as e:
if e.code != SAME_NAME_PARTITION:
raise
def dropPartitions(self, offset_list):
def _dropPartition(self, offset, count):
q = self.query
# XXX: these queries are inefficient (execution time increase with
# row count, although we use indexes) when there are rows to
# delete. It should be done as an idle task, by chunks.
for partition in offset_list:
where = " WHERE `partition`=%d" % partition
data_id_list = [x for x, in
q("SELECT DISTINCT data_id FROM obj FORCE INDEX(tid)"
"%s AND data_id IS NOT NULL" % where)]
if not self._use_partition:
q("DELETE FROM obj" + where)
q("DELETE FROM trans" + where)
self._pruneData(data_id_list)
if self._use_partition:
drop = "ALTER TABLE %s DROP PARTITION" + \
','.join(' p%u' % i for i in offset_list)
for table in 'trans', 'obj':
try:
self.query(drop % table)
except MysqlError as e:
if e.code != DROP_LAST_PARTITION:
raise
where = " WHERE `partition`=%s ORDER BY tid, oid LIMIT %s" % (
offset, count)
logging.debug("drop: select(%s)", count)
x = q("SELECT DISTINCT data_id FROM obj FORCE INDEX(tid)" + where)
if x:
logging.debug("drop: obj")
q("DELETE FROM obj" + where)
return [x for x, in x]
logging.debug("drop: trans")
q("DELETE FROM trans WHERE `partition`=%s" % offset)
(x,), = q('SELECT ROW_COUNT()')
return x
def _getUnfinishedDataIdList(self):
return [x for x, in self.query(
......
......@@ -364,17 +364,14 @@ class SQLiteDatabaseManager(DatabaseManager):
q("INSERT OR FAIL INTO pt VALUES (?,?,?)",
(offset, nid, int(state)))
def dropPartitions(self, offset_list):
where = " WHERE partition=?"
def _dropPartition(self, *args):
q = self.query
for partition in offset_list:
args = partition,
data_id_list = [x for x, in q(
"SELECT DISTINCT data_id FROM obj%s AND data_id IS NOT NULL"
% where, args)]
q("DELETE FROM obj" + where, args)
q("DELETE FROM trans" + where, args)
self._pruneData(data_id_list)
where = " FROM obj WHERE partition=? ORDER BY tid, oid LIMIT ?"
x = q("SELECT data_id" + where, args).fetchall()
if x:
q("DELETE" + where, args)
return [x for x, in x]
return q("DELETE FROM trans WHERE partition=?", args[:1]).rowcount
def _getUnfinishedDataIdList(self):
return [x for x, in self.query(
......
......@@ -72,7 +72,7 @@ class BaseMasterHandler(BaseHandler):
if ptid != 1 + app.pt.getID():
raise ProtocolError('wrong partition table id')
app.pt.update(ptid, num_replicas, cell_list, app.nm)
app.dm.changePartitionTable(ptid, num_replicas, cell_list)
app.dm.changePartitionTable(app, ptid, num_replicas, cell_list)
if app.operational:
app.replicator.notifyPartitionChanges(cell_list)
app.dm.commit()
......
......@@ -26,25 +26,11 @@ class InitializationHandler(BaseMasterHandler):
pt.load(ptid, num_replicas, row_list, app.nm)
if not pt.filled():
raise ProtocolError('Partial partition table received')
# Install the partition table into the database for persistence.
cell_list = []
unassigned = range(pt.getPartitions())
for offset in reversed(unassigned):
for cell in pt.getCellList(offset):
cell_list.append((offset, cell.getUUID(), cell.getState()))
if cell.getUUID() == app.uuid:
unassigned.remove(offset)
# delete objects database
cell_list = [(offset, cell.getUUID(), cell.getState())
for offset in xrange(pt.getPartitions())
for cell in pt.getCellList(offset)]
dm = app.dm
if unassigned:
if app.disable_drop_partitions:
logging.info('partitions %r are discarded but actual deletion'
' of data is disabled', unassigned)
else:
logging.debug('drop data for partitions %r', unassigned)
dm.dropPartitions(unassigned)
dm.changePartitionTable(ptid, num_replicas, cell_list, reset=True)
dm.changePartitionTable(app, ptid, num_replicas, cell_list, reset=True)
dm.commit()
def truncate(self, conn, tid):
......
......@@ -90,7 +90,7 @@ class StorageMasterHandlerTests(NeoUnitTestBase):
# dm call
calls = self.app.dm.mockGetNamedCalls('changePartitionTable')
self.assertEqual(len(calls), 1)
calls[0].checkArgs(ptid, 1, cells)
calls[0].checkArgs(app, ptid, 1, cells)
if __name__ == "__main__":
unittest.main()
......@@ -53,7 +53,7 @@ class StorageDBTests(NeoUnitTestBase):
uuid = self.getStorageUUID()
db.setUUID(uuid)
self.assertEqual(uuid, db.getUUID())
db.changePartitionTable(1, 0,
db.changePartitionTable(None, 1, 0,
[(i, uuid, CellStates.UP_TO_DATE) for i in xrange(num_partitions)],
reset=True)
self.assertEqual(num_partitions, 1 + db._getMaxPartition())
......
......@@ -477,13 +477,10 @@ class ReplicationTests(NEOThreadedTest):
return isinstance(packet, delayed) and \
packet._args[0] == offset and \
conn in s1.getConnectionList(s0)
def changePartitionTable(orig, ptid, num_replicas, cell_list):
def changePartitionTable(orig, app, ptid, num_replicas, cell_list):
if (offset, s0.uuid, CellStates.DISCARDED) in cell_list:
connection_filter.remove(delayAskFetch)
# XXX: this is currently not done by
# default for performance reason
orig.im_self.dropPartitions((offset,))
return orig(ptid, num_replicas, cell_list)
return orig(app, ptid, num_replicas, cell_list)
np = cluster.num_partitions
s0, s1, s2 = cluster.storage_list
for delayed in Packets.AskFetchTransactions, Packets.AskFetchObjects:
......@@ -708,17 +705,7 @@ class ReplicationTests(NEOThreadedTest):
cluster.neoctl.tweakPartitionTable()
self.tic()
self.assertEqual(1, s1.sqlCount('obj'))
# Deletion should start as soon as the cell is discarded, as a
# background task, instead of doing it during initialization.
count = s0.sqlCount('obj')
s0.stop()
cluster.join((s0,))
s0.resetNode()
s0.start()
self.tic()
self.assertEqual(2, s0.sqlCount('obj'))
with self.expectedFailure(): \
self.assertEqual(2, count)
@with_cluster(replicas=1)
def testResumingReplication(self, cluster):
......