Commit be839e92 authored by Julien Muchembled's avatar Julien Muchembled

storage: speed up replication by not getting object next_serial for nothing

parent c25e68bc
...@@ -435,6 +435,20 @@ class DatabaseManager(object): ...@@ -435,6 +435,20 @@ class DatabaseManager(object):
compression, checksum, data, compression, checksum, data,
None if data_serial is None else util.p64(data_serial)) None if data_serial is None else util.p64(data_serial))
@fallback
def _fetchObject(self, oid, tid):
r = self._getObject(oid, tid)
if r:
return r[:1] + r[2:]
def fetchObject(self, oid, tid):
u64 = util.u64
r = self._fetchObject(u64(oid), u64(tid))
if r:
serial, compression, checksum, data, data_serial = r
return (util.p64(serial), compression, checksum, data,
None if data_serial is None else util.p64(data_serial))
@contextmanager @contextmanager
def replicated(self, offset): def replicated(self, offset):
readable_set = self._readable_set readable_set = self._readable_set
......
...@@ -698,6 +698,21 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -698,6 +698,21 @@ class MySQLDatabaseManager(DatabaseManager):
if r: if r:
return [(p64(tid), length or 0) for tid, length in r] return [(p64(tid), length or 0) for tid, length in r]
def _fetchObject(self, oid, tid):
r = self.query(
'SELECT tid, compression, data.hash, value, value_tid'
' FROM obj FORCE INDEX(`partition`)'
' LEFT JOIN data ON (obj.data_id = data.id)'
' WHERE `partition` = %d AND oid = %d AND tid = %d'
% (self._getReadablePartition(oid), oid, tid))
if r:
r = r[0]
compression = r[1]
if compression and compression & 0x80:
return (r[0], compression & 0x7f, r[2],
''.join(self._bigData(data)), r[4])
return r
def getReplicationObjectList(self, min_tid, max_tid, length, partition, def getReplicationObjectList(self, min_tid, max_tid, length, partition,
min_oid): min_oid):
u64 = util.u64 u64 = util.u64
......
...@@ -532,6 +532,17 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -532,6 +532,17 @@ class SQLiteDatabaseManager(DatabaseManager):
self._getPackTID(), offset, length)) self._getPackTID(), offset, length))
] or None ] or None
def _fetchObject(self, oid, tid):
for serial, compression, checksum, data, value_serial in self.query(
'SELECT tid, compression, data.hash, value, value_tid'
' FROM obj LEFT JOIN data ON obj.data_id = data.id'
' WHERE partition=? AND oid=? AND tid=?',
(self._getReadablePartition(oid), oid, tid)):
if checksum:
checksum = str(checksum)
data = str(data)
return serial, compression, checksum, data, value_serial
def getReplicationObjectList(self, min_tid, max_tid, length, partition, def getReplicationObjectList(self, min_tid, max_tid, length, partition,
min_oid): min_oid):
u64 = util.u64 u64 = util.u64
......
...@@ -255,15 +255,14 @@ class StorageOperationHandler(EventHandler): ...@@ -255,15 +255,14 @@ class StorageOperationHandler(EventHandler):
if not oid_set: if not oid_set:
del object_dict[serial] del object_dict[serial]
continue continue
object = dm.getObject(oid, serial) object = dm.fetchObject(oid, serial)
if not object: if not object:
conn.send(Errors.ReplicationError( conn.send(Errors.ReplicationError(
"partition %u dropped or truncated" "partition %u dropped or truncated"
% partition), msg_id) % partition), msg_id)
return return
# Same as in askFetchTransactions. # Same as in askFetchTransactions.
conn.send(Packets.AddObject(oid, serial, *object[2:]), conn.send(Packets.AddObject(oid, *object), msg_id)
msg_id)
yield conn.buffering yield conn.buffering
conn.send(Packets.AnswerFetchObjects( conn.send(Packets.AnswerFetchObjects(
pack_tid, next_tid, next_oid, object_dict), msg_id) pack_tid, next_tid, next_oid, object_dict), msg_id)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment