Commit c25e68bc authored by Julien Muchembled's avatar Julien Muchembled

storage: speed up replication by sending bigger network packets

parent 96aeb716
...@@ -305,6 +305,7 @@ class Connection(BaseConnection): ...@@ -305,6 +305,7 @@ class Connection(BaseConnection):
# XXX: rename isPending, hasPendingMessages & pending methods # XXX: rename isPending, hasPendingMessages & pending methods
buffering = False
connecting = True connecting = True
client = False client = False
server = False server = False
...@@ -541,8 +542,16 @@ class Connection(BaseConnection): ...@@ -541,8 +542,16 @@ class Connection(BaseConnection):
def _addPacket(self, packet): def _addPacket(self, packet):
"""Add a packet into the write buffer.""" """Add a packet into the write buffer."""
if self.connector.queue(packet.encode()): if self.connector.queue(packet.encode()):
if packet.nodelay or 65536 < self.connector.queue_size:
assert not self.buffering
# enable polling for writing. # enable polling for writing.
self.em.addWriter(self) self.em.addWriter(self)
else:
self.buffering = True
elif self.buffering and (65536 < self.connector.queue_size
or packet.nodelay):
self.buffering = False
self.em.addWriter(self)
logging.packet(self, packet, True) logging.packet(self, packet, True)
def send(self, packet, msg_id=None): def send(self, packet, msg_id=None):
......
...@@ -72,11 +72,13 @@ class SocketConnector(object): ...@@ -72,11 +72,13 @@ class SocketConnector(object):
# disable Nagle algorithm to reduce latency # disable Nagle algorithm to reduce latency
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.queued = [ENCODED_VERSION] self.queued = [ENCODED_VERSION]
self.queue_size = len(ENCODED_VERSION)
return self return self
def queue(self, data): def queue(self, data):
was_empty = not self.queued was_empty = not self.queued
self.queued += data self.queued += data
self.queue_size += len(data)
return was_empty return was_empty
def _error(self, op, exc=None): def _error(self, op, exc=None):
...@@ -183,8 +185,10 @@ class SocketConnector(object): ...@@ -183,8 +185,10 @@ class SocketConnector(object):
# could be sent. # could be sent.
if n != len(msg): if n != len(msg):
self.queued[:] = msg[n:], self.queued[:] = msg[n:],
self.queue_size -= n
return False return False
del self.queued[:] del self.queued[:]
self.queue_size = 0
else: else:
assert not self.queued assert not self.queued
return True return True
......
...@@ -224,6 +224,7 @@ class Packet(object): ...@@ -224,6 +224,7 @@ class Packet(object):
_code = None _code = None
_fmt = None _fmt = None
_id = None _id = None
nodelay = True
poll_thread = False poll_thread = False
def __init__(self, *args): def __init__(self, *args):
...@@ -1441,6 +1442,8 @@ class AddTransaction(Packet): ...@@ -1441,6 +1442,8 @@ class AddTransaction(Packet):
""" """
S -> S S -> S
""" """
nodelay = False
_fmt = PStruct('add_transaction', _fmt = PStruct('add_transaction',
PTID('tid'), PTID('tid'),
PString('user'), PString('user'),
...@@ -1480,6 +1483,8 @@ class AddObject(Packet): ...@@ -1480,6 +1483,8 @@ class AddObject(Packet):
""" """
S -> S S -> S
""" """
nodelay = False
_fmt = PStruct('add_object', _fmt = PStruct('add_object',
POID('oid'), POID('oid'),
PTID('serial'), PTID('serial'),
......
...@@ -254,9 +254,8 @@ class Application(BaseApplication): ...@@ -254,9 +254,8 @@ class Application(BaseApplication):
while task_queue: while task_queue:
try: try:
while isIdle(): while isIdle():
if task_queue[-1].next(): next(task_queue[-1]) or task_queue.rotate()
_poll(0) _poll(0)
task_queue.rotate()
break break
except StopIteration: except StopIteration:
task_queue.pop() task_queue.pop()
...@@ -270,10 +269,6 @@ class Application(BaseApplication): ...@@ -270,10 +269,6 @@ class Application(BaseApplication):
self.replicator.stop() self.replicator.stop()
def newTask(self, iterator): def newTask(self, iterator):
try:
iterator.next()
except StopIteration:
return
self.task_queue.appendleft(iterator) self.task_queue.appendleft(iterator)
def closeClient(self, connection): def closeClient(self, connection):
......
...@@ -388,7 +388,7 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -388,7 +388,7 @@ class ImporterDatabaseManager(DatabaseManager):
finish() finish()
txn = z.transaction txn = z.transaction
tid = txn.tid tid = txn.tid
yield 1 yield
zodb = z.zodb zodb = z.zodb
for r in z.transaction: for r in z.transaction:
oid = p64(u64(r.oid) + zodb.shift_oid) oid = p64(u64(r.oid) + zodb.shift_oid)
...@@ -413,7 +413,7 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -413,7 +413,7 @@ class ImporterDatabaseManager(DatabaseManager):
# update 'obj' with 'object_list', some rows in 'data' may be # update 'obj' with 'object_list', some rows in 'data' may be
# unreferenced. This is not a problem because the leak is # unreferenced. This is not a problem because the leak is
# solved when resuming the migration. # solved when resuming the migration.
yield 1 yield
try: try:
z.next() z.next()
except StopIteration: except StopIteration:
......
...@@ -157,7 +157,7 @@ class StorageOperationHandler(EventHandler): ...@@ -157,7 +157,7 @@ class StorageOperationHandler(EventHandler):
conn.send(Packets.AnswerCheckTIDRange(*r), msg_id) conn.send(Packets.AnswerCheckTIDRange(*r), msg_id)
except (weakref.ReferenceError, ConnectionClosed): except (weakref.ReferenceError, ConnectionClosed):
pass pass
yield return; yield
app.newTask(check()) app.newTask(check())
@checkFeedingConnection(check=True) @checkFeedingConnection(check=True)
...@@ -173,7 +173,7 @@ class StorageOperationHandler(EventHandler): ...@@ -173,7 +173,7 @@ class StorageOperationHandler(EventHandler):
conn.send(Packets.AnswerCheckSerialRange(*r), msg_id) conn.send(Packets.AnswerCheckSerialRange(*r), msg_id)
except (weakref.ReferenceError, ConnectionClosed): except (weakref.ReferenceError, ConnectionClosed):
pass pass
yield return; yield
app.newTask(check()) app.newTask(check())
@checkFeedingConnection(check=False) @checkFeedingConnection(check=False)
...@@ -209,9 +209,17 @@ class StorageOperationHandler(EventHandler): ...@@ -209,9 +209,17 @@ class StorageOperationHandler(EventHandler):
% partition), msg_id) % partition), msg_id)
return return
oid_list, user, desc, ext, packed, ttid = t oid_list, user, desc, ext, packed, ttid = t
# Sending such packet does not mark the connection
# for writing if there's too little data in the buffer.
conn.send(Packets.AddTransaction(tid, user, conn.send(Packets.AddTransaction(tid, user,
desc, ext, packed, ttid, oid_list), msg_id) desc, ext, packed, ttid, oid_list), msg_id)
yield # To avoid delaying several connections simultaneously,
# and also prevent the backend from scanning different
# parts of the DB at the same time, we ask the
# scheduler not to switch to another background task.
# Ideally, we are filling a buffer while the kernel
# is flushing another one for a concurrent connection.
yield conn.buffering
conn.send(Packets.AnswerFetchTransactions( conn.send(Packets.AnswerFetchTransactions(
pack_tid, next_tid, peer_tid_set), msg_id) pack_tid, next_tid, peer_tid_set), msg_id)
yield yield
...@@ -253,9 +261,10 @@ class StorageOperationHandler(EventHandler): ...@@ -253,9 +261,10 @@ class StorageOperationHandler(EventHandler):
"partition %u dropped or truncated" "partition %u dropped or truncated"
% partition), msg_id) % partition), msg_id)
return return
# Same as in askFetchTransactions.
conn.send(Packets.AddObject(oid, serial, *object[2:]), conn.send(Packets.AddObject(oid, serial, *object[2:]),
msg_id) msg_id)
yield yield conn.buffering
conn.send(Packets.AnswerFetchObjects( conn.send(Packets.AnswerFetchObjects(
pack_tid, next_tid, next_oid, object_dict), msg_id) pack_tid, next_tid, next_oid, object_dict), msg_id)
yield yield
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment