Commit 3ccead1f authored by Guido van Rossum's avatar Guido van Rossum

asyncio: Refactoring: move write flow control to a subclass/mixin.

parent 63b4d4b4
...@@ -339,7 +339,67 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): ...@@ -339,7 +339,67 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
sock.close() sock.close()
class _SelectorTransport(transports.Transport): class _FlowControlMixin(transports.Transport):
"""All the logic for (write) flow control in a mix-in base class.
The subclass must implement get_write_buffer_size(). It must call
_maybe_pause_protocol() whenever the write buffer size increases,
and _maybe_resume_protocol() whenever it decreases. It may also
override set_write_buffer_limits() (e.g. to specify different
defaults).
The subclass constructor must call super().__init__(extra). This
will call set_write_buffer_limits().
The user may call set_write_buffer_limits() and
get_write_buffer_size(), and their protocol's pause_writing() and
resume_writing() may be called.
"""
def __init__(self, extra=None):
super().__init__(extra)
self._protocol_paused = False
self.set_write_buffer_limits()
def _maybe_pause_protocol(self):
size = self.get_write_buffer_size()
if size <= self._high_water:
return
if not self._protocol_paused:
self._protocol_paused = True
try:
self._protocol.pause_writing()
except Exception:
logger.exception('pause_writing() failed')
def _maybe_resume_protocol(self):
if (self._protocol_paused and
self.get_write_buffer_size() <= self._low_water):
self._protocol_paused = False
try:
self._protocol.resume_writing()
except Exception:
logger.exception('resume_writing() failed')
def set_write_buffer_limits(self, high=None, low=None):
if high is None:
if low is None:
high = 64*1024
else:
high = 4*low
if low is None:
low = high // 4
if not high >= low >= 0:
raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
(high, low))
self._high_water = high
self._low_water = low
def get_write_buffer_size(self):
raise NotImplementedError
class _SelectorTransport(_FlowControlMixin, transports.Transport):
max_size = 256 * 1024 # Buffer size passed to recv(). max_size = 256 * 1024 # Buffer size passed to recv().
...@@ -362,8 +422,6 @@ class _SelectorTransport(transports.Transport): ...@@ -362,8 +422,6 @@ class _SelectorTransport(transports.Transport):
self._buffer = self._buffer_factory() self._buffer = self._buffer_factory()
self._conn_lost = 0 # Set when call to connection_lost scheduled. self._conn_lost = 0 # Set when call to connection_lost scheduled.
self._closing = False # Set when close() called. self._closing = False # Set when close() called.
self._protocol_paused = False
self.set_write_buffer_limits()
if self._server is not None: if self._server is not None:
self._server.attach(self) self._server.attach(self)
...@@ -410,40 +468,6 @@ class _SelectorTransport(transports.Transport): ...@@ -410,40 +468,6 @@ class _SelectorTransport(transports.Transport):
server.detach(self) server.detach(self)
self._server = None self._server = None
def _maybe_pause_protocol(self):
size = self.get_write_buffer_size()
if size <= self._high_water:
return
if not self._protocol_paused:
self._protocol_paused = True
try:
self._protocol.pause_writing()
except Exception:
logger.exception('pause_writing() failed')
def _maybe_resume_protocol(self):
if (self._protocol_paused and
self.get_write_buffer_size() <= self._low_water):
self._protocol_paused = False
try:
self._protocol.resume_writing()
except Exception:
logger.exception('resume_writing() failed')
def set_write_buffer_limits(self, high=None, low=None):
if high is None:
if low is None:
high = 64*1024
else:
high = 4*low
if low is None:
low = high // 4
if not high >= low >= 0:
raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
(high, low))
self._high_water = high
self._low_water = low
def get_write_buffer_size(self): def get_write_buffer_size(self):
return len(self._buffer) return len(self._buffer)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment