From a0ceb8106f2c4bcfb17589c2674d0615d600e484 Mon Sep 17 00:00:00 2001
From: Julien Muchembled <jm@nexedi.com>
Date: Tue, 9 May 2017 20:32:24 +0200
Subject: [PATCH] storage: drop partitions in background, in small chunks

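Data of discarded cells is now deleted in background, by small chunks,
instead of synchronously while processing a new partition table. As an
old XXX comment in mysqldb.py noted, the previous DELETE queries were
inefficient (execution time increases with the number of rows, although
indexes are used) and deletion should rather be done as an idle task,
by chunks.

dropPartitions() is replaced by a new backend method,
_dropPartition(offset, count), which deletes at most 'count' rows of
the given partition: it returns the data ids of the deleted obj rows,
or, once obj is empty, purges trans and returns whether any row was
deleted. changePartitionTable() now takes the application object so
that deletion can be scheduled with app.newTask() as soon as a cell is
discarded; with reset=True, all non-assigned partitions are scheduled,
which also covers cells that were discarded while the node was down
(InitializationHandler does not compute them itself anymore).

The task commits at most once per second and adapts the chunk size to
the measured deletion rate, targeting about 0.1 second of work per
chunk: for example, after 100000 rows deleted in 20 seconds, the next
chunk is max(100, int(.1 * 100000 / 20)) = 500 rows.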
---
 neo/storage/database/importer.py        |   2 +-
 neo/storage/database/manager.py         | 118 ++++++++++++++++++++----
 neo/storage/database/mysqldb.py         |  23 ++---
 neo/storage/database/sqlite.py          |  17 ++--
 neo/storage/handlers/__init__.py        |   2 +-
 neo/storage/handlers/initialization.py  |  22 +----
 neo/tests/storage/testMasterHandler.py  |   2 +-
 neo/tests/storage/testStorageDBTests.py |   2 +-
 neo/tests/threaded/testReplication.py   |  17 +---
 9 files changed, 130 insertions(+), 75 deletions(-)
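Patch note: below is a minimal sketch of the contract that storage
backends must now implement, together with a simplified version of the
caller's loop. The in-memory 'FakeBackend' and its attributes are
hypothetical and for illustration only; only the _dropPartition
signature and its return values follow the real API.

    # Hypothetical in-memory backend illustrating the
    # _dropPartition(offset, count) contract.
    class FakeBackend(object):
        def __init__(self, obj, trans):
            self.obj = obj      # {partition: [data_id or None, ...]}
            self.trans = trans  # {partition: [row, ...]}

        def _dropPartition(self, offset, count):
            rows = self.obj.get(offset)
            if rows:
                # Delete at most 'count' obj rows and return the data
                # ids of exactly those rows.
                deleted = rows[:count]
                del rows[:count]
                return deleted
            # obj is empty: purge trans and tell the caller whether any
            # row was deleted, so that it knows whether to commit.
            return bool(self.trans.pop(offset, None))

    backend = FakeBackend({0: [1, 2, None, 3]}, {0: ['t']})
    while True:
        x = backend._dropPartition(0, 2)
        if type(x) is list:
            continue  # the real task would prune the data ids here
        break  # boolean: trans purged, partition completely dropped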

diff --git a/neo/storage/database/importer.py b/neo/storage/database/importer.py
index 46027a14..922eeb77 100644
--- a/neo/storage/database/importer.py
+++ b/neo/storage/database/importer.py
@@ -349,7 +349,7 @@ class ImporterDatabaseManager(DatabaseManager):
     def __init__(self, *args, **kw):
         super(ImporterDatabaseManager, self).__init__(*args, **kw)
         implements(self, """_getNextTID checkSerialRange checkTIDRange
-            deleteObject deleteTransaction dropPartitions _getLastTID
+            deleteObject deleteTransaction _dropPartition _getLastTID
             getReplicationObjectList _getTIDList nonempty""".split())
 
     _getPartition = property(lambda self: self.db._getPartition)
diff --git a/neo/storage/database/manager.py b/neo/storage/database/manager.py
index 103ec9ff..5f45dfc3 100644
--- a/neo/storage/database/manager.py
+++ b/neo/storage/database/manager.py
@@ -19,6 +19,7 @@ from collections import defaultdict
 from contextlib import contextmanager
 from copy import copy
 from functools import wraps
+from time import time
 from neo.lib import logging, util
 from neo.lib.interfaces import abstract, requires
 from neo.lib.protocol import CellStates, NonReadableCell, MAX_TID, ZERO_TID
@@ -53,6 +54,8 @@ class DatabaseManager(object):
     LOCKED = "error: database is locked"
 
     _deferred = 0
+    _drop_stats = 0, 0
+    _dropping = None
     _repairing = None
 
     def __init__(self, database, engine=None, wait=None):
@@ -212,7 +215,8 @@ class DatabaseManager(object):
             self.setConfiguration("version", version)
 
     def doOperation(self, app):
-        pass
+        if self._dropping:
+            self._dropPartitions(app)
 
     def _close(self):
         """Backend-specific code to close the database"""
@@ -559,7 +563,8 @@ class DatabaseManager(object):
                                  if -x[1] in READABLE)
 
     @requires(_changePartitionTable, _getLastIDs, _getLastTID)
-    def changePartitionTable(self, ptid, num_replicas, cell_list, reset=False):
+    def changePartitionTable(self, app, ptid, num_replicas, cell_list,
+                             reset=False):
         my_nid = self.getUUID()
         pt = dict(self.iterAssignedCells())
         # In backup mode, the last transactions of a readable cell may be
@@ -567,23 +572,41 @@ class DatabaseManager(object):
         backup_tid = self.getBackupTID()
         if backup_tid:
             backup_tid = util.u64(backup_tid)
-        def outofdate_tid(offset):
-            tid = pt.get(offset, 0)
-            if tid >= 0:
-                return tid
-            return -tid in READABLE and (backup_tid or
-                max(self._getLastIDs(offset)[0],
-                    self._getLastTID(offset))) or 0
-        cell_list = [(offset, nid, (
-                None if state == CellStates.DISCARDED else
-                -state if nid != my_nid or state != CellStates.OUT_OF_DATE else
-                outofdate_tid(offset)))
-            for offset, nid, state in cell_list]
-        self._changePartitionTable(cell_list, reset)
+        max_offset = -1
+        dropping = self._dropping or set()
+        assigned = []
+        cells = []
+        for offset, nid, state in cell_list:
+            if max_offset < offset:
+                max_offset = offset
+            if state == CellStates.DISCARDED:
+                if nid == my_nid:
+                    dropping.add(offset)
+                tid = None
+            else:
+                if nid == my_nid:
+                    assigned.append(offset)
+                if nid != my_nid or state != CellStates.OUT_OF_DATE:
+                    tid = -state
+                else:
+                    tid = pt.get(offset, 0)
+                    if tid < 0:
+                        tid = -tid in READABLE and (backup_tid or
+                            max(self._getLastIDs(offset)[0],
+                                self._getLastTID(offset))) or 0
+            cells.append((offset, nid, tid))
+        if reset:
+            dropping.update(xrange(max_offset + 1))
+            dropping.difference_update(assigned)
+        self._changePartitionTable(cells, reset)
         self._updateReadable(reset)
         assert isinstance(ptid, (int, long)), ptid
         self._setConfiguration('ptid', str(ptid))
         self._setConfiguration('replicas', str(num_replicas))
+        if dropping and not self._dropping:
+            self._dropping = dropping
+            if app.operational:
+                self._dropPartitions(app)
 
     @requires(_changePartitionTable)
     def updateCellTID(self, partition, tid):
@@ -628,9 +651,70 @@ class DatabaseManager(object):
                 else:
                     yield offset, None
 
+    def _dropPartitions(self, app):
+        if app.disable_drop_partitions:
+            logging.info("don't drop data for partitions %r", self._dropping)
+            return
+        def dropPartitions():
+            dropping = self._dropping
+            before = drop_count, drop_time = self._drop_stats
+            commit = dropped = 0
+            while dropping:
+                offset = next(iter(dropping))
+                log = dropped
+                while True:
+                    yield 1
+                    if offset not in dropping:
+                        break
+                    start = time()
+                    if 0 < commit < start:
+                        self.commit()
+                        logging.debug('drop: committed')
+                        commit = 0
+                        continue
+                    data_id_list = self._dropPartition(offset,
+                        # Efficiency drops when the number of rows to delete
+                        # is too small, so do not delete too few at a time.
+                        max(100, int(.1 * drop_count / drop_time))
+                        if drop_time else 1000)
+                    if data_id_list:
+                        if not commit:
+                            commit = time() + 1
+                        if log == dropped:
+                            dropped += 1
+                            logging.info("dropping partition %s...", offset)
+                        if type(data_id_list) is list:
+                            try:
+                                data_id_list.remove(None)
+                                # XXX: not covered by tests
+                            except ValueError:
+                                pass
+                            logging.debug('drop: pruneData(%s)',
+                                          len(data_id_list))
+                            drop_count += self._pruneData(data_id_list)
+                            drop_time += time() - start
+                            self._drop_stats = drop_count, drop_time
+                            continue
+                    dropping.remove(offset)
+                    break
+            if dropped:
+                if commit:
+                    self.commit()
+                logging.info("%s partition(s) dropped"
+                    " (stats: count: %s/%s, time: %.4s/%.4s)",
+                    dropped, drop_count - before[0], drop_count,
+                    round(drop_time - before[1], 3), round(drop_time, 3))
+        app.newTask(dropPartitions())
+
     @abstract
-    def dropPartitions(self, offset_list):
-        """Delete all data for specified partitions"""
+    def _dropPartition(self, offset, count):
+        """Delete rows for given partition
+
+        Delete at most 'count' rows of from obj:
+        - if there's no line to delete, purge trans and return
+          a boolean indicating if any row was deleted (from trans)
+        - else return data ids of deleted rows
+        """
 
     def _getUnfinishedDataIdList(self):
         """Drop any unfinished data from a database."""
diff --git a/neo/storage/database/mysqldb.py b/neo/storage/database/mysqldb.py
index 33b533ed..77e69d79 100644
--- a/neo/storage/database/mysqldb.py
+++ b/neo/storage/database/mysqldb.py
@@ -515,19 +515,20 @@ class MySQLDatabaseManager(DatabaseManager):
                   " ON DUPLICATE KEY UPDATE tid = %d"
                   % (offset, nid, tid, tid))
 
-    def dropPartitions(self, offset_list):
+    def _dropPartition(self, offset, count):
         q = self.query
-        # XXX: these queries are inefficient (execution time increase with
-        # row count, although we use indexes) when there are rows to
-        # delete. It should be done as an idle task, by chunks.
-        for partition in offset_list:
-            where = " WHERE `partition`=%d" % partition
-            data_id_list = [x for x, in
-                q("SELECT DISTINCT data_id FROM obj FORCE INDEX(tid)"
-                  "%s AND data_id IS NOT NULL" % where)]
+        where = " WHERE `partition`=%s ORDER BY tid, oid LIMIT %s" % (
+            offset, count)
+        logging.debug("drop: select(%s)", count)
+        x = q("SELECT DISTINCT data_id FROM obj FORCE INDEX(tid)" + where)
+        if x:
+            logging.debug("drop: obj")
             q("DELETE FROM obj" + where)
-            q("DELETE FROM trans" + where)
-            self._pruneData(data_id_list)
+            return [x for x, in x]
+        logging.debug("drop: trans")
+        q("DELETE FROM trans WHERE `partition`=%s" % offset)
+        (x,), = q('SELECT ROW_COUNT()')
+        return x
 
     def _getUnfinishedDataIdList(self):
         return [x for x, in self.query(
diff --git a/neo/storage/database/sqlite.py b/neo/storage/database/sqlite.py
index 1303f2d8..a176c97e 100644
--- a/neo/storage/database/sqlite.py
+++ b/neo/storage/database/sqlite.py
@@ -364,17 +364,14 @@ class SQLiteDatabaseManager(DatabaseManager):
                 q("INSERT OR FAIL INTO pt VALUES (?,?,?)",
                   (offset, nid, int(state)))
 
-    def dropPartitions(self, offset_list):
-        where = " WHERE partition=?"
+    def _dropPartition(self, *args):
         q = self.query
-        for partition in offset_list:
-            args = partition,
-            data_id_list = [x for x, in q(
-                "SELECT DISTINCT data_id FROM obj%s AND data_id IS NOT NULL"
-                % where, args)]
-            q("DELETE FROM obj" + where, args)
-            q("DELETE FROM trans" + where, args)
-            self._pruneData(data_id_list)
+        where = " FROM obj WHERE partition=? ORDER BY tid, oid LIMIT ?"
+        x = q("SELECT data_id" + where, args).fetchall()
+        if x:
+            q("DELETE" + where, args)
+            return [x for x, in x]
+        return q("DELETE FROM trans WHERE partition=?", args[:1]).rowcount
 
     def _getUnfinishedDataIdList(self):
         return [x for x, in self.query(
diff --git a/neo/storage/handlers/__init__.py b/neo/storage/handlers/__init__.py
index 6c286e36..ddeffb3a 100644
--- a/neo/storage/handlers/__init__.py
+++ b/neo/storage/handlers/__init__.py
@@ -72,7 +72,7 @@ class BaseMasterHandler(BaseHandler):
         if ptid != 1 + app.pt.getID():
             raise ProtocolError('wrong partition table id')
         app.pt.update(ptid, num_replicas, cell_list, app.nm)
-        app.dm.changePartitionTable(ptid, num_replicas, cell_list)
+        app.dm.changePartitionTable(app, ptid, num_replicas, cell_list)
         if app.operational:
             app.replicator.notifyPartitionChanges(cell_list)
         app.dm.commit()
diff --git a/neo/storage/handlers/initialization.py b/neo/storage/handlers/initialization.py
index e61455cd..26461f35 100644
--- a/neo/storage/handlers/initialization.py
+++ b/neo/storage/handlers/initialization.py
@@ -26,25 +26,11 @@ class InitializationHandler(BaseMasterHandler):
         pt.load(ptid, num_replicas, row_list, app.nm)
         if not pt.filled():
             raise ProtocolError('Partial partition table received')
-        # Install the partition table into the database for persistence.
-        cell_list = []
-        unassigned = range(pt.getPartitions())
-        for offset in reversed(unassigned):
-            for cell in pt.getCellList(offset):
-                cell_list.append((offset, cell.getUUID(), cell.getState()))
-                if cell.getUUID() == app.uuid:
-                    unassigned.remove(offset)
-        # delete objects database
+        cell_list = [(offset, cell.getUUID(), cell.getState())
+            for offset in xrange(pt.getPartitions())
+            for cell in pt.getCellList(offset)]
         dm = app.dm
-        if unassigned:
-          if app.disable_drop_partitions:
-            logging.info('partitions %r are discarded but actual deletion'
-                         ' of data is disabled', unassigned)
-          else:
-            logging.debug('drop data for partitions %r', unassigned)
-            dm.dropPartitions(unassigned)
-
-        dm.changePartitionTable(ptid, num_replicas, cell_list, reset=True)
+        dm.changePartitionTable(app, ptid, num_replicas, cell_list, reset=True)
         dm.commit()
 
     def truncate(self, conn, tid):
diff --git a/neo/tests/storage/testMasterHandler.py b/neo/tests/storage/testMasterHandler.py
index e273c298..8fa93ce4 100644
--- a/neo/tests/storage/testMasterHandler.py
+++ b/neo/tests/storage/testMasterHandler.py
@@ -90,7 +90,7 @@ class StorageMasterHandlerTests(NeoUnitTestBase):
         # dm call
         calls = self.app.dm.mockGetNamedCalls('changePartitionTable')
         self.assertEqual(len(calls), 1)
-        calls[0].checkArgs(ptid, 1, cells)
+        calls[0].checkArgs(app, ptid, 1, cells)
 
 if __name__ == "__main__":
     unittest.main()
diff --git a/neo/tests/storage/testStorageDBTests.py b/neo/tests/storage/testStorageDBTests.py
index 7cd4903c..d5177551 100644
--- a/neo/tests/storage/testStorageDBTests.py
+++ b/neo/tests/storage/testStorageDBTests.py
@@ -53,7 +53,7 @@ class StorageDBTests(NeoUnitTestBase):
         uuid = self.getStorageUUID()
         db.setUUID(uuid)
         self.assertEqual(uuid, db.getUUID())
-        db.changePartitionTable(1, 0,
+        db.changePartitionTable(None, 1, 0,
             [(i, uuid, CellStates.UP_TO_DATE) for i in xrange(num_partitions)],
             reset=True)
         self.assertEqual(num_partitions, 1 + db._getMaxPartition())
diff --git a/neo/tests/threaded/testReplication.py b/neo/tests/threaded/testReplication.py
index 4cd2e0fe..c7fd073c 100644
--- a/neo/tests/threaded/testReplication.py
+++ b/neo/tests/threaded/testReplication.py
@@ -477,13 +477,10 @@ class ReplicationTests(NEOThreadedTest):
             return isinstance(packet, delayed) and \
                    packet._args[0] == offset and \
                    conn in s1.getConnectionList(s0)
-        def changePartitionTable(orig, ptid, num_replicas, cell_list):
+        def changePartitionTable(orig, app, ptid, num_replicas, cell_list):
             if (offset, s0.uuid, CellStates.DISCARDED) in cell_list:
                 connection_filter.remove(delayAskFetch)
-                # XXX: this is currently not done by
-                #      default for performance reason
-                orig.im_self.dropPartitions((offset,))
-            return orig(ptid, num_replicas, cell_list)
+            return orig(app, ptid, num_replicas, cell_list)
         np = cluster.num_partitions
         s0, s1, s2 = cluster.storage_list
         for delayed in Packets.AskFetchTransactions, Packets.AskFetchObjects:
@@ -708,17 +705,7 @@ class ReplicationTests(NEOThreadedTest):
         cluster.neoctl.tweakPartitionTable()
         self.tic()
         self.assertEqual(1, s1.sqlCount('obj'))
-        # Deletion should start as soon as the cell is discarded, as a
-        # background task, instead of doing it during initialization.
-        count = s0.sqlCount('obj')
-        s0.stop()
-        cluster.join((s0,))
-        s0.resetNode()
-        s0.start()
-        self.tic()
         self.assertEqual(2, s0.sqlCount('obj'))
-        with self.expectedFailure(): \
-        self.assertEqual(2, count)
 
     @with_cluster(replicas=1)
     def testResumingReplication(self, cluster):
-- 
2.30.9