Commit d4805a0f authored by dieter's avatar dieter Committed by Kirill Smelkov

*: Documentation, Cosmetics

--------
kirr:

Extract from https://github.com/zopefoundation/ZEO/pull/195 bits that
add documentation to existing code without changing semantic, and fix
typos.

The only things I added myself with further help from @d-maurer are:

- documentation for server_sync in ClientStorage;
- stub documentation for credentials in ClientStorage.

Even though we agree to deprecate credentials in favour of peer-to-peer
TLS, removing their support should go as a separate step.

For server_sync feature, that
https://github.com/zopefoundation/ZEO/pull/195 currently removes, we
actually do use it for correctness:

https://lab.nexedi.com/nexedi/erp5/blob/eaae74a082a0/product/ERP5Type/tests/custom_zodb.py#L175-179
nexedi/erp5@c663257f
https://github.com/zopefoundation/ZODB/commit/9821696f584f

So document it with the intent to preserve it.

/reviewed-on https://github.com/zopefoundation/ZEO/pull/202
parent cc84605e
...@@ -120,7 +120,7 @@ Changelog ...@@ -120,7 +120,7 @@ Changelog
- Fixed to work with some changes made in ZODB 5.4.0. - Fixed to work with some changes made in ZODB 5.4.0.
Client-side updates are incuded for ZODB 5.4.0 or databases that Client-side updates are included for ZODB 5.4.0 or databases that
already had ``zodbpickle.binary`` OIDs. See `issue 113 already had ``zodbpickle.binary`` OIDs. See `issue 113
<https://github.com/zopefoundation/ZEO/issues/113>`_. <https://github.com/zopefoundation/ZEO/issues/113>`_.
......
...@@ -112,6 +112,7 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage): ...@@ -112,6 +112,7 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
This is typically invoked from a custom_zodb.py file. This is typically invoked from a custom_zodb.py file.
All arguments except addr should be keyword arguments. All arguments except addr should be keyword arguments.
Arguments: Arguments:
addr addr
...@@ -122,6 +123,20 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage): ...@@ -122,6 +123,20 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
connection. A hostname may be a DNS name or a dotted IP connection. A hostname may be a DNS name or a dotted IP
address. Required. address. Required.
All addresses are assumed to serve (essentially)
the same (potentially replicated) storage.
A connection tries to connect to those addresses;
the first successful connection establishment with
the called for ("read_only" or "writable") capabilities
is selected and used for storage interaction until
the connection is lost. In that case, a
reconnection is tried.
If ``ClientStorage`` calls for the "writable" capability
but allows for a "read only" fallback,,
a read only connection can be used as a fallback;
if a writable connection becomes available later, a
switch to this connection is performed.
storage storage
The server storage name, defaulting to '1'. The name must The server storage name, defaulting to '1'. The name must
match one of the storage names supported by the server(s) match one of the storage names supported by the server(s)
...@@ -136,14 +151,12 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage): ...@@ -136,14 +151,12 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
address and the server storage name. This is used to address and the server storage name. This is used to
construct the response to getName() construct the response to getName()
cache
A cache object or a name, relative to the current working
directory, used to construct persistent cache filenames.
Defaults to None, in which case the cache is not
persistent. See ClientCache for more info.
wait_timeout wait_timeout
Maximum time to wait for results, including connecting. Maximum time (seconds) to wait for connections,
defaulting to 30.
Note: the timeout applies only to [re]connect.
Normal operations can take arbitrary long. This
is important for long running operations, such as ``pack``.
read_only read_only
A flag indicating whether this should be a A flag indicating whether this should be a
...@@ -164,7 +177,7 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage): ...@@ -164,7 +177,7 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
shared_blob_dir shared_blob_dir
Flag whether the blob_dir is a server-shared filesystem Flag whether the blob_dir is a server-shared filesystem
that should be used instead of transferring blob data over that should be used instead of transferring blob data over
ZEO protocol. the ZEO protocol.
blob_cache_size blob_cache_size
Maximum size of the ZEO blob cache, in bytes. If not set, then Maximum size of the ZEO blob cache, in bytes. If not set, then
...@@ -174,7 +187,7 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage): ...@@ -174,7 +187,7 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
This option is ignored if shared_blob_dir is true. This option is ignored if shared_blob_dir is true.
blob_cache_size_check blob_cache_size_check
ZEO check size as percent of blob_cache_size. The ZEO Cache check size as percent of blob_cache_size. The ZEO
cache size will be checked when this many bytes have been cache size will be checked when this many bytes have been
loaded into the cache. Defaults to 10% of the blob cache loaded into the cache. Defaults to 10% of the blob cache
size. This option is ignored if shared_blob_dir is true. size. This option is ignored if shared_blob_dir is true.
...@@ -182,10 +195,72 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage): ...@@ -182,10 +195,72 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
client_label client_label
A label to include in server log messages for the client. A label to include in server log messages for the client.
cache
A cache object or a file path (relative or absolute).
Defaults to None, in which case the cache is determined
from client and var.
ssl
An ssl client context (i.e. with purpose "ServerAuth")
to call for SSL connections.
ssl_server_hostname
The server hostname - used during the SSL authentication check
client
var
If cache is None, client determines the cache:
if it is None, then a non persistent cache is used;
otherwise, client is used together with var (defaults
to the current working directory) to construct the
file path for the persistent cache file
wait
Wait for server connection, defaulting to true.
credentials
username
password
realm
[ZEO4 only] Credentials for authentication to server.
In ZEO5 support for credentials has been dropped in favor of SSL.
`credentials` support is scheduled to be removed in `ZEO6`.
server_sync
Whether sync() should make a server round trip, thus causing client
to wait for outstanding invalidations.
The `sync` is called in `transaction.begin`. A server round trip
at this place guarantees that the transaction takes notice of all
prior modifications.
This may be important when several client processes share the same
ZODB as the following examples demonstrate.
Example 1: Assume that a user issues requests req1 and then req2
where req1 modifies data read by req2. If req1 and req2 are
processed by different Zope processes, then the transaction started
for req2 processing may see a ZODB state which does not yet include
the req1 modifications. A server round trip avoids this.
Example 2: A similar situation arises when 2 Zope processes
communicate via non ZODB means to inform the other about a state
change. If this communication happens to be faster than the ZODB
internal state change propagation then the target process again
risks to not yet see the changed state. A server round trip, again,
avoids this.
Defaults to false.
disconnect_poll
min_disconnect_poll
max_disconnect_poll
drop_cache_rather_verify
ignored; retained (as parameters) for compatibility
Note that the authentication protocol is defined by the server Note that the authentication protocol is defined by the server
and is detected by the ClientStorage upon connecting (see and is detected by the ClientStorage upon connecting (see
testConnection() and doAuth() for details). testConnection() and doAuth() for details).
""" """
assert not username or password or realm assert not username or password or realm
...@@ -285,7 +360,7 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage): ...@@ -285,7 +360,7 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
try: try:
self._wait() self._wait()
except Exception: except Exception:
# No point in keeping the server going of the storage # No point in keeping the server going if the storage
# creation fails # creation fails
self._server.close() self._server.close()
raise raise
...@@ -357,6 +432,11 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage): ...@@ -357,6 +432,11 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
_connection_generation = 0 _connection_generation = 0
def notify_connected(self, conn, info): def notify_connected(self, conn, info):
"""The connection is about to be established via *conn*.
*info* is a ``dict`` providing information about the server
(and its associated storage).
"""
self.set_server_addr(conn.get_peername()) self.set_server_addr(conn.get_peername())
self.protocol_version = conn.protocol_version self.protocol_version = conn.protocol_version
self._is_read_only = conn.is_read_only() self._is_read_only = conn.is_read_only()
......
...@@ -621,13 +621,17 @@ class ZEOStorage(object): ...@@ -621,13 +621,17 @@ class ZEOStorage(object):
class StorageServerDB(object): class StorageServerDB(object):
"""Adapter from StorageServerDB to ZODB.interfaces.IStorageWrapper """Adapts (StorageServer, storage_id) to ZODB.interfaces.IStorageWrapper.
This is used in a ZEO fan-out situation, where a storage server The class is used as ``DB`` emulation in a ``registerDB`` call;
calls registerDB on a ClientStorage. it allows the storage server to keep its cached data about a storage
up to date and to keep the respective connections informed.
Note that this is called from the Client-storage's IO thread, so In particular, the class is used in a ZEO fan-out situation,
always a separate thread from the storge-server connections. where a storage server calls registerDB on a ClientStorage.
Note that in this case the methods are called from the Client-storage's
IO thread, a separate thread from the storge-server connections.
Thus, the methods need to be thread safe.
""" """
def __init__(self, server, storage_id): def __init__(self, server, storage_id):
......
"""ZEO Protocol.
A ZEO protocol instance can be used as a connection.
It exchanges ``bytes`` messages.
Messages are sent via the methods
``_write`` (send a single message) and
``_writeit`` (send the messages generated by an iterator)
Received messages are reported via callbacks.
Messages are received in the same order as they have been written;
especially, the messages wrote with ``_writeit``
are received as contiguous messages.
The first message transmits the protocol version.
Its callback is ``finish_connect``.
The first byte of the protocol version message identifies
an encoding type; the remaining bytes specify the version.
``finish_connect`` is expected to set up
methods ``encode`` and ``decode`` corresponding to the
encoding type.
Followup messages carry encoded tuples
*msgid*, *async_flag*, *name*, *args*
representing either calls (synchronous or asynchronous) or replies.
Their callback is ``message_received``.
ZEO protocol instances can be used concurrently from coroutines (executed
in the same thread).
They are not thread safe.
The ZEO protocol sits on top of a sized message protocol.
The ZEO protocol has client and server variants.
"""
import logging import logging
import socket import socket
from struct import unpack from struct import unpack
...@@ -137,6 +170,7 @@ class Protocol(asyncio.Protocol): ...@@ -137,6 +170,7 @@ class Protocol(asyncio.Protocol):
self.finish_connect(protocol_version) self.finish_connect(protocol_version)
def call_async(self, method, args): def call_async(self, method, args):
"""call method named *method* asynchronously with *args*."""
self._write(self.encode(0, True, method, args)) self._write(self.encode(0, True, method, args))
def call_async_iter(self, it): def call_async_iter(self, it):
......
"""ZEO client interface implementation.
The client interface implementation is split into two parts:
``ClientRunner`` and ``Client``.
``ClientRunner`` calls ``Client`` methods indirectly via
``loop.call_soon_threadsafe``.
``Client`` does not call ``ClientRunner`` methods; however, it
can call ``ClientStorage`` and ``ClientCache`` methods.
Those methods must be thread safe.
Logically, ``Client`` represents a connection to one ZEO
server. However, initially, it can open connections to serveral
servers and choose one of them depending on availability
and required/provided capabilities (read_only/writable).
A server connection is represented by a ``Protocol`` instance.
The ``asyncio`` loop must be run in a separate thread.
The loop management is the responsibility of ``ClientThread``,
a tiny wrapper arount ``ClientRunner``.
"""
from ZEO.Exceptions import ClientDisconnected, ServerException from ZEO.Exceptions import ClientDisconnected, ServerException
import concurrent.futures import concurrent.futures
import functools import functools
...@@ -55,7 +76,7 @@ def future_generator(func): ...@@ -55,7 +76,7 @@ def future_generator(func):
class Protocol(base.Protocol): class Protocol(base.Protocol):
"""asyncio low-level ZEO client interface """asyncio connection to a single ZEO server.
""" """
# All of the code in this class runs in a single dedicated # All of the code in this class runs in a single dedicated
...@@ -71,7 +92,7 @@ class Protocol(base.Protocol): ...@@ -71,7 +92,7 @@ class Protocol(base.Protocol):
addr, client, storage_key, read_only, connect_poll=1, addr, client, storage_key, read_only, connect_poll=1,
heartbeat_interval=60, ssl=None, ssl_server_hostname=None, heartbeat_interval=60, ssl=None, ssl_server_hostname=None,
credentials=None): credentials=None):
"""Create a client interface """Create a server connection
addr is either a host,port tuple or a string file name. addr is either a host,port tuple or a string file name.
...@@ -165,6 +186,10 @@ class Protocol(base.Protocol): ...@@ -165,6 +186,10 @@ class Protocol(base.Protocol):
@future_generator @future_generator
def finish_connect(self, protocol_version): def finish_connect(self, protocol_version):
"""setup for *protocol_version* and verify the connection."""
# the first byte of ``protocol_version`` specifies the coding type
# the remaining bytes the version proper
# The future implementation we use differs from # The future implementation we use differs from
# asyncio.Future in that callbacks are called immediately, # asyncio.Future in that callbacks are called immediately,
# rather than using the loops call_soon. We want to avoid a # rather than using the loops call_soon. We want to avoid a
...@@ -189,6 +214,8 @@ class Protocol(base.Protocol): ...@@ -189,6 +214,8 @@ class Protocol(base.Protocol):
credentials = (self.credentials,) if self.credentials else () credentials = (self.credentials,) if self.credentials else ()
# We try to register with the server; if this succeeds with
# the client.
try: try:
try: try:
server_tid = yield self.fut( server_tid = yield self.fut(
...@@ -371,6 +398,8 @@ class Client(object): ...@@ -371,6 +398,8 @@ class Client(object):
# None=Never connected # None=Never connected
# True=connected # True=connected
# False=Disconnected # False=Disconnected
# Note: ``True`` indicates only the first phase of readyness;
# it does not mean that we are fully ready.
ready = None ready = None
def __init__(self, loop, def __init__(self, loop,
...@@ -379,7 +408,11 @@ class Client(object): ...@@ -379,7 +408,11 @@ class Client(object):
ssl=None, ssl_server_hostname=None, credentials=None): ssl=None, ssl_server_hostname=None, credentials=None):
"""Create a client interface """Create a client interface
addr is either a host,port tuple or a string file name. *addrs* specifies addresses of a set of servers which
(essentially) serve the same data.
Each address is either a host,port tuple or a string file name.
The object tries to connect to each of them and
chooses the first appropriate one.
client is a ClientStorage. It must be thread safe. client is a ClientStorage. It must be thread safe.
...@@ -503,6 +536,7 @@ class Client(object): ...@@ -503,6 +536,7 @@ class Client(object):
@future_generator @future_generator
def verify(self, server_tid): def verify(self, server_tid):
"""cache verification and invalidation."""
self.verify_invalidation_queue = [] # See comment in init :( self.verify_invalidation_queue = [] # See comment in init :(
protocol = self.protocol protocol = self.protocol
...@@ -581,8 +615,19 @@ class Client(object): ...@@ -581,8 +615,19 @@ class Client(object):
self.register_failed(self, exc) self.register_failed(self, exc)
else: else:
# Note: it is important that we first inform
# ``client`` (actually the ``ClientStorage``)
# that we are (almost) connected
# before we officially announce connectedness:
# the ``notify_connected`` adds information vital
# for storage use; the announcement
# allows waiting threads to use the storage.
# ``notify_connected`` can call our ``call_async``
# but **MUST NOT** use other methods or the normal API
# to interact with the server (deadlock or
# ``ClientDisconnected`` would result).
self.client.notify_connected(self, info) self.client.notify_connected(self, info)
self.connected.set_result(None) self.connected.set_result(None) # signal full readyness
def get_peername(self): def get_peername(self):
return self.protocol.get_peername() return self.protocol.get_peername()
...@@ -811,6 +856,15 @@ class ClientRunner(object): ...@@ -811,6 +856,15 @@ class ClientRunner(object):
raise raise
def call(self, method, *args, **kw): def call(self, method, *args, **kw):
"""call method named *method* with *args*.
Supported keywords:
timeout
wait at most this long for readyness
``None`` is replaced by ``self.timeout`` (usually 30s)
default: ``None``
"""
return self.__call(self.call_threadsafe, method, args, **kw) return self.__call(self.call_threadsafe, method, args, **kw)
def call_future(self, method, *args): def call_future(self, method, *args):
...@@ -821,6 +875,7 @@ class ClientRunner(object): ...@@ -821,6 +875,7 @@ class ClientRunner(object):
return result return result
def async_(self, method, *args): def async_(self, method, *args):
"""call method named *method* with *args* asynchronously."""
return self.__call(self.call_async_threadsafe, method, args) return self.__call(self.call_async_threadsafe, method, args)
def async_iter(self, it): def async_iter(self, it):
...@@ -870,6 +925,7 @@ class ClientRunner(object): ...@@ -870,6 +925,7 @@ class ClientRunner(object):
self.__call(self.apply_threadsafe, self.client.new_addrs, addrs) self.__call(self.apply_threadsafe, self.client.new_addrs, addrs)
def wait(self, timeout=None): def wait(self, timeout=None):
"""wait for readyness"""
if timeout is None: if timeout is None:
timeout = self.timeout timeout = self.timeout
self.wait_for_result(self.client.connected, timeout) self.wait_for_result(self.client.connected, timeout)
...@@ -932,6 +988,13 @@ class ClientThread(ClientRunner): ...@@ -932,6 +988,13 @@ class ClientThread(ClientRunner):
closed = False closed = False
def close(self): def close(self):
"""close the server connection and release resources.
``close`` can be called at any moment; it should not
raise an exception. Calling ``close`` again does
not have an effect. Most other calls will raise
a ``ClientDisconnected`` exception.
"""
if not self.closed: if not self.closed:
self.closed = True self.closed = True
super(ClientThread, self).close() super(ClientThread, self).close()
......
"""ZEO server interface implementation."""
import json import json
import logging import logging
import os import os
...@@ -159,7 +161,7 @@ assert best_protocol_version in ServerProtocol.protocols ...@@ -159,7 +161,7 @@ assert best_protocol_version in ServerProtocol.protocols
def new_connection(loop, addr, socket, zeo_storage, msgpack): def new_connection(loop, addr, socket, zeo_storage, msgpack):
protocol = ServerProtocol(loop, addr, zeo_storage, msgpack) protocol = ServerProtocol(loop, addr, zeo_storage, msgpack)
cr = loop.create_connection((lambda: protocol), sock=socket) cr = loop.create_connection(lambda: protocol, sock=socket)
asyncio.ensure_future(cr, loop=loop) asyncio.ensure_future(cr, loop=loop)
......
...@@ -9,6 +9,12 @@ except NameError: ...@@ -9,6 +9,12 @@ except NameError:
class Loop(object): class Loop(object):
"""Simple loop for testing purposes.
It calls callbacks directly (instead of in the next round);
it remembers ``call_later`` calls rather than schedule them;
it does not check calls to non threadsafe methods.
"""
protocol = transport = None protocol = transport = None
......
...@@ -77,6 +77,12 @@ class Base(object): ...@@ -77,6 +77,12 @@ class Base(object):
class ClientTests(Base, setupstack.TestCase, ClientRunner): class ClientTests(Base, setupstack.TestCase, ClientRunner):
"""Test ``Client``.
The tests emulate a server and its responses to verify
that ``Client`` and ``client.Protocol`` instances behave
as they should.
"""
maxDiff = None maxDiff = None
...@@ -847,7 +853,7 @@ class ServerTests(Base, setupstack.TestCase): ...@@ -847,7 +853,7 @@ class ServerTests(Base, setupstack.TestCase):
protocol = self.connect(True) protocol = self.connect(True)
protocol.zeo_storage.notify_connected.assert_called_once_with(protocol) protocol.zeo_storage.notify_connected.assert_called_once_with(protocol)
# If we try to call a methid that isn't in the protocol's # If we try to call a method that isn't in the protocol's
# white list, it will disconnect: # white list, it will disconnect:
self.assertFalse(protocol.loop.transport.closed) self.assertFalse(protocol.loop.transport.closed)
self.call('foo', target=None) self.call('foo', target=None)
......
...@@ -935,7 +935,7 @@ class ReconnectionTests(CommonSetupTearDown): ...@@ -935,7 +935,7 @@ class ReconnectionTests(CommonSetupTearDown):
# When we start the second server, we use file data file from # When we start the second server, we use file data file from
# the original server so tha the new server is a replica of # the original server so tha the new server is a replica of
# the original. We need this becaise ClientStorage won't use # the original. We need this because ClientStorage won't use
# a server if the server's last transaction is earlier than # a server if the server's last transaction is earlier than
# what the client has seen. # what the client has seen.
self.startServer(index=1, path=self.file+'.0', create=False) self.startServer(index=1, path=self.file+'.0', create=False)
......
...@@ -576,7 +576,7 @@ class ZRPCConnectionTests(ZEO.tests.ConnectionTests.CommonSetupTearDown): ...@@ -576,7 +576,7 @@ class ZRPCConnectionTests(ZEO.tests.ConnectionTests.CommonSetupTearDown):
handler = zope.testing.loggingsupport.InstalledHandler( handler = zope.testing.loggingsupport.InstalledHandler(
'ZEO.asyncio.client') 'ZEO.asyncio.client')
# We no longer implement the event loop, we we no longer know # We no longer implement the event loop, we no longer know
# how to break it. We'll just stop it instead for now. # how to break it. We'll just stop it instead for now.
self._storage._server.loop.call_soon_threadsafe( self._storage._server.loop.call_soon_threadsafe(
self._storage._server.loop.stop) self._storage._server.loop.stop)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment