......@@ -42,7 +42,7 @@ Other changes:
- Proper handling of incoming packets for closed/aborted connections.
- An exception while processing an answer could leave the handler switcher
in the bad state.
- In STOPPING cluster state, really wait for all transactions to be finished.
- Several issues when undoing transactions with conflict resolutions
have been fixed.
- Delayed connection acceptation when the storage node is ready.
......@@ -31,8 +31,9 @@ parser.add_option('-e', '--engine', help = 'database engine')
parser.add_option('-w', '--wait', help='seconds to wait for backend to be '
'available, before erroring-out (-1 = infinite)', type='float', default=0)
parser.add_option('--dedup', action='store_true',
help = 'enable deduplication of data when setting'
' up a new storage node (for RocksDB, check'
parser.add_option('--disable-drop-partitions', action='store_true',
help = 'do not delete data of discarded cells, which is'
' useful for big databases because the current'
......@@ -314,6 +314,7 @@ class ImporterDatabaseManager(DatabaseManager):
def commit(self):
# XXX: This misses commits done internally by self.db (lockTransaction).
self._last_commit = time.time()
def close(self):
......@@ -395,9 +395,20 @@ class DatabaseManager(object):
Identifier of object to retrieve.
tid (int, None)
Exact serial to retrieve.
before_tid (int, None)
Serial to retrieve is the highest existing one strictly below this
Return value:
None: oid doesn't exist at requested tid/before_tid (getObject
takes care of checking if the oid exists at other serial)
6-tuple: Record content.
- record serial (int)
- serial or next record modifying object (int, None)
- compression (boolean-ish, None)
- checksum (binary string, None)
- data (binary string, None)
- data_serial (int, None)
......@@ -418,7 +429,7 @@ class DatabaseManager(object):
......@@ -437,11 +448,19 @@ class DatabaseManager(object):
def _fetchObject(self, oid, tid):
"""Specialized version of _getObject, for replication"""
r = self._getObject(oid, tid)
if r:
return r[:1] + r[2:]
return r[:1] + r[2:] # remove next_serial
def fetchObject(self, oid, tid):
Specialized version of getObject, for replication:
- the oid can only be at an exact serial (parameter 'tid')
- next_serial is not part of the result
- if there's no result for the requested serial,
no need check if oid exists at other serial
u64 = util.u64
r = self._fetchObject(u64(oid), u64(tid))
if r:
......@@ -143,6 +143,10 @@ class MySQLDatabaseManager(DatabaseManager):
except OperationalError as m:
code, m = m.args
# IDEA: Is it safe to retry in case of DISK_FULL ?
# XXX: However, this would another case of failure that would
# be unnoticed by other nodes (ADMIN & MASTER). When
# there are replicas, it may be preferred to not retry.
if self._active or SERVER_GONE_ERROR != code != SERVER_LOST \
or not retry:
raise DatabaseFailure('MySQL error %d: %s\nQuery: %s'
......@@ -140,6 +140,9 @@ class StorageOperationHandler(EventHandler):
# Server (all methods must set connection as server so that it isn't closed
# if client tasks are finished)
# These are all low-priority packets, in that we don't want to delay
# answers to clients, so tasks are used to postpone work when we're idle.
def getEventQueue(self):
......@@ -157,6 +160,9 @@ class StorageOperationHandler(EventHandler):
conn.send(Packets.AnswerCheckTIDRange(*r), msg_id)
except (weakref.ReferenceError, ConnectionClosed):
# Splitting this task would cause useless overhead. However, a
# generator function is expected, hence the following fake yield
# so that iteration stops immediately.
return; yield
......@@ -173,7 +179,7 @@ class StorageOperationHandler(EventHandler):
conn.send(Packets.AnswerCheckSerialRange(*r), msg_id)
except (weakref.ReferenceError, ConnectionClosed):
return; yield # same as in askCheckTIDRange
