Commit 63063af1 authored by Yury Selivanov's avatar Yury Selivanov

Merge 3.5 (asyncio)

parents b4a0d52a b461791b
...@@ -14,13 +14,12 @@ if hasattr(socket, 'AF_UNIX'): ...@@ -14,13 +14,12 @@ if hasattr(socket, 'AF_UNIX'):
from . import coroutines from . import coroutines
from . import compat from . import compat
from . import events from . import events
from . import futures
from . import protocols from . import protocols
from .coroutines import coroutine from .coroutines import coroutine
from .log import logger from .log import logger
_DEFAULT_LIMIT = 2**16 _DEFAULT_LIMIT = 2 ** 16
class IncompleteReadError(EOFError): class IncompleteReadError(EOFError):
...@@ -38,15 +37,13 @@ class IncompleteReadError(EOFError): ...@@ -38,15 +37,13 @@ class IncompleteReadError(EOFError):
class LimitOverrunError(Exception): class LimitOverrunError(Exception):
"""Reached buffer limit while looking for the separator. """Reached the buffer limit while looking for a separator.
Attributes: Attributes:
- message: error message - consumed: total number of to be consumed bytes.
- consumed: total number of bytes that should be consumed
""" """
def __init__(self, message, consumed): def __init__(self, message, consumed):
super().__init__(message) super().__init__(message)
self.message = message
self.consumed = consumed self.consumed = consumed
...@@ -132,7 +129,6 @@ if hasattr(socket, 'AF_UNIX'): ...@@ -132,7 +129,6 @@ if hasattr(socket, 'AF_UNIX'):
writer = StreamWriter(transport, protocol, reader, loop) writer = StreamWriter(transport, protocol, reader, loop)
return reader, writer return reader, writer
@coroutine @coroutine
def start_unix_server(client_connected_cb, path=None, *, def start_unix_server(client_connected_cb, path=None, *,
loop=None, limit=_DEFAULT_LIMIT, **kwds): loop=None, limit=_DEFAULT_LIMIT, **kwds):
...@@ -416,8 +412,8 @@ class StreamReader: ...@@ -416,8 +412,8 @@ class StreamReader:
self._wakeup_waiter() self._wakeup_waiter()
if (self._transport is not None and if (self._transport is not None and
not self._paused and not self._paused and
len(self._buffer) > 2*self._limit): len(self._buffer) > 2 * self._limit):
try: try:
self._transport.pause_reading() self._transport.pause_reading()
except NotImplementedError: except NotImplementedError:
...@@ -489,24 +485,24 @@ class StreamReader: ...@@ -489,24 +485,24 @@ class StreamReader:
@coroutine @coroutine
def readuntil(self, separator=b'\n'): def readuntil(self, separator=b'\n'):
"""Read chunk of data from the stream until `separator` is found. """Read data from the stream until ``separator`` is found.
On success, chunk and its separator will be removed from internal buffer
(i.e. consumed). Returned chunk will include separator at the end.
Configured stream limit is used to check result. Limit means maximal On success, the data and separator will be removed from the
length of chunk that can be returned, not counting the separator. internal buffer (consumed). Returned data will include the
separator at the end.
If EOF occurs and complete separator still not found, Configured stream limit is used to check result. Limit sets the
IncompleteReadError(<partial data>, None) will be raised and internal maximal length of data that can be returned, not counting the
buffer becomes empty. This partial data may contain a partial separator. separator.
If chunk cannot be read due to overlimit, LimitOverrunError will be raised If an EOF occurs and the complete separator is still not found,
and data will be left in internal buffer, so it can be read again, in an IncompleteReadError exception will be raised, and the internal
some different way. buffer will be reset. The IncompleteReadError.partial attribute
may contain the separator partially.
If stream was paused, this function will automatically resume it if If the data cannot be read because of over limit, a
needed. LimitOverrunError exception will be raised, and the data
will be left in the internal buffer, so it can be read again.
""" """
seplen = len(separator) seplen = len(separator)
if seplen == 0: if seplen == 0:
...@@ -532,8 +528,8 @@ class StreamReader: ...@@ -532,8 +528,8 @@ class StreamReader:
# performance problems. Even when reading MIME-encoded # performance problems. Even when reading MIME-encoded
# messages :) # messages :)
# `offset` is the number of bytes from the beginning of the buffer where # `offset` is the number of bytes from the beginning of the buffer
# is no occurrence of `separator`. # where there is no occurrence of `separator`.
offset = 0 offset = 0
# Loop until we find `separator` in the buffer, exceed the buffer size, # Loop until we find `separator` in the buffer, exceed the buffer size,
...@@ -547,14 +543,16 @@ class StreamReader: ...@@ -547,14 +543,16 @@ class StreamReader:
isep = self._buffer.find(separator, offset) isep = self._buffer.find(separator, offset)
if isep != -1: if isep != -1:
# `separator` is in the buffer. `isep` will be used later to # `separator` is in the buffer. `isep` will be used later
# retrieve the data. # to retrieve the data.
break break
# see upper comment for explanation. # see upper comment for explanation.
offset = buflen + 1 - seplen offset = buflen + 1 - seplen
if offset > self._limit: if offset > self._limit:
raise LimitOverrunError('Separator is not found, and chunk exceed the limit', offset) raise LimitOverrunError(
'Separator is not found, and chunk exceed the limit',
offset)
# Complete message (with full separator) may be present in buffer # Complete message (with full separator) may be present in buffer
# even when EOF flag is set. This may happen when the last chunk # even when EOF flag is set. This may happen when the last chunk
...@@ -569,7 +567,8 @@ class StreamReader: ...@@ -569,7 +567,8 @@ class StreamReader:
yield from self._wait_for_data('readuntil') yield from self._wait_for_data('readuntil')
if isep > self._limit: if isep > self._limit:
raise LimitOverrunError('Separator is found, but chunk is longer than limit', isep) raise LimitOverrunError(
'Separator is found, but chunk is longer than limit', isep)
chunk = self._buffer[:isep + seplen] chunk = self._buffer[:isep + seplen]
del self._buffer[:isep + seplen] del self._buffer[:isep + seplen]
...@@ -591,7 +590,8 @@ class StreamReader: ...@@ -591,7 +590,8 @@ class StreamReader:
received before any byte is read, this function returns empty byte received before any byte is read, this function returns empty byte
object. object.
Returned value is not limited with limit, configured at stream creation. Returned value is not limited with limit, configured at stream
creation.
If stream was paused, this function will automatically resume it if If stream was paused, this function will automatically resume it if
needed. needed.
...@@ -630,13 +630,14 @@ class StreamReader: ...@@ -630,13 +630,14 @@ class StreamReader:
def readexactly(self, n): def readexactly(self, n):
"""Read exactly `n` bytes. """Read exactly `n` bytes.
Raise an `IncompleteReadError` if EOF is reached before `n` bytes can be Raise an IncompleteReadError if EOF is reached before `n` bytes can be
read. The `IncompleteReadError.partial` attribute of the exception will read. The IncompleteReadError.partial attribute of the exception will
contain the partial read bytes. contain the partial read bytes.
if n is zero, return empty bytes object. if n is zero, return empty bytes object.
Returned value is not limited with limit, configured at stream creation. Returned value is not limited with limit, configured at stream
creation.
If stream was paused, this function will automatically resume it if If stream was paused, this function will automatically resume it if
needed. needed.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment