Commit f184a1d9 authored by Kirill Smelkov's avatar Kirill Smelkov

X trace IO in between client and server

parent fc0729b3
......@@ -31,10 +31,14 @@ The ZEO protocol sits on top of a sized message protocol.
The ZEO protocol has client and server variants.
"""
from __future__ import print_function
import logging
import socket
from struct import unpack
import sys
import tempfile
from .compat import asyncio
......@@ -43,6 +47,8 @@ logger = logging.getLogger(__name__)
INET_FAMILIES = socket.AF_INET, socket.AF_INET6
traceio = not ('runzeo' in sys.argv[0]) # trace on client side
print('%s traceio=%s' % (sys.argv[0], traceio))
class Protocol(asyncio.Protocol):
"""asyncio low-level ZEO base interface
......@@ -66,6 +72,18 @@ class Protocol(asyncio.Protocol):
# Handle the first message, the protocol handshake, differently
self.message_received = self.first_message_received
self.tracefile = tempfile.NamedTemporaryFile(bufsize=1*1024*1024, prefix='ZEO', suffix='.iotrace', delete=False, dir='/tmp')
def _traceio(self, txrx, message):
if traceio:
x = message
decode = getattr(self, 'decode', None)
if decode is not None:
try:
x = decode(x)
except:
pass
print('%s %r' % (txrx, x), file=self.tracefile)
def __repr__(self):
return self.name
......@@ -98,6 +116,7 @@ class Protocol(asyncio.Protocol):
if paused:
append(message)
else:
self._traceio('tx', message)
writelines((pack(">I", len(message)), message))
self.write_message = write_message
......@@ -111,6 +130,7 @@ class Protocol(asyncio.Protocol):
append(data)
return
for message in data:
self._traceio('tx', message)
writelines((pack(">I", len(message)), message))
if paused:
append(data)
......@@ -159,6 +179,7 @@ class Protocol(asyncio.Protocol):
else:
self.want = 4
self.getting_size = True
self._traceio('rx', collected)
self.message_received(collected)
except Exception:
logger.exception("data_received %s %s %s",
......@@ -189,10 +210,12 @@ class Protocol(asyncio.Protocol):
while output and not paused:
message = output.pop(0)
if isinstance(message, bytes):
self._traceio('tx', message)
writelines((pack(">I", len(message)), message))
else:
data = message
for message in data:
self._traceio('tx', message)
writelines((pack(">I", len(message)), message))
if paused: # paused again. Put iter back.
output.insert(0, data)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment