Commit eaa07e25 authored by Julien Muchembled

storage: defer commit when unlocking a transaction (-> better performance)

Before this change, a storage node did 3 commits per transaction:
- once all data are stored
- when locking the transaction
- when unlocking the transaction

The last one is not important for ACID. In case of a crash, the transaction
is unlocked again (verification phase). By deferring it by 1 second, we
only have 2 commits per transaction during high activity because all pending
changes are merged with the commits caused by other transactions.

This change compensates for the extra commit(s) per transaction that were
introduced in commit 7eb7cf1b
("Minimize the amount of work during tpc_finish").
parent 254878a8
...@@ -61,7 +61,6 @@ ...@@ -61,7 +61,6 @@
Storage Storage
- Use libmysqld instead of a stand-alone MySQL server. - Use libmysqld instead of a stand-alone MySQL server.
- It should be possible to defer the commit at the end of finishTransaction.
- Notify master when storage becomes available for clients (LATENCY) - Notify master when storage becomes available for clients (LATENCY)
Currently, storage presence is broadcasted to client nodes too early, as Currently, storage presence is broadcasted to client nodes too early, as
the storage node would refuse them until it has only up-to-date data (not the storage node would refuse them until it has only up-to-date data (not
......
...@@ -24,6 +24,7 @@ from .locking import Lock ...@@ -24,6 +24,7 @@ from .locking import Lock
class EpollEventManager(object): class EpollEventManager(object):
"""This class manages connections and events based on epoll(5).""" """This class manages connections and events based on epoll(5)."""
_timeout = None
_trigger_exit = False _trigger_exit = False
def __init__(self): def __init__(self):
...@@ -134,12 +135,13 @@ class EpollEventManager(object): ...@@ -134,12 +135,13 @@ class EpollEventManager(object):
def _poll(self, blocking): def _poll(self, blocking):
if blocking: if blocking:
timeout = None timeout = self._timeout
timeout_object = self
for conn in self.connection_dict.itervalues(): for conn in self.connection_dict.itervalues():
t = conn.getTimeout() t = conn.getTimeout()
if t and (timeout is None or t < timeout): if t and (timeout is None or t < timeout):
timeout = t timeout = t
timeout_conn = conn timeout_object = conn
# Make sure epoll_wait does not return too early, because it has a # Make sure epoll_wait does not return too early, because it has a
# granularity of 1ms and Python 2.7 rounds the timeout towards zero. # granularity of 1ms and Python 2.7 rounds the timeout towards zero.
# See also https://bugs.python.org/issue20452 (fixed in Python 3). # See also https://bugs.python.org/issue20452 (fixed in Python 3).
...@@ -185,8 +187,17 @@ class EpollEventManager(object): ...@@ -185,8 +187,17 @@ class EpollEventManager(object):
if conn.readable(): if conn.readable():
self._addPendingConnection(conn) self._addPendingConnection(conn)
elif blocking > 0: elif blocking > 0:
logging.debug('timeout triggered for %r', timeout_conn) logging.debug('timeout triggered for %r', timeout_object)
timeout_conn.onTimeout() timeout_object.onTimeout()
def onTimeout(self):
on_timeout = self._on_timeout
del self._on_timeout
self._timeout = None
on_timeout()
def setTimeout(self, *args):
self._timeout, self._on_timeout = args
def wakeup(self, exit=False): def wakeup(self, exit=False):
with self._trigger_lock: with self._trigger_lock:
......
...@@ -306,7 +306,7 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -306,7 +306,7 @@ class ImporterDatabaseManager(DatabaseManager):
getPartitionTable changePartitionTable getPartitionTable changePartitionTable
getUnfinishedTIDDict dropUnfinishedData abortTransaction getUnfinishedTIDDict dropUnfinishedData abortTransaction
storeTransaction lockTransaction unlockTransaction storeTransaction lockTransaction unlockTransaction
storeData _pruneData storeData _pruneData deferCommit
""".split(): """.split():
setattr(self, x, getattr(self.db, x)) setattr(self, x, getattr(self.db, x))
......
...@@ -62,6 +62,7 @@ class DatabaseManager(object): ...@@ -62,6 +62,7 @@ class DatabaseManager(object):
% (engine, self.ENGINES)) % (engine, self.ENGINES))
self._engine = engine self._engine = engine
self._wait = wait self._wait = wait
self._deferred = 0
self._parse(database) self._parse(database)
def __getattr__(self, attr): def __getattr__(self, attr):
...@@ -119,8 +120,35 @@ class DatabaseManager(object): ...@@ -119,8 +120,35 @@ class DatabaseManager(object):
def doOperation(self, app): def doOperation(self, app):
pass pass
def _close(self):
"""Backend-specific code to close the database"""
@requires(_close)
def close(self):
self._deferredCommit()
self._close()
def _commit(self):
"""Backend-specific code to commit the pending changes"""
@requires(_commit)
def commit(self): def commit(self):
pass logging.debug('committing...')
self._commit()
# Instead of cancelling a timeout that would be set to defer a commit,
# we simply use a boolean so that _deferredCommit() does nothing.
# IOW, epoll may wake up for nothing but that should be rare,
# because most immediate commits are usually quickly followed by
# deferred commits.
self._deferred = 0
def deferCommit(self):
self._deferred = 1
return self._deferredCommit
def _deferredCommit(self):
if self._deferred:
self.commit()
@abstract @abstract
def getConfiguration(self, key): def getConfiguration(self, key):
...@@ -513,7 +541,10 @@ class DatabaseManager(object): ...@@ -513,7 +541,10 @@ class DatabaseManager(object):
@abstract @abstract
def lockTransaction(self, tid, ttid): def lockTransaction(self, tid, ttid):
"""Mark voted transaction 'ttid' as committed with given 'tid'""" """Mark voted transaction 'ttid' as committed with given 'tid'
All pending changes are committed just before returning to the caller.
"""
@abstract @abstract
def unlockTransaction(self, tid, ttid): def unlockTransaction(self, tid, ttid):
......
...@@ -68,7 +68,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -68,7 +68,7 @@ class MySQLDatabaseManager(DatabaseManager):
self.user, self.passwd, self.db, self.socket = re.match( self.user, self.passwd, self.db, self.socket = re.match(
'(?:([^:]+)(?::(.*))?@)?([^~./]+)(.+)?$', database).groups() '(?:([^:]+)(?::(.*))?@)?([^~./]+)(.+)?$', database).groups()
def close(self): def _close(self):
self.conn.close() self.conn.close()
def _connect(self): def _connect(self):
...@@ -106,8 +106,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -106,8 +106,7 @@ class MySQLDatabaseManager(DatabaseManager):
% (name, self._max_allowed_packet // 1024)) % (name, self._max_allowed_packet // 1024))
self._max_allowed_packet = int(value) self._max_allowed_packet = int(value)
def commit(self): def _commit(self):
logging.debug('committing...')
self.conn.commit() self.conn.commit()
self._active = 0 self._active = 0
...@@ -575,7 +574,6 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -575,7 +574,6 @@ class MySQLDatabaseManager(DatabaseManager):
q("INSERT INTO trans SELECT * FROM ttrans WHERE tid=%d" % tid) q("INSERT INTO trans SELECT * FROM ttrans WHERE tid=%d" % tid)
q("DELETE FROM ttrans WHERE tid=%d" % tid) q("DELETE FROM ttrans WHERE tid=%d" % tid)
self.releaseData(data_id_list) self.releaseData(data_id_list)
self.commit()
def abortTransaction(self, ttid): def abortTransaction(self, ttid):
ttid = util.u64(ttid) ttid = util.u64(ttid)
......
...@@ -78,15 +78,14 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -78,15 +78,14 @@ class SQLiteDatabaseManager(DatabaseManager):
def _parse(self, database): def _parse(self, database):
self.db = os.path.expanduser(database) self.db = os.path.expanduser(database)
def close(self): def _close(self):
self.conn.close() self.conn.close()
def _connect(self): def _connect(self):
logging.info('connecting to SQLite database %r', self.db) logging.info('connecting to SQLite database %r', self.db)
self.conn = sqlite3.connect(self.db, check_same_thread=False) self.conn = sqlite3.connect(self.db, check_same_thread=False)
def commit(self): def _commit(self):
logging.debug('committing...')
retry_if_locked(self.conn.commit) retry_if_locked(self.conn.commit)
if LOG_QUERIES: if LOG_QUERIES:
...@@ -439,7 +438,6 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -439,7 +438,6 @@ class SQLiteDatabaseManager(DatabaseManager):
q("INSERT INTO trans SELECT * FROM ttrans WHERE tid=?", (tid,)) q("INSERT INTO trans SELECT * FROM ttrans WHERE tid=?", (tid,))
q("DELETE FROM ttrans WHERE tid=?", (tid,)) q("DELETE FROM ttrans WHERE tid=?", (tid,))
self.releaseData(data_id_list) self.releaseData(data_id_list)
self.commit()
def abortTransaction(self, ttid): def abortTransaction(self, ttid):
args = util.u64(ttid), args = util.u64(ttid),
......
...@@ -76,6 +76,7 @@ class InitializationHandler(BaseMasterHandler): ...@@ -76,6 +76,7 @@ class InitializationHandler(BaseMasterHandler):
dm = self.app.dm dm = self.app.dm
dm.lockTransaction(tid, ttid) dm.lockTransaction(tid, ttid)
dm.unlockTransaction(tid, ttid) dm.unlockTransaction(tid, ttid)
dm.commit()
def startOperation(self, conn, backup): def startOperation(self, conn, backup):
self.app.operational = True self.app.operational = True
......
...@@ -197,7 +197,9 @@ class TransactionManager(object): ...@@ -197,7 +197,9 @@ class TransactionManager(object):
""" """
tid = self._transaction_dict[ttid].getTID() tid = self._transaction_dict[ttid].getTID()
logging.debug('Unlock TXN %s (ttid=%s)', dump(tid), dump(ttid)) logging.debug('Unlock TXN %s (ttid=%s)', dump(tid), dump(ttid))
self._app.dm.unlockTransaction(tid, ttid) dm = self._app.dm
dm.unlockTransaction(tid, ttid)
self._app.em.setTimeout(time() + 1, dm.deferCommit())
self.abort(ttid, even_if_locked=True) self.abort(ttid, even_if_locked=True)
def getLockingTID(self, oid): def getLockingTID(self, oid):
......
...@@ -71,6 +71,7 @@ class StorageTests(NEOFunctionalTest): ...@@ -71,6 +71,7 @@ class StorageTests(NEOFunctionalTest):
db = self.neo.getSQLConnection(db_name) db = self.neo.getSQLConnection(db_name)
# wait for the sql transaction to be committed # wait for the sql transaction to be committed
def callback(last_try): def callback(last_try):
db.commit() # to get a fresh view
# One revision per object and two for the root, before and after # One revision per object and two for the root, before and after
(object_number,), = db.query('SELECT count(*) FROM obj') (object_number,), = db.query('SELECT count(*) FROM obj')
return object_number == OBJECT_NUMBER + 2, object_number return object_number == OBJECT_NUMBER + 2, object_number
......
...@@ -89,6 +89,7 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -89,6 +89,7 @@ class StorageDBTests(NeoUnitTestBase):
yield yield
if commit: if commit:
self.db.unlockTransaction(tid, ttid) self.db.unlockTransaction(tid, ttid)
self.db.commit()
elif commit is not None: elif commit is not None:
self.db.abortTransaction(ttid) self.db.abortTransaction(ttid)
......
...@@ -76,6 +76,7 @@ class TransactionManagerTests(NeoUnitTestBase): ...@@ -76,6 +76,7 @@ class TransactionManagerTests(NeoUnitTestBase):
# no history # no history
self.app.dm = Mock({'getObjectHistory': []}) self.app.dm = Mock({'getObjectHistory': []})
self.app.pt = Mock({'isAssigned': True}) self.app.pt = Mock({'isAssigned': True})
self.app.em = Mock({'setTimeout': None})
self.manager = TransactionManager(self.app) self.manager = TransactionManager(self.app)
self.ltid = None self.ltid = None
......
...@@ -601,6 +601,7 @@ class Test(NEOThreadedTest): ...@@ -601,6 +601,7 @@ class Test(NEOThreadedTest):
t.commit() t.commit()
storage.stop() storage.stop()
cluster.join((storage,)) cluster.join((storage,))
storage.em.onTimeout() # deferred commit
storage.resetNode() storage.resetNode()
storage.start() storage.start()
t.begin() t.begin()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment