Commit 02bb6611 authored by Jim Fulton's avatar Jim Fulton

Merge branch 'ssl' into client-side-conflict-resolution

Conflicts:
	src/ZEO/ClientStorage.py
parents 8b4689d5 48defe9f
...@@ -6,17 +6,6 @@ matrix: ...@@ -6,17 +6,6 @@ matrix:
python: 3.4 python: 3.4
- os: linux - os: linux
python: 3.5 python: 3.5
- os: osx
language: generic
env: TERRYFY_PYTHON='macpython 3.4'
- os: osx
language: generic
env: TERRYFY_PYTHON='homebrew 3'
before_install:
- if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then git clone https://github.com/MacPython/terryfy; fi
- if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then source terryfy/travis_tools.sh; fi
- if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then get_python_environment $TERRYFY_PYTHON venv; fi
- if [[ "$TERRYFY_PYTHON" == "homebrew 3" ]]; then alias pip=`which pip3` ; fi
install: install:
- pip install -U setuptools - pip install -U setuptools
- python bootstrap.py - python bootstrap.py
......
==========================
ZEO HOWTO
==========================
.. contents::
Introduction
============
ZEO (Zope Enterprise Objects) is a client-server system for sharing a
single storage among many clients. When you use ZEO, a lower-level
storage, typically a file storage, is opened in the ZEO server
process. Client programs connect to this process using a ZEO
ClientStorage. ZEO provides a consistent view of the database to all
clients. The ZEO client and server communicate using a custom
protocol layered on top of TCP.
There are several features that affect the behavior of
ZEO. This section describes how a few of these features
work. Subsequent sections describe how to configure every option.
Client cache
------------
Each ZEO client keeps an on-disk cache of recently used data records
to avoid fetching those records from the server each time they are
requested. It is usually faster to read the objects from disk than it
is to fetch them over the network. The cache can also provide
read-only copies of objects during server outages.
The cache may be persistent or transient. If the cache is persistent,
then the cache files are retained for use after process restarts. A
non-persistent cache uses temporary files that are removed when the
client storage is closed.
The client cache size is configured when the ClientStorage is created.
The default size is 20MB, but the right size depends entirely on the
particular database. Setting the cache size too small can hurt
performance, but in most cases making it too big just wastes disk
space.
ZEO uses invalidations for cache consistency. Every time an object is
modified, the server sends a message to each client informing it of
the change. The client will discard the object from its cache when it
receives an invalidation. (It's actually a little more complicated,
but we won't get into that here.)
Each time a client connects to a server, it must verify that its cache
contents are still valid. (It did not receive any invalidation
messages while it was disconnected.) This involves asking the server
to replay invalidations it missed. If it's been disconnected too long,
it discards its cache.
Invalidation queue
------------------
The ZEO server keeps a queue of recent invalidation messages in
memory. When a client connects to the server, it sends the timestamp
of the most recent invalidation message it has received. If that
message is still in the invalidation queue, then the server sends the
client all the missing invalidations.
The default size of the invalidation queue is 100. If the
invalidation queue is larger, it will be more likely that a client
that reconnects will be able to verify its cache using the queue. On
the other hand, a large queue uses more memory on the server to store
the message. Invalidation messages tend to be small, perhaps a few
hundred bytes each on average; it depends on the number of objects
modified by a transaction.
You can also provide an invalidation age when configuring the
server. In this case, if the invalidation queue is too small, but a
client has been disconnected for a time interval that is less than the
invalidation age, then invalidations are replayed by iterating over
the lower-level storage on the server. If the age is too high, and
clients are disconneced for a long time, then this can put a lot of
load on the server.
Transaction timeouts
--------------------
A ZEO server can be configured to timeout a transaction if it takes
too long to complete. Only a single transaction can commit at a time;
so if one transaction takes too long, all other clients will be
delayed waiting for it. In the extreme, a client can hang during the
commit process. If the client hangs, the server will be unable to
commit other transactions until it restarts. A well-behaved client
will not hang, but the server can be configured with a transaction
timeout to guard against bugs that cause a client to hang.
If any transaction exceeds the timeout threshold, the client's
connection to the server will be closed and the transaction aborted.
Once the transaction is aborted, the server can start processing other
client's requests. Most transactions should take very little time to
commit. The timer begins for a transaction after all the data has
been sent to the server. At this point, the cost of commit should be
dominated by the cost of writing data to disk; it should be unusual
for a commit to take longer than 1 second. A transaction timeout of
30 seconds should tolerate heavy load and slow communications between
client and server, while guarding against hung servers.
When a transaction times out, the client can be left in an awkward
position. If the timeout occurs during the second phase of the two
phase commit, the client will log a panic message. This should only
cause problems if the client transaction involved multiple storages.
If it did, it is possible that some storages committed the client
changes and others did not.
Connection management
---------------------
A ZEO client manages its connection to the ZEO server. If it loses
the connection, it attempts to reconnect. While
it is disconnected, it can satisfy some reads by using its cache.
The client can be configured with multiple server addresses. In this
case, it assumes that each server has identical content and will use
any server that is available. It is possible to configure the client
to accept a read-only connection to one of these servers if no
read-write connection is available. If it has a read-only connection,
it will continue to poll for a read-write connection.
If a single address resolves to multiple IPv4 or IPv6 addresses,
the client will connect to an arbitrary of these addresses.
SSL
---
ZEO supports the use of SSL connections between servers and clients,
including certificate authentication. We're still understanding use
cases for this, so details of operation may change.
Installing software
===================
ZEO is installed like any other Python package using pip, buildout, or
pther Python packaging tools.
Running the server
==================
Typically, the ZEO server is run using the ``runzeo`` script that's
installed as part of a ZEO installation. The ``runzeo`` script
accepts command line options, the most important of which is the
``-C`` (``--configuration``) option. ZEO servers are best configured
via configuration files. The ``runzeo`` script also accepts some
command-line arguments for ad-hoc configurations, but there's an
easier way to run an ad-hoc server described below. For more on
configuraing a ZEO server see `Server configuration`_ below.
Server quick-start/ad-hoc operation
-----------------------------------
You can quickly start a ZEO server from a Python prompt::
import ZEO
address, stop = ZEO.server()
This runs a ZEO server on a dynamic address and using an in-memory
storage.
We can then create a ZEO client connection using the address
returned::
connection = ZEO.connection(addr)
This is a ZODB connection for a database opened on a client storage
instance created on the fly. This is a shorthand for::
db = ZEO.DB(addr)
connection = db.open()
Which is a short-hand for::
client_storage = ZEO.client(addr)
import ZODB
db = ZODB.db(client_storage)
connection = db.open()
If you exit the Python process, the storage exits as well, as it's run
in an in-process thread. It will leave behind it's configuration and
log file. This provides a handy way to get a configuration example.
You shut down the server more cleanly by calling the stop function
returned by the ``ZEO.server`` function. This will remove the
temporary configuration file.
To have data stored persistently, you can specify a file-storage path
name using a ``path`` parameter. If you want blob support, you can
specify a blob-file directory using the ``blob_dir`` directory.
You can also supply a port to listen on, full storage configuration
and ZEO server configuration options to the ``ZEO.server``
function. See it's documentation string for more information.
Server configuration
--------------------
The script runzeo.py runs the ZEO server. The server can be
configured using command-line arguments or a config file. This
document only describes the config file. Run runzeo.py
-h to see the list of command-line arguments.
The configuration file specifies the underlying storage the server
uses, the address it binds to, and a few other optional parameters.
An example is::
<zeo>
address zeo.example.com:8090
</zeo>
<filestorage>
path /var/tmp/Data.fs
</filestorage>
<eventlog>
<logfile>
path /var/tmp/zeo.log
format %(asctime)s %(message)s
</logfile>
</eventlog>
The format is similar to the Apache configuration format. Individual
settings have a name, 1 or more spaces and a value, as in::
address zeo.example.com:8090
Settings are grouped into hierarchical sections.
The example above configures a server to use a file storage from
``/var/tmp/Data.fs``. The server listens on port ``8090`` of
``zeo.example.com``. The ZEO server writes its log file to
``/var/tmp/zeo.log`` and uses a custom format for each line. Assuming the
example configuration it stored in ``zeo.config``, you can run a server by
typing::
runzeo -C zeo.config
A configuration file consists of a <zeo> section and a storage
section, where the storage section can use any of the valid ZODB
storage types. It may also contain an eventlog configuration. See
ZODB documentation for information on configuring storages. See
`Configuring event logs <logs.rst>`_ for information on configuring
server logs.
An **easy way to get started** with configuration is to run an add-hoc
server and copy the generated configuration.
The zeo section must list the address. All the other keys are
optional.
address
The address at which the server should listen. This can be in
the form 'host:port' to signify a TCP/IP connection or a
pathname string to signify a Unix domain socket connection (at
least one '/' is required). A hostname may be a DNS name or a
dotted IP address. If the hostname is omitted, the platform's
default behavior is used when binding the listening socket (''
is passed to socket.bind() as the hostname portion of the
address).
read-only
Flag indicating whether the server should operate in read-only
mode. Defaults to false. Note that even if the server is
operating in writable mode, individual storages may still be
read-only. But if the server is in read-only mode, no write
operations are allowed, even if the storages are writable. Note
that pack() is considered a read-only operation.
invalidation-queue-size
The storage server keeps a queue of the objects modified by the
last N transactions, where N == invalidation_queue_size. This
queue is used to support client cache verification when a client
disconnects for a short period of time.
invalidation-age
The maximum age of a client for which quick-verification
invalidations will be provided by iterating over the served
storage. This option should only be used if the served storage
supports efficient iteration from a starting point near the
end of the transaction history (e.g. end of file).
transaction-timeout
The maximum amount of time, in seconds, to wait for a
transaction to commit after acquiring the storage lock,
specified in seconds. If the transaction takes too long, the
client connection will be closed and the transaction aborted.
This defaults to 30 seconds.
Server SSL configuration
~~~~~~~~~~~~~~~~~~~~~~~~
A server can optionally support SSL. Do do so, include a `ssl`
subsection of the ZEO section, as in::
<zeo>
address zeo.example.com:8090
<ssl>
certificate server_certificate.pem
key server_certificate_key.pem
</ssl>
</zeo>
<filestorage>
path /var/tmp/Data.fs
</filestorage>
<eventlog>
<logfile>
path /var/tmp/zeo.log
format %(asctime)s %(message)s
</logfile>
</eventlog>
The ``ssl`` section has settings:
certificate
The path to an SSL certificate file for the server. (required)
key
The path to the SSL key file for the server certificate (if not
included in certificate file).
password-function
The dotted name if an importable function that, when imported, returns
the password needed to unlock the key (if the key requires a password.)
authenticate
The path to a file or directory containing client certificates
to authenticate. ((See the ``cafile`` and ``capath``
parameters in the Python documentation for
``ssl.SSLContext.load_verify_locations``.)
If this setting is used. then certificate authentication is
used to authenticate clients. A client must be configured
with one of the certificates supplied using this setting.
This option assumes that you're using self-signed certificates.
Running the ZEO server as a daemon
----------------------------------
In an operational setting, you will want to run the ZEO server a
daemon process that is restarted when it dies. The zdaemon package
provides two tools for running daemons: zdrun.py and zdctl.py. You can
find zdaemon and it's documentation at
http://pypi.python.org/pypi/zdaemon.
Note that ``runzeo`` makes no attempt to implemnt a well behaved
daemon. It expects that functionality to be provided by a wrapper like
zdaemon or supervisord.
Rotating log files
------------------
``runzeo`` will re-initialize its logging subsystem when it receives a
SIGUSR2 signal. If you are using the standard event logger, you
should first rename the log file and then send the signal to the
server. The server will continue writing to the renamed log file
until it receives the signal. After it receives the signal, the
server will create a new file with the old name and write to it.
ZEO Clients
===========
To use a ZEO server, you need to connect to it using a ZEO client
storage. You create client storages either using a Python API or
using a ZODB storage configuration in a ZODB storage configuration
section.
Python API for creating a ZEO client storage
--------------------------------------------
To create a client storage from Python, use the ``ZEO.client``
function::
import ZEO
client = ZEO.client(8200)
In the example above, we created a client that connected to a storage
listening on port 8200 on local host. The first argument is an
address, or list of addresses to connect to. There are many additinal
options, decumented below that should be given as keyword arguments.
Addresses can be:
- A host/port tuple
- An integer, which implies that the host is '127.0.0.1'
- A unix domain socket file name.
Options:
cache_size
The cache size in bytes. This defaults to a 20MB.
cache
The ZEO cache to be used. This can be a file name, which will
cause a persisetnt standard persistent ZEO cache to be used and
stored in the given name. This can also be an object that
implements ``ZEO.interfaces.ICache``.
If not specified, then a non-persistent cache will be used.
blob_dir
The name of a directory to hold/cache blob data downloaded from the
server. This must be provided if blobs are to be used. (Of
course, the server storage must be configured to use blobs as
well.)
shared_blob_dir
A client can use a network files system (or a local directory if
the server runs on the same machine) to share a blob directory with
the server. This allows downloading of blobs (except via a
distributed file system) to be avoided.
blob_cache_size
The size of the blob cache in bytes. IF unset, then blobs will
accumulate. If set, then blobs are removed when the total size
exceeds this amount. Blobs accessed least recently are removed
first.
blob_cache_size_check
The total size of data to be downloaded to trigger blob cache size
reduction. The defaukt is 10 (percent). This controls how often to
remove blobs from the cache.
ssl
An ``ssl.SSLContext`` object used to make SSL connections.
ssl_server_hostname
Host name to use for SSL host name checks.
If using SSL and if host name checking is enabled in the given SSL
context then use this as the value to check. If an address is a
host/port pair, then this defaults to the host in the address.
read_only
Set to true for a read-only connection.
If false (the default), then request a read/write connection.
This option is ignored if ``read_only_fallback`` is set to a true value.
read_only_fallback
Set to true, then prefer a read/write connection, but be willing to
use a read-only connection. This defaults to a false value.
If ``read_only_fallback`` is set, then ``read_only`` is ignored.
wait_timeout
How long to wait for an initial connection, defaulting to 30
seconds. If an initial connection can't be made within this time
limit, then creation of the client storage will fail with a
``ZEO.Exceptions.ClientDisconnected`` exception.
After the initial connection, if the client is disconnected:
- In-flight server requests will fail with a
``ZEO.Exceptions.ClientDisconnected`` exception.
- New requests will block for up to ``wait_timeout`` waiting for a
connection to be established before failing with a
``ZEO.Exceptions.ClientDisconnected`` exception.
client_label
A short string to display in *server* logs for an event relating to
this client. This can be helpful when debugging.
disconnect_poll
The delay in seconds between attempts to connect to the
server, in seconds. Defaults to 1 second.
Configuration strings/files
---------------------------
ZODB databases and storages can be configured using configuration
files, or strings (extracted from configuration files). They use the
same syntax as the server configuration files described above, but
with different sections and options.
An application that used ZODB might configure it's database using a
string like::
<zodb>
cache-size-bytes 1000MB
<filestorage>
path /var/lib/Data.fs
</filestorage>
</zodb>
In this example, we configured a ZODB database with a object cache
size of 1GB. Inside the database, we configured a file storage. The
``filestorage`` section provided file-storage parameters. We saw a
similar section in the storage-server configuration example in `Server
configuration`_.
To configure a client storage, you use a ``clientstorage`` section,
but first you have to import it's definition, because ZEO isn't built
into ZODB. Here's an example::
<zodb>
cache-size-bytes 1000MB
%import ZEO
<clientstorage>
server 8200
</clientstorage>
</zodb>
In this example, we defined a client storage that connected to a
server on port 8200.
The following settings are supported:
cache-size
The cache size in bytes, KB or MB. This defaults to a 20MB.
Optional ``KB`` or ``MB`` suffixes can (and usually are) used to
specify units other than bytes.
cache-path
The file path of a persistent cache file
blob-dir
The name of a directory to hold/cache blob data downloaded from the
server. This must be provided if blobs are to be used. (Of
course, the server storage must be configured to use blobs as
well.)
shared-blob-dir
A client can use a network files system (or a local directory if
the server runs on the same machine) to share a blob directory with
the server. This allows downloading of blobs (except via a
distributed file system) to be avoided.
blob-cache-size
The size of the blob cache in bytes. IF unset, then blobs will
accumulate. If set, then blobs are removed when the total size
exceeds this amount. Blobs accessed least recently are removed
first.
blob-cache-size-check
The total size of data to be downloaded to trigger blob cache size
reduction. The defaukt is 10 (percent). This controls how often to
remove blobs from the cache.
read-only
Set to true for a read-only connection.
If false (the default), then request a read/write connection.
This option is ignored if ``read_only_fallback`` is set to a true value.
read-only-fallback
Set to true, then prefer a read/write connection, but be willing to
use a read-only connection. This defaults to a false value.
If ``read_only_fallback`` is set, then ``read_only`` is ignored.
wait_timeout
How long to wait for an initial connection, defaulting to 30
seconds. If an initial connection can't be made within this time
limit, then creation of the client storage will fail with a
``ZEO.Exceptions.ClientDisconnected`` exception.
After the initial connection, if the client is disconnected:
- In-flight server requests will fail with a
``ZEO.Exceptions.ClientDisconnected`` exception.
- New requests will block for up to ``wait_timeout`` waiting for a
connection to be established before failing with a
``ZEO.Exceptions.ClientDisconnected`` exception.
client_label
A short string to display in *server* logs for an event relating to
this client. This can be helpful when debugging.
disconnect_poll
The delay in seconds between attempts to connect to the
server, in seconds. Defaults to 1 second.
Client SSL configuration
~~~~~~~~~~~~~~~~~~~~~~~~
An ``ssl`` subsection can be used to enable and configure SSL, as in::
%import ZEO
<clientstorage>
server zeo.example.com8200
<ssl>
</ssl>
</clientstorage>
In the example above, SSL is enabled in it's simplest form:
- The cient expects the server to have a signed certificate, which the
client validates.
- The server server host name ``zeo.example.com`` is checked against
the server's certificate.
A number of settings can be provided to configure SSL:
certificate
The path to an SSL certificate file for the client. This is
needed to allow the server to authenticate the client.
key
The path to the SSL key file for the client certificate (if not
included in the certificate file).
password-function
A dotted name if an importable function that, when imported, returns
the password needed to unlock the key (if the key requires a password.)
authenticate
The path to a file or directory containing server certificates
to authenticate. ((See the ``cafile`` and ``capath``
parameters in the Python documentation for
``ssl.SSLContext.load_verify_locations``.)
If this setting is used. then certificate authentication is
used to authenticate the server. The server must be configuted
with one of the certificates supplied using this setting.
check-hostname
This is a boolean setting that defaults to true. Verify the
host name in the server certificate is as expected.
server-hostname
The expected server host name. This defaults to the host name
used in the server address. This option must be used when
``check-hostname`` is true and when a server address has no host
name (localhost, or unix domain socket) or when there is more
than one seerver and server hostnames differ.
Using this setting implies a true value for the ``check-hostname`` setting.
==========================
Running a ZEO Server HOWTO
==========================
Introduction
------------
ZEO (Zope Enterprise Objects) is a client-server system for sharing a
single storage among many clients. Normally, a ZODB storage can only
be used by a single process. When you use ZEO, the storage is opened
in the ZEO server process. Client programs connect to this process
using a ZEO ClientStorage. ZEO provides a consistent view of the
database to all clients. The ZEO client and server communicate using
a custom RPC protocol layered on top of TCP.
There are several configuration options that affect the behavior of a
ZEO server. This section describes how a few of these features
working. Subsequent sections describe how to configure every option.
Client cache
~~~~~~~~~~~~
Each ZEO client keeps an on-disk cache of recently used objects to
avoid fetching those objects from the server each time they are
requested. It is usually faster to read the objects from disk than it
is to fetch them over the network. The cache can also provide
read-only copies of objects during server outages.
The cache may be persistent or transient. If the cache is persistent,
then the cache files are retained for use after process restarts. A
non-persistent cache uses temporary files that are removed when the
client storage is closed.
The client cache size is configured when the ClientStorage is created.
The default size is 20MB, but the right size depends entirely on the
particular database. Setting the cache size too small can hurt
performance, but in most cases making it too big just wastes disk
space. The document "Client cache tracing" describes how to collect a
cache trace that can be used to determine a good cache size.
ZEO uses invalidations for cache consistency. Every time an object is
modified, the server sends a message to each client informing it of
the change. The client will discard the object from its cache when it
receives an invalidation. These invalidations are often batched.
Each time a client connects to a server, it must verify that its cache
contents are still valid. (It did not receive any invalidation
messages while it was disconnected.) There are several mechanisms
used to perform cache verification. In the worst case, the client
sends the server a list of all objects in its cache along with their
timestamps; the server sends back an invalidation message for each
stale object. The cost of verification is one drawback to making the
cache too large.
Note that every time a client crashes or disconnects, it must verify
its cache. Every time a server crashes, all of its clients must
verify their caches.
The cache verification process is optimized in two ways to eliminate
costs when restarting clients and servers. Each client keeps the
timestamp of the last invalidation message it has seen. When it
connects to the server, it checks to see if any invalidation messages
were sent after that timestamp. If not, then the cache is up-to-date
and no further verification occurs. The other optimization is the
invalidation queue, described below.
Invalidation queue
~~~~~~~~~~~~~~~~~~
The ZEO server keeps a queue of recent invalidation messages in
memory. When a client connects to the server, it sends the timestamp
of the most recent invalidation message it has received. If that
message is still in the invalidation queue, then the server sends the
client all the missing invalidations. This is often cheaper than
perform full cache verification.
The default size of the invalidation queue is 100. If the
invalidation queue is larger, it will be more likely that a client
that reconnects will be able to verify its cache using the queue. On
the other hand, a large queue uses more memory on the server to store
the message. Invalidation messages tend to be small, perhaps a few
hundred bytes each on average; it depends on the number of objects
modified by a transaction.
Transaction timeouts
~~~~~~~~~~~~~~~~~~~~
A ZEO server can be configured to timeout a transaction if it takes
too long to complete. Only a single transaction can commit at a time;
so if one transaction takes too long, all other clients will be
delayed waiting for it. In the extreme, a client can hang during the
commit process. If the client hangs, the server will be unable to
commit other transactions until it restarts. A well-behaved client
will not hang, but the server can be configured with a transaction
timeout to guard against bugs that cause a client to hang.
If any transaction exceeds the timeout threshold, the client's
connection to the server will be closed and the transaction aborted.
Once the transaction is aborted, the server can start processing other
client's requests. Most transactions should take very little time to
commit. The timer begins for a transaction after all the data has
been sent to the server. At this point, the cost of commit should be
dominated by the cost of writing data to disk; it should be unusual
for a commit to take longer than 1 second. A transaction timeout of
30 seconds should tolerate heavy load and slow communications between
client and server, while guarding against hung servers.
When a transaction times out, the client can be left in an awkward
position. If the timeout occurs during the second phase of the two
phase commit, the client will log a panic message. This should only
cause problems if the client transaction involved multiple storages.
If it did, it is possible that some storages committed the client
changes and others did not.
Connection management
~~~~~~~~~~~~~~~~~~~~~
A ZEO client manages its connection to the ZEO server. If it loses
the connection, it attempts to reconnect. While
it is disconnected, it can satisfy some reads by using its cache.
The client can be configured to wait for a connection when it is created
or to return immediately and provide data from its persistent cache.
It usually simplifies programming to have the client wait for a
connection on startup.
When the client is disconnected, it polls periodically to see if the
server is available. The rate at which it polls is configurable.
The client can be configured with multiple server addresses. In this
case, it assumes that each server has identical content and will use
any server that is available. It is possible to configure the client
to accept a read-only connection to one of these servers if no
read-write connection is available. If it has a read-only connection,
it will continue to poll for a read-write connection. This feature
supports the Zope Replication Services product,
http://www.zope.com/Products/ZopeProducts/ZRS. In general, it could
be used to with a system that arranges to provide hot backups of
servers in the case of failure.
If a single address resolves to multiple IPv4 or IPv6 addresses,
the client will connect to an arbitrary of these addresses.
Authentication
~~~~~~~~~~~~~~
ZEO supports optional authentication of client and server using a
password scheme similar to HTTP digest authentication (RFC 2069). It
is a simple challenge-response protocol that does not send passwords
in the clear, but does not offer strong security. The RFC discusses
many of the limitations of this kind of protocol. Note that this
feature provides authentication only. It does not provide encryption
or confidentiality.
The challenge-response also produces a session key that is used to
generate message authentication codes for each ZEO message. This
should prevent session hijacking.
Guard the password database as if it contained plaintext passwords.
It stores the hash of a username and password. This does not expose
the plaintext password, but it is sensitive nonetheless. An attacker
with the hash can impersonate the real user. This is a limitation of
the simple digest scheme.
The authentication framework allows third-party developers to provide
new authentication modules.
Installing software
-------------------
ZEO is distributed as part of the ZODB3 package and with Zope,
starting with Zope 2.7. You can download it from
http://pypi.python.org/pypi/ZODB3.
Configuring server
------------------
The script runzeo.py runs the ZEO server. The server can be
configured using command-line arguments or a config file. This
document only describes the config file. Run runzeo.py
-h to see the list of command-line arguments.
The runzeo.py script imports the ZEO package. ZEO must either be
installed in Python's site-packages directory or be in a directory on
PYTHONPATH.
The configuration file specifies the underlying storage the server
uses, the address it binds, and a few other optional parameters.
An example is::
<zeo>
address zeo.example.com:8090
monitor-address zeo.example.com:8091
</zeo>
<filestorage 1>
path /var/tmp/Data.fs
</filestorage>
<eventlog>
<logfile>
path /var/tmp/zeo.log
format %(asctime)s %(message)s
</logfile>
</eventlog>
This file configures a server to use a FileStorage from
/var/tmp/Data.fs. The server listens on port 8090 of zeo.example.com.
It also starts a monitor server that lists in port 8091. The ZEO
server writes its log file to /var/tmp/zeo.log and uses a custom
format for each line. Assuming the example configuration it stored in
zeo.config, you can run a server by typing::
python /usr/local/bin/runzeo.py -C zeo.config
A configuration file consists of a <zeo> section and a storage
section, where the storage section can use any of the valid ZODB
storage types. It may also contain an eventlog configuration. See
the document "Configuring a ZODB database" for more information about
configuring storages and eventlogs.
The zeo section must list the address. All the other keys are
optional.
address
The address at which the server should listen. This can be in
the form 'host:port' to signify a TCP/IP connection or a
pathname string to signify a Unix domain socket connection (at
least one '/' is required). A hostname may be a DNS name or a
dotted IP address. If the hostname is omitted, the platform's
default behavior is used when binding the listening socket (''
is passed to socket.bind() as the hostname portion of the
address).
read-only
Flag indicating whether the server should operate in read-only
mode. Defaults to false. Note that even if the server is
operating in writable mode, individual storages may still be
read-only. But if the server is in read-only mode, no write
operations are allowed, even if the storages are writable. Note
that pack() is considered a read-only operation.
invalidation-queue-size
The storage server keeps a queue of the objects modified by the
last N transactions, where N == invalidation_queue_size. This
queue is used to speed client cache verification when a client
disconnects for a short period of time.
monitor-address
The address at which the monitor server should listen. If
specified, a monitor server is started. The monitor server
provides server statistics in a simple text format. This can
be in the form 'host:port' to signify a TCP/IP connection or a
pathname string to signify a Unix domain socket connection (at
least one '/' is required). A hostname may be a DNS name or a
dotted IP address. If the hostname is omitted, the platform's
default behavior is used when binding the listening socket (''
is passed to socket.bind() as the hostname portion of the
address).
transaction-timeout
The maximum amount of time to wait for a transaction to commit
after acquiring the storage lock, specified in seconds. If the
transaction takes too long, the client connection will be closed
and the transaction aborted.
authentication-protocol
The name of the protocol used for authentication. The
only protocol provided with ZEO is "digest," but extensions
may provide other protocols.
authentication-database
The path of the database containing authentication credentials.
authentication-realm
The authentication realm of the server. Some authentication
schemes use a realm to identify the logic set of usernames
that are accepted by this server.
Configuring clients
-------------------
The ZEO client can also be configured using ZConfig. The ZODB.config
module provides several function for opening a storage based on its
configuration.
- ZODB.config.storageFromString()
- ZODB.config.storageFromFile()
- ZODB.config.storageFromURL()
The ZEO client configuration requires the server address be
specified. Everything else is optional. An example configuration is::
<zeoclient>
server zeo.example.com:8090
</zeoclient>
The other configuration options are listed below.
storage
The name of the storage that the client wants to use. If the
ZEO server serves more than one storage, the client selects
the storage it wants to use by name. The default name is '1',
which is also the default name for the ZEO server.
cache-size
The maximum size of the client cache, in bytes.
name
The storage name. If unspecified, the address of the server
will be used as the name.
client
Enables persistent cache files. The string passed here is
used to construct the cache filenames. If it is not
specified, the client creates a temporary cache that will
only be used by the current object.
var
The directory where persistent cache files are stored. By
default cache files, if they are persistent, are stored in
the current directory.
min-disconnect-poll
The minimum delay in seconds between attempts to connect to
the server, in seconds. Defaults to 5 seconds.
max-disconnect-poll
The maximum delay in seconds between attempts to connect to
the server, in seconds. Defaults to 300 seconds.
wait
A boolean indicating whether the constructor should wait
for the client to connect to the server and verify the cache
before returning. The default is true.
read-only
A flag indicating whether this should be a read-only storage,
defaulting to false (i.e. writing is allowed by default).
read-only-fallback
A flag indicating whether a read-only remote storage should be
acceptable as a fallback when no writable storages are
available. Defaults to false. At most one of read_only and
read_only_fallback should be true.
realm
The authentication realm of the server. Some authentication
schemes use a realm to identify the logic set of usernames
that are accepted by this server.
A ZEO client can also be created by calling the ClientStorage
constructor explicitly. For example::
from ZEO.ClientStorage import ClientStorage
storage = ClientStorage(("zeo.example.com", 8090))
Running the ZEO server as a daemon
----------------------------------
In an operational setting, you will want to run the ZEO server a
daemon process that is restarted when it dies. The zdaemon package
provides two tools for running daemons: zdrun.py and zdctl.py. You can
find zdaemon and it's documentation at
http://pypi.python.org/pypi/zdaemon.
Rotating log files
~~~~~~~~~~~~~~~~~~
ZEO will re-initialize its logging subsystem when it receives a
SIGUSR2 signal. If you are using the standard event logger, you
should first rename the log file and then send the signal to the
server. The server will continue writing to the renamed log file
until it receives the signal. After it receives the signal, the
server will create a new file with the old name and write to it.
Tools
-----
There are a few scripts that may help running a ZEO server. The
zeopack.py script connects to a server and packs the storage. It can
be run as a cron job. The zeoup.py script attempts to connect to a
ZEO server and verify that is is functioning. The zeopasswd.py script
manages a ZEO servers password database.
Diagnosing problems
-------------------
If an exception occurs on the server, the server will log a traceback
and send an exception to the client. The traceback on the client will
show a ZEO protocol library as the source of the error. If you need
to diagnose the problem, you will have to look in the server log for
the rest of the traceback.
...@@ -92,7 +92,7 @@ def alltests(): ...@@ -92,7 +92,7 @@ def alltests():
_unittests_only(suite, mod.test_suite()) _unittests_only(suite, mod.test_suite())
return suite return suite
tests_require = ['zope.testing', 'manuel', 'random2'] tests_require = ['zope.testing', 'manuel', 'random2', 'mock']
long_description = ( long_description = (
open('README.rst').read() open('README.rst').read()
......
...@@ -93,6 +93,7 @@ class ClientStorage(object): ...@@ -93,6 +93,7 @@ class ClientStorage(object):
blob_cache_size=None, blob_cache_size_check=10, blob_cache_size=None, blob_cache_size_check=10,
client_label=None, client_label=None,
cache=None, cache=None,
ssl = None, ssl_server_hostname=None,
# Mostly ignored backward-compatability options # Mostly ignored backward-compatability options
client=None, var=None, client=None, var=None,
min_disconnect_poll=1, max_disconnect_poll=None, min_disconnect_poll=1, max_disconnect_poll=None,
...@@ -137,10 +138,6 @@ class ClientStorage(object): ...@@ -137,10 +138,6 @@ class ClientStorage(object):
Defaults to None, in which case the cache is not Defaults to None, in which case the cache is not
persistent. See ClientCache for more info. persistent. See ClientCache for more info.
disconnect_poll
The delay in seconds between attempts to connect to the
server, in seconds. Defaults to 1 second.
wait_timeout wait_timeout
Maximum time to wait for results, including connecting. Maximum time to wait for results, including connecting.
...@@ -264,6 +261,7 @@ class ClientStorage(object): ...@@ -264,6 +261,7 @@ class ClientStorage(object):
addr, self, cache, storage, addr, self, cache, storage,
ZEO.asyncio.client.Fallback if read_only_fallback else read_only, ZEO.asyncio.client.Fallback if read_only_fallback else read_only,
wait_timeout or 30, wait_timeout or 30,
ssl = ssl, ssl_server_hostname=ssl_server_hostname,
) )
self._call = self._server.call self._call = self._server.call
self._async = self._server.async self._async = self._server.async
...@@ -738,6 +736,11 @@ class ClientStorage(object): ...@@ -738,6 +736,11 @@ class ClientStorage(object):
# cache entry is bad and invalidate it. # cache entry is bad and invalidate it.
self._cache.invalidate(oid, None) self._cache.invalidate(oid, None)
raise raise
except POSException.StorageTransactionError:
# Hm, we got disconnected and reconnected bwtween
# _check_trans and voting. Let's chack the transaction again:
tbuf = self._check_trans(txn, 'tpc_vote')
raise
if tbuf.exception: if tbuf.exception:
raise tbuf.exception raise tbuf.exception
......
...@@ -31,7 +31,6 @@ import threading ...@@ -31,7 +31,6 @@ import threading
import time import time
import transaction import transaction
import warnings import warnings
import ZEO.acceptor
import ZEO.asyncio.server import ZEO.asyncio.server
import ZODB.blob import ZODB.blob
import ZODB.event import ZODB.event
...@@ -51,6 +50,8 @@ from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError ...@@ -51,6 +50,8 @@ from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
from ZODB.serialize import referencesf from ZODB.serialize import referencesf
from ZODB.utils import oid_repr, p64, u64, z64 from ZODB.utils import oid_repr, p64, u64, z64
from .asyncio.server import Acceptor
logger = logging.getLogger('ZEO.StorageServer') logger = logging.getLogger('ZEO.StorageServer')
def log(message, level=logging.INFO, label='', exc_info=False): def log(message, level=logging.INFO, label='', exc_info=False):
...@@ -695,6 +696,7 @@ class StorageServer: ...@@ -695,6 +696,7 @@ class StorageServer:
invalidation_queue_size=100, invalidation_queue_size=100,
invalidation_age=None, invalidation_age=None,
transaction_timeout=None, transaction_timeout=None,
ssl=None,
): ):
"""StorageServer constructor. """StorageServer constructor.
...@@ -767,7 +769,7 @@ class StorageServer: ...@@ -767,7 +769,7 @@ class StorageServer:
storage.registerDB(StorageServerDB(self, name)) storage.registerDB(StorageServerDB(self, name))
self.invalidation_age = invalidation_age self.invalidation_age = invalidation_age
self.zeo_storages_by_storage_id = {} # {storage_id -> [ZEOStorage]} self.zeo_storages_by_storage_id = {} # {storage_id -> [ZEOStorage]}
self.acceptor = ZEO.acceptor.Acceptor(addr, self.new_connection) self.acceptor = Acceptor(self, addr, ssl)
if isinstance(addr, tuple) and addr[0]: if isinstance(addr, tuple) and addr[0]:
self.addr = self.acceptor.addr self.addr = self.acceptor.addr
else: else:
...@@ -789,6 +791,9 @@ class StorageServer: ...@@ -789,6 +791,9 @@ class StorageServer:
timeout.start() timeout.start()
self.timeouts[name] = timeout self.timeouts[name] = timeout
def create_client_handler(self):
return ZEOStorage(self, self.read_only)
def _setup_invq(self, name, storage): def _setup_invq(self, name, storage):
lastInvalidations = getattr(storage, 'lastInvalidations', None) lastInvalidations = getattr(storage, 'lastInvalidations', None)
if lastInvalidations is None: if lastInvalidations is None:
...@@ -803,23 +808,6 @@ class StorageServer: ...@@ -803,23 +808,6 @@ class StorageServer:
self.invq[name] = list(lastInvalidations(self.invq_bound)) self.invq[name] = list(lastInvalidations(self.invq_bound))
self.invq[name].reverse() self.invq[name].reverse()
def new_connection(self, sock, addr):
"""Internal: factory to create a new connection.
"""
logger.debug("new connection %s" % (addr,))
def run():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
ZEO.asyncio.server.new_connection(
loop, addr, sock, ZEOStorage(self, self.read_only))
loop.run_forever()
loop.close()
thread = threading.Thread(target=run, name='zeo_client_hander')
thread.setDaemon(True)
thread.start()
def register_connection(self, storage_id, zeo_storage): def register_connection(self, storage_id, zeo_storage):
"""Internal: register a ZEOStorage with a particular storage. """Internal: register a ZEOStorage with a particular storage.
...@@ -1321,3 +1309,7 @@ class Serving(ServerEvent): ...@@ -1321,3 +1309,7 @@ class Serving(ServerEvent):
class Closed(ServerEvent): class Closed(ServerEvent):
pass pass
default_cert_authenticate = 'SIGNED'
def ssl_config(section):
from .sslconfig import ssl_config
return ssl_config(section, True)
...@@ -43,7 +43,7 @@ def connection(*args, **kw): ...@@ -43,7 +43,7 @@ def connection(*args, **kw):
ra ra
def server(path=None, blob_dir=None, storage_conf=None, zeo_conf=None, def server(path=None, blob_dir=None, storage_conf=None, zeo_conf=None,
port=0, **kw): port=0, threaded=True, **kw):
"""Convenience function to start a server for interactive exploration """Convenience function to start a server for interactive exploration
This fuction starts a ZEO server, given a storage configuration or This fuction starts a ZEO server, given a storage configuration or
...@@ -78,7 +78,7 @@ def server(path=None, blob_dir=None, storage_conf=None, zeo_conf=None, ...@@ -78,7 +78,7 @@ def server(path=None, blob_dir=None, storage_conf=None, zeo_conf=None,
port port
If no ZEO configuration is supplied, the one will be computed If no ZEO configuration is supplied, the one will be computed
from the port. If no port is supplied, one will be chosedn from the port. If no port is supplied, one will be chosedn
randomly. dynamically.
""" """
import os, ZEO.tests.forker import os, ZEO.tests.forker
...@@ -87,4 +87,4 @@ def server(path=None, blob_dir=None, storage_conf=None, zeo_conf=None, ...@@ -87,4 +87,4 @@ def server(path=None, blob_dir=None, storage_conf=None, zeo_conf=None,
return ZEO.tests.forker.start_zeo_server( return ZEO.tests.forker.start_zeo_server(
storage_conf, zeo_conf, port, keep=True, path=path, storage_conf, zeo_conf, port, keep=True, path=path,
blob_dir=blob_dir, suicide=False, threaded=True, **kw) blob_dir=blob_dir, suicide=False, threaded=threaded, **kw)
================================
asyncio-based networking for ZEO
================================
This package provides the networking interface for ZEO. It provides a
somewhat RPC-like API.
Notes
=====
Sending data immediately: ayncio vs asyncore
--------------------------------------------
The previous ZEO networking implementation used the ``asyncore`` library.
When writing with asyncore, writes were done only from the event loop.
This meant that when sending data, code would have to "wake up" the
event loop, typically after adding data to some sort of output buffer.
Asyncio takes an entirely different and saner approach. When an
application wants to send data, it writes to a transport. All
interactions with a transport (in a correct application) are from the
same thread, which is also the thread running any event loop.
Transports are always either idle or sending data. When idle, the
transport writes to the outout socket immediately. If not all data
isn't sent, then it buffers it and becomes sending. If a transport is
sending, then we know that the socket isn't ready for more data, so
``write`` can just buffer the data. There's no point in waking up the
event loop, because the socket will do so when it's ready for more
data.
An exception to the paragraph above occurs when operations cross
threads, as occures for most client operations and when a transaction
commits on the server and results have to be sent to other clients. In
these cases, a call_soon_threadsafe method is used which queues an
operation and has to wake up an event loop to process it.
Server threading
----------------
There are currently two server implementations, an implementation that
used a thread per client (and a thread to listen for connections),
``ZEO.asyncio.mtacceptor.Acceptor``, and an implementation that uses a
single networking thread, ``ZEO.asyncio.server.Acceptor``. The
implementation is selected by changing an import in
``ZEO.StorageServer``. The currently-used implementation is
``ZEO.asyncio.server.Acceptor``, although this sentance is likely to
rot, so check the import to be sure. (Maybe this should be configurable.)
ZEO switched to a multi-threaded implementation several years ago
because it was found to improve performance for large databases using
magnetic disks. Because client threads are always working on behalf of
a single client, there's not really an issue with making blocking
calls, such as executing slow I/O operations.
Initially, the asyncio-based implementation used a multi-threaded
server. A simple thread accepted connections and handed accepted
sockets to ``create_connection``. This became a problem when SSL was
added because ``create_connection`` sets up SSL conections as client
connections, and doesn't provide an option to create server
connections.
In response, I created an ``asyncio.Server``-based implementation.
This required using a single thread. This was a pretty trivial
change, however, it led to the tests becoming unstable to the point
that it was impossible to run all tests without some failing. One
test was broken due to a ``asyncio.Server`` `bug
<http://bugs.python.org/issue27386>`_. It's unclear whether the test
instability is due to ``asyncio.Server`` problems or due to latent
test (or ZEO) bugs, but even after beating the tests mostly into
submission, tests failures are more likely when using
``asyncio.Server``. Beatings will continue.
While fighting test failures using ``asyncio.Server``, the
multi-threaded implementation was updated to use a monkey patch to
allow it to create SSL server connections. Aside from the real risk of a
monkey patch, this works very well.
Both implementations seem to perform about the same.
...@@ -36,7 +36,7 @@ class Protocol(base.Protocol): ...@@ -36,7 +36,7 @@ class Protocol(base.Protocol):
def __init__(self, loop, def __init__(self, loop,
addr, client, storage_key, read_only, connect_poll=1, addr, client, storage_key, read_only, connect_poll=1,
heartbeat_interval=60): heartbeat_interval=60, ssl=None, ssl_server_hostname=None):
"""Create a client interface """Create a client interface
addr is either a host,port tuple or a string file name. addr is either a host,port tuple or a string file name.
...@@ -54,6 +54,8 @@ class Protocol(base.Protocol): ...@@ -54,6 +54,8 @@ class Protocol(base.Protocol):
self.connect_poll = connect_poll self.connect_poll = connect_poll
self.heartbeat_interval = heartbeat_interval self.heartbeat_interval = heartbeat_interval
self.futures = {} # { message_id -> future } self.futures = {} # { message_id -> future }
self.ssl = ssl
self.ssl_server_hostname = ssl_server_hostname
self.connect() self.connect()
...@@ -81,10 +83,11 @@ class Protocol(base.Protocol): ...@@ -81,10 +83,11 @@ class Protocol(base.Protocol):
if isinstance(self.addr, tuple): if isinstance(self.addr, tuple):
host, port = self.addr host, port = self.addr
cr = self.loop.create_connection( cr = self.loop.create_connection(
self.protocol_factory, host or 'localhost', port) self.protocol_factory, host or 'localhost', port,
ssl=self.ssl, server_hostname=self.ssl_server_hostname)
else: else:
cr = self.loop.create_unix_connection( cr = self.loop.create_unix_connection(
self.protocol_factory, self.addr) self.protocol_factory, self.addr, ssl=self.ssl)
self._connecting = cr = asyncio.async(cr, loop=self.loop) self._connecting = cr = asyncio.async(cr, loop=self.loop)
...@@ -265,7 +268,8 @@ class Client: ...@@ -265,7 +268,8 @@ class Client:
def __init__(self, loop, def __init__(self, loop,
addrs, client, cache, storage_key, read_only, connect_poll, addrs, client, cache, storage_key, read_only, connect_poll,
register_failed_poll=9): register_failed_poll=9,
ssl=None, ssl_server_hostname=None):
"""Create a client interface """Create a client interface
addr is either a host,port tuple or a string file name. addr is either a host,port tuple or a string file name.
...@@ -281,6 +285,8 @@ class Client: ...@@ -281,6 +285,8 @@ class Client:
self.connect_poll = connect_poll self.connect_poll = connect_poll
self.register_failed_poll = register_failed_poll self.register_failed_poll = register_failed_poll
self.client = client self.client = client
self.ssl = ssl
self.ssl_server_hostname = ssl_server_hostname
for name in Protocol.client_delegated: for name in Protocol.client_delegated:
setattr(self, name, getattr(client, name)) setattr(self, name, getattr(client, name))
self.cache = cache self.cache = cache
...@@ -344,6 +350,8 @@ class Client: ...@@ -344,6 +350,8 @@ class Client:
self.protocols = [ self.protocols = [
Protocol(self.loop, addr, self, Protocol(self.loop, addr, self,
self.storage_key, self.read_only, self.connect_poll, self.storage_key, self.read_only, self.connect_poll,
ssl=self.ssl,
ssl_server_hostname=self.ssl_server_hostname,
) )
for addr in self.addrs for addr in self.addrs
] ]
...@@ -584,14 +592,16 @@ class Client: ...@@ -584,14 +592,16 @@ class Client:
class ClientRunner: class ClientRunner:
def set_options(self, addrs, wrapper, cache, storage_key, read_only, def set_options(self, addrs, wrapper, cache, storage_key, read_only,
timeout=30, disconnect_poll=1): timeout=30, disconnect_poll=1,
**kwargs):
self.__args = (addrs, wrapper, cache, storage_key, read_only, self.__args = (addrs, wrapper, cache, storage_key, read_only,
disconnect_poll) disconnect_poll)
self.__kwargs = kwargs
self.timeout = timeout self.timeout = timeout
def setup_delegation(self, loop): def setup_delegation(self, loop):
self.loop = loop self.loop = loop
self.client = Client(loop, *self.__args) self.client = Client(loop, *self.__args, **self.__kwargs)
self.call_threadsafe = self.client.call_threadsafe self.call_threadsafe = self.client.call_threadsafe
self.call_async_threadsafe = self.client.call_async_threadsafe self.call_async_threadsafe = self.client.call_async_threadsafe
...@@ -685,9 +695,10 @@ class ClientThread(ClientRunner): ...@@ -685,9 +695,10 @@ class ClientThread(ClientRunner):
def __init__(self, addrs, client, cache, def __init__(self, addrs, client, cache,
storage_key='1', read_only=False, timeout=30, storage_key='1', read_only=False, timeout=30,
disconnect_poll=1): disconnect_poll=1, ssl=None, ssl_server_hostname=None):
self.set_options(addrs, client, cache, storage_key, read_only, self.set_options(addrs, client, cache, storage_key, read_only,
timeout, disconnect_poll) timeout, disconnect_poll,
ssl=ssl, ssl_server_hostname=ssl_server_hostname)
self.thread = threading.Thread( self.thread = threading.Thread(
target=self.run, target=self.run,
name="%s zeo client networking thread" % client.__name__, name="%s zeo client networking thread" % client.__name__,
......
...@@ -11,8 +11,43 @@ ...@@ -11,8 +11,43 @@
# FOR A PARTICULAR PURPOSE # FOR A PARTICULAR PURPOSE
# #
############################################################################## ##############################################################################
"""Multi-threaded server connectin acceptor
Each connection is run in it's own thread. Testing serveral years ago
suggsted that this was a win, but ZODB shootout and another
lower-level tests suggest otherwise. It's really unclear, which is
why we're keeping this around for now.
Asyncio doesn't let you accept connections in one thread and handle
them in another. To get around this, we have a listener implemented
using asyncore, but when we get a connection, we hand the socket to
asyncio. This worked well until we added SSL support. (Even then, it
worked on Mac OS X for some reason.)
SSL + non-blocking sockets requires special care, which asyncio
provides. Unfortunately, create_connection, assumes it's creating a
client connection. It would be easy to fix this,
http://bugs.python.org/issue27392, but it's hard to justify the fix to
get it accepted, so we won't bother for now. This currently uses a
horrible monley patch to work with SSL.
To use this module, replace::
from .asyncio.server import Acceptor
with:
from .asyncio.mtacceptor import Acceptor
in ZEO.StorageServer.
"""
import asyncio
import asyncore import asyncore
import socket import socket
import threading
from .server import ServerProtocol
# _has_dualstack: True if the dual-stack sockets are supported # _has_dualstack: True if the dual-stack sockets are supported
try: try:
...@@ -36,13 +71,18 @@ import logging ...@@ -36,13 +71,18 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Acceptor(asyncore.dispatcher): class Acceptor(asyncore.dispatcher):
"""A server that accepts incoming RPC connections""" """A server that accepts incoming RPC connections
def __init__(self, addr, factory): And creates a separate thread for each.
self.socket_map = {} """
asyncore.dispatcher.__init__(self, map=self.socket_map)
def __init__(self, storage_server, addr, ssl):
self.storage_server = storage_server
self.addr = addr self.addr = addr
self.factory = factory self.__socket_map = {}
asyncore.dispatcher.__init__(self, map=self.__socket_map)
self.ssl_context = ssl
self._open_socket() self._open_socket()
def _open_socket(self): def _open_socket(self):
...@@ -120,17 +160,50 @@ class Acceptor(asyncore.dispatcher): ...@@ -120,17 +160,50 @@ class Acceptor(asyncore.dispatcher):
addr = addr[:2] addr = addr[:2]
try: try:
c = self.factory(sock, addr) logger.debug("new connection %s" % (addr,))
def run():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
zs = self.storage_server.create_client_handler()
protocol = ServerProtocol(loop, self.addr, zs)
protocol.stop = loop.stop
if self.ssl_context is None:
cr = loop.create_connection((lambda : protocol), sock=sock)
else:
#######################################################
# XXX See http://bugs.python.org/issue27392 :(
_make_ssl_transport = loop._make_ssl_transport
def make_ssl_transport(*a, **kw):
kw['server_side'] = True
return _make_ssl_transport(*a, **kw)
loop._make_ssl_transport = make_ssl_transport
#
#######################################################
cr = loop.create_connection(
(lambda : protocol), sock=sock,
ssl=self.ssl_context,
server_hostname='fu' # http://bugs.python.org/issue27391
)
asyncio.async(cr, loop=loop)
loop.run_forever()
loop.close()
thread = threading.Thread(target=run, name='zeo_client_hander')
thread.setDaemon(True)
thread.start()
except Exception: except Exception:
if sock.fileno() in asyncore.socket_map: if sock.fileno() in self.__socket_map:
del asyncore.socket_map[sock.fileno()] del self.__socket_map[sock.fileno()]
logger.exception("Error in handle_accept") logger.exception("Error in handle_accept")
else: else:
logger.info("connect from %s: %s", repr(addr), c) logger.info("connect from %s", repr(addr))
def loop(self, timeout=30.0): def loop(self, timeout=30.0):
try: try:
asyncore.loop(map=self.socket_map, timeout=timeout) asyncore.loop(map=self.__socket_map, timeout=timeout)
except Exception: except Exception:
if not self.__closed: if not self.__closed:
raise # Unexpected exc raise # Unexpected exc
...@@ -142,4 +215,4 @@ class Acceptor(asyncore.dispatcher): ...@@ -142,4 +215,4 @@ class Acceptor(asyncore.dispatcher):
if not self.__closed: if not self.__closed:
self.__closed = True self.__closed = True
asyncore.dispatcher.close(self) asyncore.dispatcher.close(self)
logger.debug("Closed accepter, %s", len(self.socket_map)) logger.debug("Closed accepter, %s", len(self.__socket_map))
...@@ -51,11 +51,12 @@ class ServerProtocol(base.Protocol): ...@@ -51,11 +51,12 @@ class ServerProtocol(base.Protocol):
if exc: if exc:
logger.error("Disconnected %s:%s", exc.__class__.__name__, exc) logger.error("Disconnected %s:%s", exc.__class__.__name__, exc)
self.zeo_storage.notify_disconnected() self.zeo_storage.notify_disconnected()
self.stop()
self.loop.stop() def stop(self):
pass # Might be replaced when running a thread per client
def finish_connect(self, protocol_version): def finish_connect(self, protocol_version):
if protocol_version == b'ruok': if protocol_version == b'ruok':
self._write(json.dumps(self.zeo_storage.ruok()).encode("ascii")) self._write(json.dumps(self.zeo_storage.ruok()).encode("ascii"))
self.close() self.close()
...@@ -199,3 +200,73 @@ class MTDelay(Delay): ...@@ -199,3 +200,73 @@ class MTDelay(Delay):
def error(self, exc_info): def error(self, exc_info):
self.ready.wait() self.ready.wait()
self.protocol.call_soon_threadsafe(Delay.error, self, exc_info) self.protocol.call_soon_threadsafe(Delay.error, self, exc_info)
class Acceptor:
def __init__(self, storage_server, addr, ssl):
self.storage_server = storage_server
self.addr = addr
self.ssl_context = ssl
self.event_loop = loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if isinstance(addr, tuple):
cr = loop.create_server(self.factory, addr[0], addr[1],
reuse_address=True, ssl=ssl)
else:
cr = loop.create_unix_server(self.factory, addr, ssl=ssl)
f = asyncio.async(cr)
@f.add_done_callback
def listenting(f):
server = f.result()
self.server = server
if isinstance(addr, tuple) and addr[1] == 0:
addrs = [s.getsockname() for s in server.sockets]
addrs = [a for a in addrs if len(a) == len(addr)]
if addrs:
self.addr = addrs[0]
else:
self.addr = server.sockets[0].getsockname()[:len(addr)]
logger.info("listening on %s", str(addr))
loop.run_until_complete(f)
def factory(self):
try:
logger.debug("Accepted connection")
zs = self.storage_server.create_client_handler()
protocol = ServerProtocol(self.event_loop, self.addr, zs)
except Exception:
logger.exception("Failure in protocol factory")
return protocol
def loop(self, timeout=None):
self.event_loop.run_forever()
self.event_loop.close()
closed = False
def close(self):
if not self.closed:
self.closed = True
self.event_loop.call_soon_threadsafe(self._close)
def _close(self):
loop = self.event_loop
self.server.close()
f = asyncio.async(self.server.wait_closed(), loop=loop)
@f.add_done_callback
def server_closed(f):
# stop the loop when the server closes:
loop.call_soon(loop.stop)
def timeout():
logger.warning("Timed out closing asyncio.Server")
loop.call_soon(loop.stop)
# But if the server doesn't close in a second, stop the loop anyway.
loop.call_later(1, timeout)
...@@ -31,7 +31,8 @@ class Loop: ...@@ -31,7 +31,8 @@ class Loop:
future.set_exception(ConnectionRefusedError()) future.set_exception(ConnectionRefusedError())
def create_connection( def create_connection(
self, protocol_factory, host=None, port=None, sock=None self, protocol_factory, host=None, port=None, sock=None,
ssl=None, server_hostname=None
): ):
future = asyncio.Future(loop=self) future = asyncio.Future(loop=self)
if sock is None: if sock is None:
...@@ -85,7 +86,7 @@ class Transport: ...@@ -85,7 +86,7 @@ class Transport:
capacity = 1 << 64 capacity = 1 << 64
paused = False paused = False
extra = dict(peername='1.2.3.4') extra = dict(peername='1.2.3.4', sockname=('127.0.0.1', 4200))
def __init__(self, protocol): def __init__(self, protocol):
self.data = [] self.data = []
......
<component> <component>
<sectiontype name="zeo"> <import package="ZODB"/>
<description> <sectiontype name="ssl" datatype="ZEO.zconfig.client_ssl">
The content of a ZEO section describe operational parameters
of a ZEO server except for the storage(s) to be served.
</description>
<key name="address" datatype="socket-binding-address" <key name="certificate" datatype="existing-dirpath" required="yes">
required="yes">
<description> <description>
The address at which the server should listen. This can be in The full path to an SSL certificate file.
the form 'host:port' to signify a TCP/IP connection or a
pathname string to signify a Unix domain socket connection (at
least one '/' is required). A hostname may be a DNS name or a
dotted IP address. If the hostname is omitted, the platform's
default behavior is used when binding the listening socket (''
is passed to socket.bind() as the hostname portion of the
address).
</description> </description>
</key> </key>
<key name="read-only" datatype="boolean" <key name="key" datatype="existing-dirpath" required="no">
required="no"
default="false">
<description> <description>
Flag indicating whether the server should operate in read-only The full path to an SSL key file for the client certificate.
mode. Defaults to false. Note that even if the server is
operating in writable mode, individual storages may still be
read-only. But if the server is in read-only mode, no write
operations are allowed, even if the storages are writable. Note
that pack() is considered a read-only operation.
</description> </description>
</key> </key>
<key name="invalidation-queue-size" datatype="integer" <key name="password-function" required="no">
required="no"
default="100">
<description> <description>
The storage server keeps a queue of the objects modified by the Dotted name of importable function for retrieving a password
last N transactions, where N == invalidation_queue_size. This for the client certificate key.
queue is used to speed client cache verification when a client
disconnects for a short period of time.
</description> </description>
</key> </key>
<key name="invalidation-age" datatype="float" required="no"> <key name="authenticate" datatype="existing-dirpath" required="no">
<description> <description>
The maximum age of a client for which quick-verification Path to a file or directory containing server certificates to be
invalidations will be provided by iterating over the served authenticated.
storage. This option should only be used if the served storage
supports efficient iteration from a starting point near the
end of the transaction history (e.g. end of file).
</description> </description>
</key> </key>
<key name="transaction-timeout" datatype="integer" <key name="check-hostname" datatype="boolean" required="no" default="true">
required="no">
<description> <description>
The maximum amount of time to wait for a transaction to commit Verify the host name in the server certificate is as expected.
after acquiring the storage lock, specified in seconds. If the
transaction takes too long, the client connection will be closed
and the transaction aborted.
</description> </description>
</key> </key>
<key name="pid-filename" datatype="existing-dirpath" <key name="server-hostname" required="no">
required="no">
<description> <description>
The full path to the file in which to write the ZEO server's Process ID Host name to use for SSL host name checks.
at startup. If omitted, $INSTANCE/var/ZEO.pid is used.
If ``check-hostname`` is true then use this as the
value to check. If an address is a host/port pair, then this
defaults to the host in the address.
</description> </description>
<metadefault>$INSTANCE/var/ZEO.pid (or $clienthome/ZEO.pid)</metadefault>
</key> </key>
<!-- DM 2006-06-12: added option --> </sectiontype>
<key name="drop-cache-rather-verify" datatype="boolean"
required="no" default="false"> <sectiontype name="clientstorage" datatype="ZEO.zconfig.ClientStorageConfig"
<description> implements="ZODB.storage">
indicates that the cache should be dropped rather than
verified when the verification optimization is not <section type="ssl" name="*" attribute="ssl" />
available (e.g. when the ZEO server restarted).
</description> <multikey name="server" datatype="socket-connection-address" required="yes"
/>
<key name="cache-size" datatype="byte-size" default="20MB">
<description>
The cache size in bytes, KB or MB. This defaults to a 20MB.
Optional ``KB`` or ``MB`` suffixes can (and usually are) used to
specify units other than bytes.
</description>
</key>
<key name="cache-path" required="no">
<description>
The file path of a persistent cache file
</description>
</key>
<key name="blob-dir" required="no">
<description>
Path name to the blob cache directory.
</description>
</key>
<key name="shared-blob-dir" required="no" default="no"
datatype="boolean">
<description>
Tells whether the cache is a shared writable directory
and that the ZEO protocol should not transfer the file
but only the filename when committing.
</description>
</key>
<key name="blob-cache-size" required="no" datatype="byte-size">
<description>
Maximum size of the ZEO blob cache, in bytes. If not set, then
the cache size isn't checked and the blob directory will
grow without bound.
This option is ignored if shared_blob_dir is true.
</description>
</key>
<key name="blob-cache-size-check" required="no" datatype="integer">
<description>
ZEO check size as percent of blob_cache_size. The ZEO
cache size will be checked when this many bytes have been
loaded into the cache. Defaults to 10% of the blob cache
size. This option is ignored if shared_blob_dir is true.
</description>
</key>
<key name="read-only" datatype="boolean" default="off">
<description>
A flag indicating whether this should be a read-only storage,
defaulting to false (i.e. writing is allowed by default).
</description>
</key>
<key name="read-only-fallback" datatype="boolean" default="off">
<description>
A flag indicating whether a read-only remote storage should be
acceptable as a fallback when no writable storages are
available. Defaults to false. At most one of read_only and
read_only_fallback should be true.
</description>
</key>
<key name="wait-timeout" datatype="integer" default="30">
<description>
How long to wait for an initial connection, defaulting to 30
seconds. If an initial connection can't be made within this time
limit, then creation of the client storage will fail with a
``ZEO.Exceptions.ClientDisconnected`` exception.
After the initial connection, if the client is disconnected:
- In-flight server requests will fail with a
``ZEO.Exceptions.ClientDisconnected`` exception.
- New requests will block for up to ``wait_timeout`` waiting for a
connection to be established before failing with a
``ZEO.Exceptions.ClientDisconnected`` exception.
</description>
</key>
<key name="client-label" required="no">
<description>
A label for the client in server logs
</description>
</key>
<!-- The following are undocumented, but not gone. :) -->
<key name="storage" default="1">
<description>
The name of the storage that the client wants to use. If the
ZEO server serves more than one storage, the client selects
the storage it wants to use by name. The default name is '1',
which is also the default name for the ZEO server.
</description>
</key>
<key name="name" default="">
<description>
The storage name. If unspecified, the address of the server
will be used as the name.
</description>
</key> </key>
</sectiontype> </sectiontype>
......
...@@ -105,6 +105,7 @@ class ZEOOptionsMixin: ...@@ -105,6 +105,7 @@ class ZEOOptionsMixin:
"t:", "timeout=", float) "t:", "timeout=", float)
self.add('pid_file', 'zeo.pid_filename', self.add('pid_file', 'zeo.pid_filename',
None, 'pid-file=') None, 'pid-file=')
self.add("ssl", "zeo.ssl")
class ZEOOptions(ZDOptions, ZEOOptionsMixin): class ZEOOptions(ZDOptions, ZEOOptionsMixin):
...@@ -341,6 +342,7 @@ def create_server(storages, options): ...@@ -341,6 +342,7 @@ def create_server(storages, options):
invalidation_queue_size = options.invalidation_queue_size, invalidation_queue_size = options.invalidation_queue_size,
invalidation_age = options.invalidation_age, invalidation_age = options.invalidation_age,
transaction_timeout = options.transaction_timeout, transaction_timeout = options.transaction_timeout,
ssl = options.ssl,
) )
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
<import package="ZODB"/> <import package="ZODB"/>
<!-- Use the ZEO server information structure. --> <!-- Use the ZEO server information structure. -->
<import package="ZEO"/> <import package="ZEO" file="server.xml" />
<import package="ZConfig.components.logger"/> <import package="ZConfig.components.logger"/>
......
<component>
<sectiontype name="ssl" datatype="ZEO.zconfig.server_ssl">
<key name="certificate" datatype="existing-dirpath" required="yes">
<description>
The full path to an SSL certificate file.
</description>
</key>
<key name="key" datatype="existing-dirpath" required="no">
<description>
The full path to an SSL key file for the server certificate.
</description>
</key>
<key name="password-function" required="no">
<description>
Dotted name of importable function for retrieving a password
for the client certificate key.
</description>
</key>
<key name="authenticate" required="yes">
<description>
Path to a file or directory containing client certificates to
be authenticated. This can also be - or SIGNED to require
signed client certificates.
</description>
</key>
</sectiontype>
<sectiontype name="zeo">
<section type="ssl" name="*" attribute="ssl" />
<description>
The content of a ZEO section describe operational parameters
of a ZEO server except for the storage(s) to be served.
</description>
<key name="address" datatype="socket-binding-address"
required="yes">
<description>
The address at which the server should listen. This can be in
the form 'host:port' to signify a TCP/IP connection or a
pathname string to signify a Unix domain socket connection (at
least one '/' is required). A hostname may be a DNS name or a
dotted IP address. If the hostname is omitted, the platform's
default behavior is used when binding the listening socket (''
is passed to socket.bind() as the hostname portion of the
address).
</description>
</key>
<key name="read-only" datatype="boolean"
required="no"
default="false">
<description>
Flag indicating whether the server should operate in read-only
mode. Defaults to false. Note that even if the server is
operating in writable mode, individual storages may still be
read-only. But if the server is in read-only mode, no write
operations are allowed, even if the storages are writable. Note
that pack() is considered a read-only operation.
</description>
</key>
<key name="invalidation-queue-size" datatype="integer"
required="no"
default="100">
<description>
The storage server keeps a queue of the objects modified by the
last N transactions, where N == invalidation_queue_size. This
queue is used to speed client cache verification when a client
disconnects for a short period of time.
</description>
</key>
<key name="invalidation-age" datatype="float" required="no">
<description>
The maximum age of a client for which quick-verification
invalidations will be provided by iterating over the served
storage. This option should only be used if the served storage
supports efficient iteration from a starting point near the
end of the transaction history (e.g. end of file).
</description>
</key>
<key name="transaction-timeout" datatype="integer"
required="no">
<description>
The maximum amount of time to wait for a transaction to commit
after acquiring the storage lock, specified in seconds. If the
transaction takes too long, the client connection will be closed
and the transaction aborted.
</description>
</key>
<key name="pid-filename" datatype="existing-dirpath"
required="no">
<description>
The full path to the file in which to write the ZEO server's Process ID
at startup. If omitted, $INSTANCE/var/ZEO.pid is used.
</description>
<metadefault>$INSTANCE/var/ZEO.pid (or $clienthome/ZEO.pid)</metadefault>
</key>
</sectiontype>
</component>
...@@ -127,7 +127,8 @@ class CommitLockTests: ...@@ -127,7 +127,8 @@ class CommitLockTests:
# list is a socket domain (AF_INET, AF_UNIX, etc.) and an # list is a socket domain (AF_INET, AF_UNIX, etc.) and an
# address. # address.
addr = self._storage._addr addr = self._storage._addr
new = ZEO.ClientStorage.ClientStorage(addr, wait=1) new = ZEO.ClientStorage.ClientStorage(
addr, wait=1, **self._client_options())
new.registerDB(DummyDB()) new.registerDB(DummyDB())
return new return new
......
...@@ -16,7 +16,6 @@ import contextlib ...@@ -16,7 +16,6 @@ import contextlib
import os import os
import time import time
import socket import socket
import asyncore
import threading import threading
import logging import logging
...@@ -36,6 +35,8 @@ import ZODB.tests.util ...@@ -36,6 +35,8 @@ import ZODB.tests.util
import transaction import transaction
from transaction import Transaction from transaction import Transaction
from . import testssl
logger = logging.getLogger('ZEO.tests.ConnectionTests') logger = logging.getLogger('ZEO.tests.ConnectionTests')
ZERO = '\0'*8 ZERO = '\0'*8
...@@ -66,7 +67,6 @@ class CommonSetupTearDown(StorageTestBase): ...@@ -66,7 +67,6 @@ class CommonSetupTearDown(StorageTestBase):
keep = 0 keep = 0
invq = None invq = None
timeout = None timeout = None
monitor = 0
db_class = DummyDB db_class = DummyDB
def setUp(self, before=None): def setUp(self, before=None):
...@@ -79,10 +79,9 @@ class CommonSetupTearDown(StorageTestBase): ...@@ -79,10 +79,9 @@ class CommonSetupTearDown(StorageTestBase):
self.__super_setUp() self.__super_setUp()
logging.info("setUp() %s", self.id()) logging.info("setUp() %s", self.id())
self.file = 'storage_conf' self.file = 'storage_conf'
self.addr = []
self._servers = [] self._servers = []
self.caches = [] self.caches = []
self._newAddr() self.addr = [('localhost', 0)]
self.startServer() self.startServer()
def tearDown(self): def tearDown(self):
...@@ -147,18 +146,17 @@ class CommonSetupTearDown(StorageTestBase): ...@@ -147,18 +146,17 @@ class CommonSetupTearDown(StorageTestBase):
min_disconnect_poll=0.1, min_disconnect_poll=0.1,
read_only=read_only, read_only=read_only,
read_only_fallback=read_only_fallback, read_only_fallback=read_only_fallback,
username=username, **self._client_options())
password=password,
realm=realm)
storage.registerDB(DummyDB()) storage.registerDB(DummyDB())
return storage return storage
def _client_options(self):
return {}
def getServerConfig(self, addr, ro_svr): def getServerConfig(self, addr, ro_svr):
zconf = forker.ZEOConfig(addr) zconf = forker.ZEOConfig(addr)
if ro_svr: if ro_svr:
zconf.read_only = 1 zconf.read_only = 1
if self.monitor:
zconf.monitor_address = ("", 42000)
if self.invq: if self.invq:
zconf.invalidation_queue_size = self.invq zconf.invalidation_queue_size = self.invq
if self.timeout: if self.timeout:
...@@ -179,6 +177,8 @@ class CommonSetupTearDown(StorageTestBase): ...@@ -179,6 +177,8 @@ class CommonSetupTearDown(StorageTestBase):
zeoport, stop = forker.start_zeo_server( zeoport, stop = forker.start_zeo_server(
sconf, zconf, addr[1], keep, **kw) sconf, zconf, addr[1], keep, **kw)
self._servers.append(stop) self._servers.append(stop)
if addr[1] == 0:
self.addr[index] = zeoport
def shutdownServer(self, index=0): def shutdownServer(self, index=0):
logging.info("shutdownServer(index=%d) @ %s" % logging.info("shutdownServer(index=%d) @ %s" %
...@@ -191,31 +191,16 @@ class CommonSetupTearDown(StorageTestBase): ...@@ -191,31 +191,16 @@ class CommonSetupTearDown(StorageTestBase):
def pollUp(self, timeout=30.0, storage=None): def pollUp(self, timeout=30.0, storage=None):
if storage is None: if storage is None:
storage = self._storage storage = self._storage
# Poll until we're connected. storage.server_status()
now = time.time()
giveup = now + timeout
while not storage.is_connected():
asyncore.poll(0.1)
now = time.time()
if now > giveup:
self.fail("timed out waiting for storage to connect")
# When the socket map is empty, poll() returns immediately,
# and this is a pure busy-loop then. At least on some Linux
# flavors, that can starve the thread trying to connect,
# leading to grossly increased runtime (typical) or bogus
# "timed out" failures. A little sleep here cures both.
time.sleep(0.1)
def pollDown(self, timeout=30.0): def pollDown(self, timeout=30.0):
# Poll until we're disconnected. # Poll until we're disconnected.
now = time.time() now = time.time()
giveup = now + timeout giveup = now + timeout
while self._storage.is_connected(): while self._storage.is_connected():
asyncore.poll(0.1)
now = time.time() now = time.time()
if now > giveup: if now > giveup:
self.fail("timed out waiting for storage to disconnect") self.fail("timed out waiting for storage to disconnect")
# See pollUp() for why we sleep a little here.
time.sleep(0.1) time.sleep(0.1)
...@@ -564,6 +549,18 @@ class ConnectionTests(CommonSetupTearDown): ...@@ -564,6 +549,18 @@ class ConnectionTests(CommonSetupTearDown):
self.assertRaises(ClientDisconnected, self.assertRaises(ClientDisconnected,
self._storage.load, b'\0'*8, '') self._storage.load, b'\0'*8, '')
class SSLConnectionTests(ConnectionTests):
def getServerConfig(self, addr, ro_svr):
return testssl.server_config.replace(
'127.0.0.1:0',
'{}: {}\nread-only {}'.format(
addr[0], addr[1], 'true' if ro_svr else 'false'))
def _client_options(self):
return {'ssl': testssl.client_ssl()}
class InvqTests(CommonSetupTearDown): class InvqTests(CommonSetupTearDown):
invq = 3 invq = 3
...@@ -966,7 +963,7 @@ class TimeoutTests(CommonSetupTearDown): ...@@ -966,7 +963,7 @@ class TimeoutTests(CommonSetupTearDown):
self.assertRaises(ClientDisconnected, storage.tpc_finish, txn) self.assertRaises(ClientDisconnected, storage.tpc_finish, txn)
# Make sure it's logged as CRITICAL # Make sure it's logged as CRITICAL
for line in open("server-%s.log" % self.addr[0][1]): for line in open("server-0.log"):
if (('Transaction timeout after' in line) and if (('Transaction timeout after' in line) and
('CRITICAL ZEO.StorageServer' in line) ('CRITICAL ZEO.StorageServer' in line)
): ):
......
-----BEGIN CERTIFICATE-----
MIID/TCCAuWgAwIBAgIJAJWOSC4oLyp9MA0GCSqGSIb3DQEBBQUAMFwxCzAJBgNV
BAYTAlVTMQswCQYDVQQIEwJWQTENMAsGA1UEChMEWk9EQjERMA8GA1UEAxMIem9k
Yi5vcmcxHjAcBgkqhkiG9w0BCQEWD2NsaWVudEB6b2RiLm9yZzAeFw0xNjA2MjMx
NTA3MTNaFw0xNzA2MjMxNTA3MTNaMFwxCzAJBgNVBAYTAlVTMQswCQYDVQQIEwJW
QTENMAsGA1UEChMEWk9EQjERMA8GA1UEAxMIem9kYi5vcmcxHjAcBgkqhkiG9w0B
CQEWD2NsaWVudEB6b2RiLm9yZzCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC
ggEBANqtLqtrzwDv0nakKiTX5ZtSHOaTmfwHzHsZJkcNf7kJUgGtE0oe3cIj4iAC
CCpPfUf9OfIA1NpmDsvPMqw80ho1o9g8QZWfW6QOTj2kbnanrRqMDmRvrWlzMQIe
+7EabYxbTjyduk76N2Sa4Tf/my6Yton3zyKtdjSJzoQ/SAKT+Nvjt5s47I4THEFY
gN7Njbg1FyihKwuwR64EUsyBanutKvXVT7gnB1V2cQhn+LW+NwgzsxKnptvyvBGr
Ago+uoxrHlIQu59xznS2vA3Ck2K3hIOnXpXYYGeRYKzplZLZnCfZ2uQheiO3ioEO
UCXICbPMxA7IEpTC75j1n9a3HKcCAwEAAaOBwTCBvjAdBgNVHQ4EFgQUCg1EhkHz
qYFOgbI/D6zR1tcwHBcwgY4GA1UdIwSBhjCBg4AUCg1EhkHzqYFOgbI/D6zR1tcw
HBehYKReMFwxCzAJBgNVBAYTAlVTMQswCQYDVQQIEwJWQTENMAsGA1UEChMEWk9E
QjERMA8GA1UEAxMIem9kYi5vcmcxHjAcBgkqhkiG9w0BCQEWD2NsaWVudEB6b2Ri
Lm9yZ4IJAJWOSC4oLyp9MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADggEB
AJk3TviNGmjzCuDzLajeLEB9Iy285Fl3D/3TvThSDECS3cUq+i8fQm0cjOYnE/rK
6Lpg6soeQFetoWQsNT2uvxyv3iZEOtBwNhQKaKTkzIRTmMx/aWM0zG0c3psepC6c
1Fgp0HAts2JKC4Ni7zHFBDb8YZi87IUHYNKuJKcUWKiEgeHu2zCI1Q43FMSKoaG8
XwYi1Mxw6TQQtjZrMnNPSeO7zySBuCw10bCZSMC5xvsqckfREifRT//4A0/COWYK
/p6TZMTaMjrK8fOaPpap314QnLf80P6oLEZ7wkghaNuyq8IzgATuxYVy21132MNB
qZIUS+iblQAZDSHnQJoehQQ=
-----END CERTIFICATE-----
-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEA2q0uq2vPAO/SdqQqJNflm1Ic5pOZ/AfMexkmRw1/uQlSAa0T
Sh7dwiPiIAIIKk99R/058gDU2mYOy88yrDzSGjWj2DxBlZ9bpA5OPaRudqetGowO
ZG+taXMxAh77sRptjFtOPJ26Tvo3ZJrhN/+bLpi2iffPIq12NInOhD9IApP42+O3
mzjsjhMcQViA3s2NuDUXKKErC7BHrgRSzIFqe60q9dVPuCcHVXZxCGf4tb43CDOz
Eqem2/K8EasCCj66jGseUhC7n3HOdLa8DcKTYreEg6deldhgZ5FgrOmVktmcJ9na
5CF6I7eKgQ5QJcgJs8zEDsgSlMLvmPWf1rccpwIDAQABAoIBAQDSju7hIG2x+Tou
AuSRlVEAvZAWdQlQJDJAVXcF83mIMfFEq+Jm/FGLHgIdz9cM5n07VBj3bNWHdb3J
gTjJn8audffNvjdoWoli7mNn92xl1A5aAYHaM65GWyRVZn/zh/7zpvcuZrF+Wm/7
7yXtRbGmrGUXdAV+3odzDz5LGKO91fTuM2nW0j+p7+q2Bzko7+rl9AVxaveco4pt
TtqXX2eOC3wTcNotBfJJD89/+/szg62K4CYCUAaetKMPcVrgQ4v0YHakOl8lJJxW
q7XiqhPyjeZp6h8e9dtVCkeZHa3xacuoftF2w3FslVEX/LOooAKf07PU1xxJezZN
trP11pgBAoGBAPoysqD9OW3C0WyIXm/Lx1Pncr/p/kKYlXNDWQRmizHmfLC4xrBI
n75mYSp7BJ7Gh2W7lW9lnyJ0uaqXdqOOswO/baTeSEvvhL+lLs4F1cQWemjTD4qy
KqCxCCbTf8gZPssiJXsXGmf6yWAdpjfYUxarxB0Ks6wHLBpQHctvVebJAoGBAN+/
W0+yAXssr7AWgKAyiHtqE/MiI8e0bDJWDjHxaPvnSVyDwVOARj/JKvw7jnhg8FKJ
1iWtl8ym2ktcAQygSl3HW4wggD59BbeXrUQr4nZi8wLMTpLxnSfITFrwXpwVShT4
8KJPR46W/Plkphm1fZn6Lr0uGJ12pV+iPAq/F+/vAoGALmRoKuHJXEjbfDxtBl3K
wAwSgvNoagDQ9WZvgxlghggu5rXcYaOVu0BQlAfre2VkhcCanOVC9KigJLmhDgLP
vsooEoIE9c+b1c1TOHBsiseAOx+nqhgPP2yUDl75Oqkzs4bJXGGUS+N8o43b3E8I
WRPQcXIijqtlyhtA6w/h5cECgYEA2M6DnGXQKZrTYr1rRc+xkGTpj9607P5XGS9p
8dsK74zd+VdyLYdOiuBTVrYfB2ZneJM3fqsHPLcxL3SnT6TCarySaOXVXremoo/G
xRgBCNY4w61VNe4JalMcKcJg6r12W3wdMCnCHNkRqFdu29qRKnLSd14DXBFrjY+W
vpMMjuECgYEAtNOm9WTCAlVbGTMplXcUJBWNwdOz+sHMi+PhP/fTNm0XH3fepFeQ
th7b/RYocTwAHGu8mArX2DgXAfDQAYMPFN8vmbh0XQsQV+iuja9MO2TagTyVSe3x
DBOTFCLg8oWtzwySZ6BKR/KsGI2ryRxyE1VojV0zNXRyZqDCm+G4Vs0=
-----END RSA PRIVATE KEY-----
...@@ -14,67 +14,46 @@ ZODB.notify. ...@@ -14,67 +14,46 @@ ZODB.notify.
Now, let's start a server and verify that we get a serving event: Now, let's start a server and verify that we get a serving event:
>>> import ZEO.StorageServer, ZODB.MappingStorage >>> import ZEO
>>> server = ZEO.StorageServer.StorageServer( >>> addr, stop = ZEO.server()
... ('127.0.0.1', 0), {'1': ZODB.MappingStorage.MappingStorage()})
>>> isinstance(last_event, ZEO.StorageServer.Serving) >>> isinstance(last_event, ZEO.StorageServer.Serving)
True True
>>> last_event.server is server
True
>>> last_event.address[0], last_event.address[1] > 0
('127.0.0.1', True)
If the host part pf the address passed to the constructor is not an >>> last_event.address == addr
empty string. then the server addr attribute is the same as the
address attribute of the event:
>>> server.addr == last_event.address
True True
Let's run the server in a thread, to make sure we can connect. >>> server = last_event.server
>>> server.addr == addr
True
>>> server.start_thread() Let's make sure we can connect.
>>> client = ZEO.client(last_event.address) >>> client = ZEO.client(last_event.address).close()
>>> client.is_connected()
True
If we close the server, we'll get a closed event: If we close the server, we'll get a closed event:
>>> server.close() >>> stop()
>>> isinstance(last_event, ZEO.StorageServer.Closed) >>> isinstance(last_event, ZEO.StorageServer.Closed)
True True
>>> last_event.server is server >>> last_event.server is server
True True
>>> wait_until(lambda : not client.is_connected(test=True))
>>> client.close()
If we pass an empty string as the host part of the server address, we If we pass an empty string as the host part of the server address, we
can't really assign a single address, so the server addr attribute is can't really assign a single address, so the server addr attribute is
left alone: left alone:
>>> server = ZEO.StorageServer.StorageServer( >>> addr, stop = ZEO.server(port=('', 0))
... ('', 0), {'1': ZODB.MappingStorage.MappingStorage()})
>>> isinstance(last_event, ZEO.StorageServer.Serving) >>> isinstance(last_event, ZEO.StorageServer.Serving)
True True
>>> last_event.server is server
True
>>> last_event.address[1] > 0 >>> last_event.address[1] > 0
True True
If the host part pf the address passed to the constructor is not an >>> last_event.server.addr
empty string. then the server addr attribute is the same as the
address attribute of the event:
>>> server.addr
('', 0) ('', 0)
>>> server.close() >>> stop()
The runzeo module provides some process support, including getting the The runzeo module provides some process support, including getting the
server configuration via a ZConfig configuration file. To spell a server configuration via a ZConfig configuration file. To spell a
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
############################################################################## ##############################################################################
"""Library for forking storage server and connecting client storage""" """Library for forking storage server and connecting client storage"""
from __future__ import print_function from __future__ import print_function
import gc
import os import os
import random import random
import sys import sys
...@@ -25,7 +26,7 @@ import tempfile ...@@ -25,7 +26,7 @@ import tempfile
import six import six
import ZODB.tests.util import ZODB.tests.util
import zope.testing.setupstack import zope.testing.setupstack
from ZEO._compat import BytesIO from ZEO._compat import StringIO
logger = logging.getLogger('ZEO.tests.forker') logger = logging.getLogger('ZEO.tests.forker')
...@@ -40,9 +41,6 @@ class ZEOConfig: ...@@ -40,9 +41,6 @@ class ZEOConfig:
addr = '%s:%s' % addr addr = '%s:%s' % addr
self.address = addr self.address = addr
self.read_only = None self.read_only = None
self.invalidation_queue_size = None
self.invalidation_age = None
self.transaction_timeout = None
self.loglevel = 'INFO' self.loglevel = 'INFO'
def dump(self, f): def dump(self, f):
...@@ -50,13 +48,16 @@ class ZEOConfig: ...@@ -50,13 +48,16 @@ class ZEOConfig:
print("address " + self.address, file=f) print("address " + self.address, file=f)
if self.read_only is not None: if self.read_only is not None:
print("read-only", self.read_only and "true" or "false", file=f) print("read-only", self.read_only and "true" or "false", file=f)
if self.invalidation_queue_size is not None:
print("invalidation-queue-size", for name in (
self.invalidation_queue_size, file=f) 'invalidation_queue_size', 'invalidation_age',
if self.invalidation_age is not None: 'transaction_timeout', 'pid_filename',
print("invalidation-age", self.invalidation_age, file=f) 'ssl_certificate', 'ssl_key',
if self.transaction_timeout is not None: ):
print("transaction-timeout", self.transaction_timeout, file=f) v = getattr(self, name, None)
if v:
print(name.replace('_', '-'), v, file=f)
print("</zeo>", file=f) print("</zeo>", file=f)
print(""" print("""
...@@ -69,9 +70,9 @@ class ZEOConfig: ...@@ -69,9 +70,9 @@ class ZEOConfig:
""" % (self.loglevel, self.logpath), file=f) """ % (self.loglevel, self.logpath), file=f)
def __str__(self): def __str__(self):
f = BytesIO() f = StringIO()
self.dump(f) self.dump(f)
return f.getvalue().decode() return f.getvalue()
def encode_format(fmt): def encode_format(fmt):
...@@ -83,7 +84,7 @@ def encode_format(fmt): ...@@ -83,7 +84,7 @@ def encode_format(fmt):
return fmt return fmt
def runner(config, qin, qout, timeout=None, def runner(config, qin, qout, timeout=None,
join_timeout=9, debug=False, name=None, debug=False, name=None,
keep=False, protocol=None): keep=False, protocol=None):
if debug: if debug:
...@@ -124,11 +125,7 @@ def runner(config, qin, qout, timeout=None, ...@@ -124,11 +125,7 @@ def runner(config, qin, qout, timeout=None,
except Empty: except Empty:
pass pass
server.server.close() server.server.close()
thread.join(join_timeout) thread.join(3)
if thread.is_alive():
logger.warning("server thread didn't stop")
else:
logger.debug('server thread stopped')
if not keep: if not keep:
# Try to cleanup storage files # Try to cleanup storage files
...@@ -138,10 +135,11 @@ def runner(config, qin, qout, timeout=None, ...@@ -138,10 +135,11 @@ def runner(config, qin, qout, timeout=None,
except AttributeError: except AttributeError:
pass pass
qout.put('stopped') qout.put(thread.is_alive())
qin.get(timeout=11) # ack
if hasattr(qout, 'close'): if hasattr(qout, 'close'):
qout.close() qout.close()
qout.join_thread() qout.cancel_join_thread()
except Exception: except Exception:
logger.exception("In server thread") logger.exception("In server thread")
...@@ -153,12 +151,25 @@ def runner(config, qin, qout, timeout=None, ...@@ -153,12 +151,25 @@ def runner(config, qin, qout, timeout=None,
def stop_runner(thread, config, qin, qout, stop_timeout=9, pid=None): def stop_runner(thread, config, qin, qout, stop_timeout=9, pid=None):
qin.put('stop') qin.put('stop')
if hasattr(qin, 'close'): dirty = qout.get(timeout=stop_timeout)
qin.close() qin.put('ack')
qin.join_thread() if dirty:
qout.get(timeout=stop_timeout) print("WARNING SERVER DIDN'T STOP CLEANLY", file=sys.stderr)
# The runner thread didn't stop. If it was a process,
# give it some time to exit
if hasattr(thread, 'pid') and thread.pid:
os.waitpid(thread.pid)
else:
# Gaaaa, force gc in hopes of maybe getting the unclosed
# sockets to get GCed
gc.collect()
thread.join(stop_timeout) thread.join(stop_timeout)
os.remove(config) os.remove(config)
if hasattr(qin, 'close'):
qin.close()
qin.cancel_join_thread()
def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False, def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
path='Data.fs', protocol=None, blob_dir=None, path='Data.fs', protocol=None, blob_dir=None,
...@@ -181,25 +192,24 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False, ...@@ -181,25 +192,24 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
storage_conf = '<blobstorage>\nblob-dir %s\n%s\n</blobstorage>' % ( storage_conf = '<blobstorage>\nblob-dir %s\n%s\n</blobstorage>' % (
blob_dir, storage_conf) blob_dir, storage_conf)
if port is None: if zeo_conf is None or isinstance(zeo_conf, dict):
raise AssertionError("The port wasn't specified") if port is None:
raise AssertionError("The port wasn't specified")
if isinstance(port, int): if isinstance(port, int):
addr = 'localhost', port addr = 'localhost', port
else: else:
addr = port addr = port
adminaddr = port+'-test'
if zeo_conf is None or isinstance(zeo_conf, dict):
z = ZEOConfig(addr) z = ZEOConfig(addr)
if zeo_conf: if zeo_conf:
z.__dict__.update(zeo_conf) z.__dict__.update(zeo_conf)
zeo_conf = z zeo_conf = str(z)
# Store the config info in a temp file. # Store the config info in a temp file.
tmpfile = tempfile.mktemp(".conf", dir=os.getcwd()) tmpfile = tempfile.mktemp(".conf", dir=os.getcwd())
fp = open(tmpfile, 'w') fp = open(tmpfile, 'w')
zeo_conf.dump(fp) fp.write(str(zeo_conf) + '\n\n')
fp.write(storage_conf) fp.write(storage_conf)
fp.close() fp.close()
...@@ -222,7 +232,7 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False, ...@@ -222,7 +232,7 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
thread.start() thread.start()
addr = qout.get(timeout=start_timeout) addr = qout.get(timeout=start_timeout)
def stop(stop_timeout=9): def stop(stop_timeout=99):
stop_runner(thread, tmpfile, qin, qout, stop_timeout) stop_runner(thread, tmpfile, qin, qout, stop_timeout)
return addr, stop return addr, stop
......
...@@ -2,12 +2,10 @@ You can change the address(es) of a client storaage. ...@@ -2,12 +2,10 @@ You can change the address(es) of a client storaage.
We'll start by setting up a server and connecting to it: We'll start by setting up a server and connecting to it:
>>> import ZEO, ZEO.StorageServer, ZODB.FileStorage, transaction >>> import ZEO, transaction
>>> server = ZEO.StorageServer.StorageServer(
... ('127.0.0.1', 0), {'1': ZODB.FileStorage.FileStorage('t.fs')})
>>> server.start_thread()
>>> conn = ZEO.connection(server.addr, max_disconnect_poll=0.1) >>> addr, stop = ZEO.server(path='test.fs')
>>> conn = ZEO.connection(addr)
>>> client = conn.db().storage >>> client = conn.db().storage
>>> client.is_connected() >>> client.is_connected()
True True
...@@ -18,30 +16,25 @@ We'll start by setting up a server and connecting to it: ...@@ -18,30 +16,25 @@ We'll start by setting up a server and connecting to it:
Now we'll close the server: Now we'll close the server:
>>> server.close() >>> stop()
And wait for the connectin to notice it's disconnected: And wait for the connectin to notice it's disconnected:
>>> wait_until(lambda : not client.is_connected()) >>> wait_until(lambda : not client.is_connected())
Now, we'll restart the server and update the connection: Now, we'll restart the server:
>>> server = ZEO.StorageServer.StorageServer(
... ('127.0.0.1', 0), {'1': ZODB.FileStorage.FileStorage('t.fs')})
>>> server.start_thread()
>>> client.new_addr(server.addr)
>>> addr, stop = ZEO.server(path='test.fs')
Update with another client: Update with another client:
>>> conn2 = ZEO.connection(server.addr) >>> conn2 = ZEO.connection(addr)
>>> conn2.root.x += 1 >>> conn2.root.x += 1
>>> transaction.commit() >>> transaction.commit()
Wait for connect: Update the connection and wait for connect:
>>> client.new_addr(addr)
>>> wait_until(lambda : client.is_connected()) >>> wait_until(lambda : client.is_connected())
>>> _ = transaction.begin() >>> _ = transaction.begin()
>>> conn.root() >>> conn.root()
...@@ -51,4 +44,4 @@ Wait for connect: ...@@ -51,4 +44,4 @@ Wait for connect:
>>> conn.close() >>> conn.close()
>>> conn2.close() >>> conn2.close()
>>> server.close() >>> stop()
-----BEGIN CERTIFICATE-----
MIID/TCCAuWgAwIBAgIJAKpYWt7G+3R4MA0GCSqGSIb3DQEBBQUAMFwxCzAJBgNV
BAYTAlVTMQswCQYDVQQIEwJWQTENMAsGA1UEChMEWk9EQjERMA8GA1UEAxMIem9k
Yi5vcmcxHjAcBgkqhkiG9w0BCQEWD3NlcnZlckB6b2RiLm9yZzAeFw0xNjA2MjMx
NTA5MTZaFw0xNzA2MjMxNTA5MTZaMFwxCzAJBgNVBAYTAlVTMQswCQYDVQQIEwJW
QTENMAsGA1UEChMEWk9EQjERMA8GA1UEAxMIem9kYi5vcmcxHjAcBgkqhkiG9w0B
CQEWD3NlcnZlckB6b2RiLm9yZzCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC
ggEBANqyMazB39wFKFh2nIspOw+e/7LPLQZpsd6BnYOxPbdlhb23Q930WqgW5qCA
aJjLqkmvQvZhElXdO2lOxGLR2/Cu71UgmYXkZbMKqPNtoYCPRPCCKm5EczAFXTy4
SUD40wAlndP7J8TjpIaKZjgdfy4GWnGQQAbXUR1eFZszQtU7pUvyjgXsY+BEgq4h
F4iIBarcf8k/6PTldTRLxEbRiTNZ4cdIEtTJL/LzSu5oBw8Z3J05aPg9DcWVl89P
XtemKxU8+Vm847xXJkuODkpSgOCqAPn959b/DGdn1fyx2CR0lSgC4n2rYE/oWxw0
hiyI1jeXTAfOtqvcg4XA5Cs3ivUCAwEAAaOBwTCBvjAdBgNVHQ4EFgQUoWgKmGKI
o2U9vFK48cyXuZrmPdgwgY4GA1UdIwSBhjCBg4AUoWgKmGKIo2U9vFK48cyXuZrm
PdihYKReMFwxCzAJBgNVBAYTAlVTMQswCQYDVQQIEwJWQTENMAsGA1UEChMEWk9E
QjERMA8GA1UEAxMIem9kYi5vcmcxHjAcBgkqhkiG9w0BCQEWD3NlcnZlckB6b2Ri
Lm9yZ4IJAKpYWt7G+3R4MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADggEB
ACiGzPx1445EdyO4Em3M6MwPQD8KVLvxsYE5HQMk0A2BTJOs2Vzf0NPL2hn947D/
wFe/ubkJXmzPG2CBCbNA//exWj8x2rR8256qJuWDwFx0WUFpRmEVFUGK+bpvgxQV
DlLxnWV8z6Tq9vINRjKT3mcBX4OpDgbEL4O92pbJ7kZNn4Z2+v6/lsWzg3Eo6LVY
fj902gQD/6hjVenD6J5Dqftj4x9nsKjdMz8n5Ok5E1J02ghiWjlUp1PNUKwc2Elw
oFnjPiacbko0fSnD9Zf6qBbACAYyBkHvBc1ZMebnGepZn3E6V91X5kZl84hGzgsb
C+2aGtAqSnvL4/DlIyss3hc=
-----END CERTIFICATE-----
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEA2rIxrMHf3AUoWHaciyk7D57/ss8tBmmx3oGdg7E9t2WFvbdD
3fRaqBbmoIBomMuqSa9C9mESVd07aU7EYtHb8K7vVSCZheRlswqo822hgI9E8IIq
bkRzMAVdPLhJQPjTACWd0/snxOOkhopmOB1/LgZacZBABtdRHV4VmzNC1TulS/KO
Bexj4ESCriEXiIgFqtx/yT/o9OV1NEvERtGJM1nhx0gS1Mkv8vNK7mgHDxncnTlo
+D0NxZWXz09e16YrFTz5WbzjvFcmS44OSlKA4KoA+f3n1v8MZ2fV/LHYJHSVKALi
fatgT+hbHDSGLIjWN5dMB862q9yDhcDkKzeK9QIDAQABAoIBACtwA0/V/jm8SIQx
ouw9Fz8GDLGeVsoUSkDwq7GRjbmUj5jcAr3eH/eM/OfaOWxH353dEsbPBw5I79j9
zSH3nuDSTjUxUWz3rX9/WYloOBDJ5B6FLBpUvDBIkHlT/TDLe1VnI08Mbpy7vlz+
tkjlCvLATkyKIz14nOLhYhc+ekLRxQZrVRgHIPW13c0F61drkc4uCs/UMbYRzZiZ
nnMeQLghIUP13xMtMMkNx0P1ampvV18kCJWuvHUqOMnCaPnTGCnGfOUtq98sTdui
vBnleePmBzF5VGT7M3e3pr63EKyVPD/bx2dbItcxOahBTgDbMQR4AQ65NvD5+B0+
d9XbfgECgYEA7wLsMvFAmj5IrXYGYnaqy1LsMiZll9zv4QB2tAKLSIy5z0Cns54R
ttI6Ni94CfjBOKmR5IZXjgXZg5ydu+tBLwqNMJISziFSyTxzwNsAEPczUHefFhwq
zhAkIVsISy42AxpuyHlz5EBSJiUULhguUhmIDxCbQnmlSK6X9MxYO3UCgYEA6j2c
dfU4RC2tzjNKoKs52mUoWmwFsM5CKs0hDeRn2KV80nNKn/mRa0cZ36u/x0hN3fBm
wrwNiZWOX+ot5SfnePx0dOiTOxfWYeXtzVqF6KhPUK8K5424zL+N1uw941gvkXYR
Cc79AxDI/S5y/2ZR8aW4H91LdCcvPlE4Dp1JoYECgYAuE2gpYezMT1l/ZxNQBARk
8fVqrZBEOGld/NLlXOAw+kAPvi0WKVDM57YlH/2KHpRRMg9X+LYEQQhvoM+fnHiS
cvxI8sABUNc+yBKgiRd4Lc+MoaLfhkqSMvZkH8J3i88Jxhy5NQCsbeHoTJmZUTwM
w7NBBDiKFh1Q56ePn50ayQKBgQCA8guoT6Z6uZ6dDVU+nyOI2vjc1exICTMZdrSE
fkDAXVEaVMc2y17G7GwM2fIHlQDwdP9ModLd80td937uUAo3atn85W7vL88fM0C2
M+fVTJnk84cQMs8RPz2om4HyHcCJ1bHJcX2Ma3gJD8HUYJIpcS2rtNlthoiWSIWQ
XfuDgQKBgQCYW+x52XSaTjE0SviYmeNuO30bELgjxPyPLge407EAp1h6Sr9Za5ap
2aQsClsuzpHcHDoWmzO+dTr70Qyt/YoY5FN3uZRG9m7X/OH7+ceBEU/BtQ0Whem4
YNE5lpuw4eGsJQGkhRFcqKy66gId7Sw5b6CIaJqbhQCogkWt5JQuWQ==
-----END RSA PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIID8DCCAtigAwIBAgIJALop9P9MBfzLMA0GCSqGSIb3DQEBBQUAMFgxCzAJBgNV
BAYTAlVTMQswCQYDVQQIEwJWQTENMAsGA1UEChMEWk9EQjERMA8GA1UEAxMIem9k
Yi5vcmcxGjAYBgkqhkiG9w0BCQEWC3B3QHpvZGIub3JnMB4XDTE2MDYyMzE1MTAz
MVoXDTE3MDYyMzE1MTAzMVowWDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAlZBMQ0w
CwYDVQQKEwRaT0RCMREwDwYDVQQDEwh6b2RiLm9yZzEaMBgGCSqGSIb3DQEJARYL
cHdAem9kYi5vcmcwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDKw/iw
N1EPddU9QQQ+OnCJv9G3rbTOPt4zEbpfTROIHTME3krFKPALrGF2aK+oBpHx3/TZ
HN5UvWK/jmGtDL9jekKCAaeAaVIKlESUS6DIxZY+FaO3re/1fbmBNRz8Cnn1raAw
/4YZRDPvblooH4Nt5m7uooGAIIDPft3fInhmGboOoIpXc7nMGVGOWXlDN5I9oFmm
4vby4CUMy3A/0wnHgTuMNy7Tpjgz2E/1MRAOyWQ7PZYiASs4ycZfas8058O8DI+o
rSYyum/czecIz52P6jbx5LWvcKDWac8QbJoHPelthYtxcMHee2+Nh6MWW688CBzq
HSeFAdNO3d9kMiFpAgMBAAGjgbwwgbkwHQYDVR0OBBYEFDui1OC2+2z2rHADglk5
tGOndxhoMIGJBgNVHSMEgYEwf4AUO6LU4Lb7bPascAOCWTm0Y6d3GGihXKRaMFgx
CzAJBgNVBAYTAlVTMQswCQYDVQQIEwJWQTENMAsGA1UEChMEWk9EQjERMA8GA1UE
AxMIem9kYi5vcmcxGjAYBgkqhkiG9w0BCQEWC3B3QHpvZGIub3JnggkAuin0/0wF
/MswDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQUFAAOCAQEAiEYO8MZ3OG8sqy9t
AtUZbv0aTsIUzy/QTUKKDUo8qwKNOylqyqGAZV0tZ5eCoqIGFAwRJBBymIizU3zH
U1k2MnYZMVi7uYwSy+qwg52+X7GLl/kaAfx8kNvpr274CuZQnLojJS+K8HtH5Pom
YD3gTO3OxGS4IS6uf6DD+mf+C9OBnTl47P0HA0/eHBEXVSc2vsv30H/UoW5VbZ6z
6TWkoPwSMVhCNRRRif4/eqCLh24/h5b4uvAC+tsrIPQ9If7EsqVTNMCbAkv3ib6g
OmaCdbrGkqvD3UVn7i5ci96UZoF80EWNZiwhMdvQtMfOAR4jHQ1pTepJni6JwzZP
UMNDpQ==
-----END CERTIFICATE-----
-----BEGIN RSA PRIVATE KEY-----
Proc-Type: 4,ENCRYPTED
DEK-Info: DES-EDE3-CBC,769B900D03925712
z5M/XkqEC1+PxJ1T3QrUhGG9fPTBsPxKy8WlwIytbMg0RXS0oJBiFgNYp1ktqzGo
yT+AdTCRR1hNVX8M5HbV3ksUjKxXKCL3+yaaB6JtGbNRr2qTNwosvxD92nKT/hvN
R6rHF6LcO05s8ubs9b9ON/ja7HCx69N5CjBuCbCFHUTlAXkwD9w0ScrxrtfP50EY
FOw6LAqhhzq6/KO7c1SJ7k9LYzakhL+nbw5KM9QgBk4WHlmKLbCZIZ5RWvu0F4s5
n4qk/BcuXIkbYuEv2kH0nDk5eDfA/dj7xZcMMgL5VFymQzaZLYyj4WuQYXu/7JW/
nM/ZWBkZOMaI3vnPTG1hJ9pgjLjQnjfNA/bGWwbLxjCsPmR8yvZS4v2iqdB6X3Vl
yJ9aV9r8KoU0PJk3x4v2Zp+RQxgrKSaQw59sXptaXAY3NCRR6ohvq4P5X6UB8r5S
vYdoMeVXhX1hzXeMguln7zQInwJhPZqk4wMIV3lTsCqh1eJ7NC2TGCwble+B/ClR
KtzuJBzoYPLw44ltnXPEMmVH1Fkh17+QZFgZRJrKGD9PGOAXmnzudsZ1xX9kNnOM
JLIT/mzKcqkd8S1n1Xi1x7zGA0Ng5xmKGFe8oDokPJucJO1Ou+hbLDmC0DZUGzr1
qqPJ3F/DzZZDTmD/rZF6doPJgFAZvgpVeiWS8/v1qbz/nz13uwXDLjRPgLfcKpmQ
4R3V4QlgviDilW61VTZnzV9qAOx4fG6+IwWIGBlrJnfsH/fSCDNlAStc6k12zdun
PIIRJBfbEprGig3vRWUoBASReqow1JCN9DaVCX1P27pDKY5oDe+7/HOrQpwhPoya
2HEwbKeyY0nCcCXbkWGL1bwEUs/PrJv+61rik4KxOWhKpHWkZLzbozELb44jXrJx
e8K8XKz4La2DEjsUYHc31u6T69GBQO9JDEvih15phUWq8ITvDnkHpAg+wYb1JAHD
QcqDtAulMvT/ZGN0h7qdwbHMggEsLgCCVPG4iZ5K4cXsMbePFvQqq+o4FTMF+cM5
2Dq0wir92U9cH+ooy80LIt5Kp5zqgQZzr73o9MEgwqJocCrx9ZrofKRUmTV+ZU0r
w5mfUM47Ctnqia0UNGx6SUs3CHFDPWPbzrAaqGzSvFhzR1MMoL1/rJzP1VSm3Fk3
ESWkPrg0J8dcQP/ch9MhH8eoQYyA+2q1vClUbeZLAs5KoHxgi6pSkGYqFhshrA+t
2AIrUPDPPDf0PgRoXJrzdVOiNNY1rzyql+0JqDH6DjCVcAADWY+48p9U2YFTd7Je
DvnZWihwe0qYGn1AKIkvJ4SR3bQg36etrxhMrMl/8lUn2dnT7GFrhjr9HwCpJwa7
8tv150SrQXt3FXZCHb+RMUgoWZDeksDohPiGzXkPU6kaSviZVnRMslyU4ahWp6vC
8tYUhb7K6N+is1hYkICNt6zLl2vBDuCDWmiIwopHtnH1kz8bYlp4/GBVaMIgZiCM
gM/7+p4YCc++s2sJiQ9+BqPo0zKm3bbSP+fPpeWefQVte9Jx4S36YXU52HsJxBTN
WUdHABC+aS2A45I12xMNzOJR6VfxnG6f3JLpt3MkUCEg+898vJGope+TJUhD+aJC
-----END RSA PRIVATE KEY-----
...@@ -12,53 +12,108 @@ ...@@ -12,53 +12,108 @@
# #
############################################################################## ##############################################################################
import doctest
import tempfile
import unittest import unittest
import ZODB.config
import ZODB.tests.util from zope.testing import setupstack
from ZODB.tests.testConfig import ConfigTestBase from ZODB.config import storageFromString
class ZEOConfigTest(ConfigTestBase): from .forker import start_zeo_server
def test_zeo_config(self): class ZEOConfigTest(setupstack.TestCase):
# We're looking for a port that doesn't exist so a
# connection attempt will fail. Instead of elaborate setUp = setupstack.setUpDirectory
# logic to loop over a port calculation, we'll just pick a
# simple "random", likely to not-exist port number and add def start_server(self, settings='', **kw):
# an elaborate comment explaining this instead. Go ahead,
# grep for 9. for name, value in kw.items():
from ZEO.ClientStorage import ClientDisconnected settings += '\n%s %s\n' % (name.replace('_', '-'), value)
import ZConfig
from ZODB.config import getDbSchema zeo_conf = """
from ZEO._compat import StringIO <zeo>
cfg = """ address 127.0.0.1:0
<zodb> %s
<zeoclient> </zeo>
server localhost:56897 """ % settings
wait false return start_zeo_server("<mappingstorage>\n</mappingstorage>\n",
</zeoclient> zeo_conf, threaded=True)
</zodb>
""" def start_client(self, addr, settings='', **kw):
config, handle = ZConfig.loadConfigFile(getDbSchema(), StringIO(cfg)) settings += '\nserver %s:%s\n' % addr
self.assertEqual(config.database[0].config.storage.config.blob_dir, for name, value in kw.items():
None) settings += '\n%s %s\n' % (name.replace('_', '-'), value)
self.assertRaises(ClientDisconnected, self._test, cfg) return storageFromString(
"""
cfg = """ %import ZEO
<zodb>
<zeoclient> <clientstorage>
blob-dir blobs {}
server localhost:56897 </clientstorage>
wait false """.format(settings))
</zeoclient>
</zodb> def _client_assertions(
""" self, client, addr,
config, handle = ZConfig.loadConfigFile(getDbSchema(), StringIO(cfg)) connected=True,
self.assertEqual(config.database[0].config.storage.config.blob_dir, cache_size=20 * (1<<20),
'blobs') cache_path=None,
self.assertRaises(ClientDisconnected, self._test, cfg) blob_dir=None,
shared_blob_dir=False,
blob_cache_size=None,
blob_cache_size_check=10,
read_only=False,
read_only_fallback=False,
wait_timeout=30,
client_label=None,
storage='1',
name=None,
):
self.assertEqual(client.is_connected(), connected)
self.assertEqual(client._addr, [addr])
self.assertEqual(client._cache.maxsize, cache_size)
self.assertEqual(client._cache.path, cache_path)
self.assertEqual(client.blob_dir, blob_dir)
self.assertEqual(client.shared_blob_dir, shared_blob_dir)
self.assertEqual(client._blob_cache_size, blob_cache_size)
if blob_cache_size:
self.assertEqual(client._blob_cache_size_check,
blob_cache_size * blob_cache_size_check // 100)
self.assertEqual(client._is_read_only, read_only)
self.assertEqual(client._read_only_fallback, read_only_fallback)
self.assertEqual(client._server.timeout, wait_timeout)
self.assertEqual(client._client_label, client_label)
self.assertEqual(client._storage, storage)
self.assertEqual(client.__name__,
name if name is not None else str(client._addr))
def test_default_zeo_config(self, **client_settings):
addr, stop = self.start_server()
client = self.start_client(addr, **client_settings)
self._client_assertions(client, addr, **client_settings)
client.close()
stop()
def test_client_variations(self):
for name, value in dict(
cache_size=4200,
cache_path='test',
blob_dir='blobs',
blob_cache_size=424242,
read_only=True,
read_only_fallback=True,
wait_timeout=33,
client_label='test_client',
name='Test'
).items():
params = {name: value}
self.test_default_zeo_config(**params)
def test_blob_cache_size_check(self):
self.test_default_zeo_config(blob_cache_size=424242,
blob_cache_size_check=50)
def test_suite(): def test_suite():
return unittest.makeSuite(ZEOConfigTest) return unittest.makeSuite(ZEOConfigTest)
...@@ -79,6 +79,12 @@ class MappingStorageConnectionTests( ...@@ -79,6 +79,12 @@ class MappingStorageConnectionTests(
): ):
"""Mapping storage connection tests.""" """Mapping storage connection tests."""
class SSLConnectionTests(
MappingStorageConfig,
ConnectionTests.SSLConnectionTests,
):
pass
# The ReconnectionTests can't work with MappingStorage because it's only an # The ReconnectionTests can't work with MappingStorage because it's only an
# in-memory storage and has no persistent state. # in-memory storage and has no persistent state.
...@@ -88,6 +94,12 @@ class MappingStorageTimeoutTests( ...@@ -88,6 +94,12 @@ class MappingStorageTimeoutTests(
): ):
pass pass
class SSLConnectionTests(
MappingStorageConfig,
ConnectionTests.SSLConnectionTests,
):
pass
test_classes = [FileStorageConnectionTests, test_classes = [FileStorageConnectionTests,
FileStorageReconnectionTests, FileStorageReconnectionTests,
...@@ -95,6 +107,7 @@ test_classes = [FileStorageConnectionTests, ...@@ -95,6 +107,7 @@ test_classes = [FileStorageConnectionTests,
FileStorageTimeoutTests, FileStorageTimeoutTests,
MappingStorageConnectionTests, MappingStorageConnectionTests,
MappingStorageTimeoutTests, MappingStorageTimeoutTests,
SSLConnectionTests,
] ]
def invalidations_while_connecting(): def invalidations_while_connecting():
......
...@@ -39,6 +39,7 @@ import re ...@@ -39,6 +39,7 @@ import re
import shutil import shutil
import signal import signal
import stat import stat
import ssl
import sys import sys
import tempfile import tempfile
import threading import threading
...@@ -55,6 +56,8 @@ import ZODB.tests.util ...@@ -55,6 +56,8 @@ import ZODB.tests.util
import ZODB.utils import ZODB.utils
import zope.testing.setupstack import zope.testing.setupstack
from . import testssl
logger = logging.getLogger('ZEO.tests.testZEO') logger = logging.getLogger('ZEO.tests.testZEO')
class DummyDB: class DummyDB:
...@@ -99,7 +102,7 @@ class MiscZEOTests: ...@@ -99,7 +102,7 @@ class MiscZEOTests:
def checkZEOInvalidation(self): def checkZEOInvalidation(self):
addr = self._storage._addr addr = self._storage._addr
storage2 = self._wrap_client( storage2 = self._wrap_client(
ClientStorage(addr, wait=1, min_disconnect_poll=0.1)) ClientStorage(addr, wait=1, **self._client_options()))
try: try:
oid = self._storage.new_oid() oid = self._storage.new_oid()
ob = MinPO('first') ob = MinPO('first')
...@@ -128,13 +131,13 @@ class MiscZEOTests: ...@@ -128,13 +131,13 @@ class MiscZEOTests:
# Earlier, a ClientStorage would not have the last transaction id # Earlier, a ClientStorage would not have the last transaction id
# available right after successful connection, this is required now. # available right after successful connection, this is required now.
addr = self._storage._addr addr = self._storage._addr
storage2 = ClientStorage(addr) storage2 = ClientStorage(addr, **self._client_options())
self.assert_(storage2.is_connected()) self.assert_(storage2.is_connected())
self.assertEquals(ZODB.utils.z64, storage2.lastTransaction()) self.assertEquals(ZODB.utils.z64, storage2.lastTransaction())
storage2.close() storage2.close()
self._dostore() self._dostore()
storage3 = ClientStorage(addr) storage3 = ClientStorage(addr, **self._client_options())
self.assert_(storage3.is_connected()) self.assert_(storage3.is_connected())
self.assertEquals(8, len(storage3.lastTransaction())) self.assertEquals(8, len(storage3.lastTransaction()))
self.assertNotEquals(ZODB.utils.z64, storage3.lastTransaction()) self.assertNotEquals(ZODB.utils.z64, storage3.lastTransaction())
...@@ -164,26 +167,33 @@ class GenericTests( ...@@ -164,26 +167,33 @@ class GenericTests(
def setUp(self): def setUp(self):
StorageTestBase.StorageTestBase.setUp(self) StorageTestBase.StorageTestBase.setUp(self)
logger.info("setUp() %s", self.id()) logger.info("setUp() %s", self.id())
port = get_port(self)
zconf = forker.ZEOConfig(('', port))
zport, stop = forker.start_zeo_server(self.getConfig(), zport, stop = forker.start_zeo_server(self.getConfig(),
zconf, port) self.getZEOConfig())
self._servers = [stop] self._servers = [stop]
if not self.blob_cache_dir: if not self.blob_cache_dir:
# This is the blob cache for ClientStorage # This is the blob cache for ClientStorage
self.blob_cache_dir = tempfile.mkdtemp( self.blob_cache_dir = tempfile.mkdtemp(
'blob_cache', 'blob_cache',
dir=os.path.abspath(os.getcwd())) dir=os.path.abspath(os.getcwd()))
self._storage = self._wrap_client(ClientStorage( self._storage = self._wrap_client(
zport, '1', cache_size=20000000, ClientStorage(
min_disconnect_poll=0.5, wait=1, zport, '1', cache_size=20000000,
wait_timeout=60, blob_dir=self.blob_cache_dir, min_disconnect_poll=0.5, wait=1,
shared_blob_dir=self.shared_blob_dir)) wait_timeout=60, blob_dir=self.blob_cache_dir,
shared_blob_dir=self.shared_blob_dir,
**self._client_options()),
)
self._storage.registerDB(DummyDB()) self._storage.registerDB(DummyDB())
def getZEOConfig(self):
return forker.ZEOConfig(('', get_port(self)))
def _wrap_client(self, client): def _wrap_client(self, client):
return client return client
def _client_options(self):
return {}
def tearDown(self): def tearDown(self):
self._storage.close() self._storage.close()
for stop in self._servers: for stop in self._servers:
...@@ -204,7 +214,8 @@ class GenericTests( ...@@ -204,7 +214,8 @@ class GenericTests(
# cleaner way. # cleaner way.
addr = self._storage._addr addr = self._storage._addr
self._storage.close() self._storage.close()
self._storage = ClientStorage(addr, read_only=read_only, wait=1) self._storage = ClientStorage(
addr, read_only=read_only, wait=1, **self._client_options())
def checkWriteMethods(self): def checkWriteMethods(self):
# ReadOnlyStorage defines checkWriteMethods. The decision # ReadOnlyStorage defines checkWriteMethods. The decision
...@@ -223,7 +234,8 @@ class GenericTests( ...@@ -223,7 +234,8 @@ class GenericTests(
def _do_store_in_separate_thread(self, oid, revid, voted): def _do_store_in_separate_thread(self, oid, revid, voted):
def do_store(): def do_store():
store = ZEO.ClientStorage.ClientStorage(self._storage._addr) store = ZEO.ClientStorage.ClientStorage(
self._storage._addr, **self._client_options())
try: try:
t = transaction.get() t = transaction.get()
store.tpc_begin(t) store.tpc_begin(t)
...@@ -335,6 +347,16 @@ class FileStorageTests(FullGenericTests): ...@@ -335,6 +347,16 @@ class FileStorageTests(FullGenericTests):
self._storage._info['interfaces'] self._storage._info['interfaces']
) )
class FileStorageSSLTests(FileStorageTests):
def getZEOConfig(self):
return testssl.server_config
def _client_options(self):
return {'ssl': testssl.client_ssl()}
class FileStorageHexTests(FileStorageTests): class FileStorageHexTests(FileStorageTests):
_expected_interfaces = ( _expected_interfaces = (
('ZODB.interfaces', 'IStorageRestoreable'), ('ZODB.interfaces', 'IStorageRestoreable'),
...@@ -1066,7 +1088,7 @@ def runzeo_without_configfile(): ...@@ -1066,7 +1088,7 @@ def runzeo_without_configfile():
------ ------
--T INFO ZEO.StorageServer StorageServer created RW with storages 1RWt --T INFO ZEO.StorageServer StorageServer created RW with storages 1RWt
------ ------
--T INFO ZEO.acceptor listening on ... --T INFO ZEO.asyncio... listening on ...
------ ------
--T INFO ZEO.StorageServer closing storage '1' --T INFO ZEO.StorageServer closing storage '1'
testing exit immediately testing exit immediately
...@@ -1447,7 +1469,8 @@ def quick_close_doesnt_kill_server(): ...@@ -1447,7 +1469,8 @@ def quick_close_doesnt_kill_server():
Start a server: Start a server:
>>> addr, _ = start_server() >>> from .testssl import server_config, client_ssl
>>> addr, _ = start_server(zeo_conf=server_config)
Now connect and immediately disconnect. This caused the server to Now connect and immediately disconnect. This caused the server to
die in the past: die in the past:
...@@ -1460,9 +1483,12 @@ def quick_close_doesnt_kill_server(): ...@@ -1460,9 +1483,12 @@ def quick_close_doesnt_kill_server():
... s.connect(addr) ... s.connect(addr)
... s.close() ... s.close()
>>> print("\n\nXXX WARNING: running quick_close_doesnt_kill_server with ssl as hack pending http://bugs.python.org/issue27386\n", file=sys.stderr) # Intentional long line to be annoying till this is fixed
Now we should be able to connect as normal: Now we should be able to connect as normal:
>>> db = ZEO.DB(addr) >>> db = ZEO.DB(addr, ssl=client_ssl())
>>> db.storage.is_connected() >>> db.storage.is_connected()
True True
...@@ -1485,7 +1511,8 @@ def can_use_empty_string_for_local_host_on_client(): ...@@ -1485,7 +1511,8 @@ def can_use_empty_string_for_local_host_on_client():
slow_test_classes = [ slow_test_classes = [
#BlobAdaptedFileStorageTests, BlobWritableCacheTests, #BlobAdaptedFileStorageTests, BlobWritableCacheTests,
MappingStorageTests, DemoStorageTests, MappingStorageTests, DemoStorageTests,
FileStorageTests, FileStorageHexTests, FileStorageClientHexTests, FileStorageTests, FileStorageSSLTests,
FileStorageHexTests, FileStorageClientHexTests,
] ]
quick_test_classes = [FileStorageRecoveryTests, ZRPCConnectionTests] quick_test_classes = [FileStorageRecoveryTests, ZRPCConnectionTests]
...@@ -1538,8 +1565,6 @@ class ServerManagingClientStorageForIExternalGCTest( ...@@ -1538,8 +1565,6 @@ class ServerManagingClientStorageForIExternalGCTest(
def test_suite(): def test_suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
# Collect misc tests into their own layer to reduce size of
# unit test layer
zeo = unittest.TestSuite() zeo = unittest.TestSuite()
zeo.addTest(unittest.makeSuite(ZODB.tests.util.AAAA_Test_Runner_Hack)) zeo.addTest(unittest.makeSuite(ZODB.tests.util.AAAA_Test_Runner_Hack))
patterns = [ patterns = [
...@@ -1570,12 +1595,16 @@ def test_suite(): ...@@ -1570,12 +1595,16 @@ def test_suite():
"ClientDisconnected"), "ClientDisconnected"),
)), )),
)) ))
zeo.layer = ZODB.tests.util.MininalTestLayer('testZeo-misc')
suite.addTest(zeo)
zeo = unittest.TestSuite()
zeo.addTest( zeo.addTest(
doctest.DocFileSuite( doctest.DocFileSuite(
'zeo-fan-out.test', 'zdoptions.test', 'zdoptions.test',
'drop_cache_rather_than_verify.txt', 'client-config.test', 'drop_cache_rather_than_verify.txt', 'client-config.test',
'protocols.test', 'zeo_blob_cache.test', 'invalidation-age.txt', 'protocols.test', 'zeo_blob_cache.test', 'invalidation-age.txt',
'dynamic_server_ports.test', 'new_addr.test', '../nagios.rst', 'dynamic_server_ports.test', '../nagios.rst',
setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown, setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
checker=renormalizing.RENormalizing(patterns), checker=renormalizing.RENormalizing(patterns),
globs={'print_function': print_function}, globs={'print_function': print_function},
...@@ -1588,9 +1617,23 @@ def test_suite(): ...@@ -1588,9 +1617,23 @@ def test_suite():
)) ))
for klass in quick_test_classes: for klass in quick_test_classes:
zeo.addTest(unittest.makeSuite(klass, "check")) zeo.addTest(unittest.makeSuite(klass, "check"))
zeo.layer = ZODB.tests.util.MininalTestLayer('testZeo-misc') zeo.layer = ZODB.tests.util.MininalTestLayer('testZeo-misc2')
suite.addTest(zeo) suite.addTest(zeo)
# tests that often fail, maybe if they have their own layers
for name in 'zeo-fan-out.test', 'new_addr.test':
zeo = unittest.TestSuite()
zeo.addTest(
doctest.DocFileSuite(
name,
setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
checker=renormalizing.RENormalizing(patterns),
globs={'print_function': print_function},
),
)
zeo.layer = ZODB.tests.util.MininalTestLayer('testZeo-' + name)
suite.addTest(zeo)
suite.addTest(unittest.makeSuite(MultiprocessingTests)) suite.addTest(unittest.makeSuite(MultiprocessingTests))
# Put the heavyweights in their own layers # Put the heavyweights in their own layers
......
import mock
import os
import ssl
import unittest
from ZODB.config import storageFromString
from ..Exceptions import ClientDisconnected
from .. import runzeo
from .testConfig import ZEOConfigTest
here = os.path.dirname(__file__)
server_cert = os.path.join(here, 'server.pem')
server_key = os.path.join(here, 'server_key.pem')
serverpw_cert = os.path.join(here, 'serverpw.pem')
serverpw_key = os.path.join(here, 'serverpw_key.pem')
client_cert = os.path.join(here, 'client.pem')
client_key = os.path.join(here, 'client_key.pem')
class SSLConfigTest(ZEOConfigTest):
def test_ssl_basic(self):
# This shows that configuring ssl has an actual effect on connections.
# Other SSL configuration tests will be Mockiavellian.
# Also test that an SSL connection mismatch doesn't kill
# the server loop.
# An SSL client can't talk to a non-SSL server:
addr, stop = self.start_server()
with self.assertRaises(ClientDisconnected):
self.start_client(
addr,
"""<ssl>
certificate {}
key {}
</ssl>""".format(client_cert, client_key), wait_timeout=1)
# But a non-ssl one can:
client = self.start_client(addr)
self._client_assertions(client, addr)
client.close()
stop()
# A non-SSL client can't talk to an SSL server:
addr, stop = self.start_server(
"""<ssl>
certificate {}
key {}
authenticate {}
</ssl>""".format(server_cert, server_key, client_cert)
)
with self.assertRaises(ClientDisconnected):
self.start_client(addr, wait_timeout=1)
# But an SSL one can:
client = self.start_client(
addr,
"""<ssl>
certificate {}
key {}
authenticate {}
server-hostname zodb.org
</ssl>""".format(client_cert, client_key, server_cert))
self._client_assertions(client, addr)
client.close()
stop()
def test_ssl_hostname_check(self):
addr, stop = self.start_server(
"""<ssl>
certificate {}
key {}
authenticate {}
</ssl>""".format(server_cert, server_key, client_cert)
)
# Connext with bad hostname fails:
with self.assertRaises(ClientDisconnected):
client = self.start_client(
addr,
"""<ssl>
certificate {}
key {}
authenticate {}
server-hostname example.org
</ssl>""".format(client_cert, client_key, server_cert),
wait_timeout=1)
# Connext with good hostname succeeds:
client = self.start_client(
addr,
"""<ssl>
certificate {}
key {}
authenticate {}
server-hostname zodb.org
</ssl>""".format(client_cert, client_key, server_cert))
self._client_assertions(client, addr)
client.close()
stop()
def test_ssl_pw(self):
addr, stop = self.start_server(
"""<ssl>
certificate {}
key {}
authenticate {}
password-function ZEO.tests.testssl.pwfunc
</ssl>""".format(serverpw_cert, serverpw_key, client_cert)
)
stop()
@mock.patch('ssl.create_default_context')
def test_ssl_mockiavellian_server_no_ssl(self, factory):
server = create_server()
self.assertFalse(factory.called)
self.assertEqual(server.acceptor.ssl_context, None)
server.close()
def assert_context(
self, factory, context,
cert=(server_cert, server_key, None),
verify_mode=ssl.CERT_REQUIRED,
check_hostname=False,
cafile=None, capath=None,
):
factory.assert_called_with(
ssl.Purpose.CLIENT_AUTH, cafile=cafile, capath=capath)
context.load_cert_chain.assert_called_with(*cert)
self.assertEqual(context, factory.return_value)
self.assertEqual(context.verify_mode, verify_mode)
self.assertEqual(context.check_hostname, check_hostname)
@mock.patch('ssl.create_default_context')
def test_ssl_mockiavellian_server_ssl_no_auth(self, factory):
with self.assertRaises(SystemExit):
# auth is required
create_server(certificate=server_cert, key=server_key)
@mock.patch('ssl.create_default_context')
def test_ssl_mockiavellian_server_ssl_auth_file(self, factory):
server = create_server(
certificate=server_cert, key=server_key, authenticate=__file__)
context = server.acceptor.ssl_context
self.assert_context(factory, context, cafile=__file__)
server.close()
@mock.patch('ssl.create_default_context')
def test_ssl_mockiavellian_server_ssl_auth_dir(self, factory):
server = create_server(
certificate=server_cert, key=server_key, authenticate=here)
context = server.acceptor.ssl_context
self.assert_context(factory, context, capath=here)
server.close()
@mock.patch('ssl.create_default_context')
def test_ssl_mockiavellian_server_ssl_pw(self, factory):
server = create_server(
certificate=server_cert,
key=server_key,
password_function='ZEO.tests.testssl.pwfunc',
authenticate=here,
)
context = server.acceptor.ssl_context
self.assert_context(
factory, context, (server_cert, server_key, pwfunc), capath=here)
server.close()
@mock.patch('ssl.create_default_context')
@mock.patch('ZEO.ClientStorage.ClientStorage')
def test_ssl_mockiavellian_client_no_ssl(self, ClientStorage, factory):
client = ssl_client()
self.assertFalse('ssl' in ClientStorage.call_args[1])
self.assertFalse('ssl_server_hostname' in ClientStorage.call_args[1])
@mock.patch('ssl.create_default_context')
@mock.patch('ZEO.ClientStorage.ClientStorage')
def test_ssl_mockiavellian_client_server_signed(
self, ClientStorage, factory
):
client = ssl_client(certificate=client_cert, key=client_key)
context = ClientStorage.call_args[1]['ssl']
self.assertEqual(ClientStorage.call_args[1]['ssl_server_hostname'],
None)
self.assert_context(
factory, context, (client_cert, client_key, None),
check_hostname=True)
@mock.patch('ssl.create_default_context')
@mock.patch('ZEO.ClientStorage.ClientStorage')
def test_ssl_mockiavellian_client_auth_dir(
self, ClientStorage, factory
):
client = ssl_client(
certificate=client_cert, key=client_key, authenticate=here)
context = ClientStorage.call_args[1]['ssl']
self.assertEqual(ClientStorage.call_args[1]['ssl_server_hostname'],
None)
self.assert_context(
factory, context, (client_cert, client_key, None),
capath=here,
check_hostname=True,
)
@mock.patch('ssl.create_default_context')
@mock.patch('ZEO.ClientStorage.ClientStorage')
def test_ssl_mockiavellian_client_auth_file(
self, ClientStorage, factory
):
client = ssl_client(
certificate=client_cert, key=client_key, authenticate=server_cert)
context = ClientStorage.call_args[1]['ssl']
self.assertEqual(ClientStorage.call_args[1]['ssl_server_hostname'],
None)
self.assert_context(
factory, context, (client_cert, client_key, None),
cafile=server_cert,
check_hostname=True,
)
@mock.patch('ssl.create_default_context')
@mock.patch('ZEO.ClientStorage.ClientStorage')
def test_ssl_mockiavellian_client_pw(
self, ClientStorage, factory
):
client = ssl_client(
certificate=client_cert, key=client_key,
password_function='ZEO.tests.testssl.pwfunc',
authenticate=server_cert)
context = ClientStorage.call_args[1]['ssl']
self.assertEqual(ClientStorage.call_args[1]['ssl_server_hostname'],
None)
self.assert_context(
factory, context, (client_cert, client_key, pwfunc),
cafile=server_cert,
check_hostname=True,
)
@mock.patch('ssl.create_default_context')
@mock.patch('ZEO.ClientStorage.ClientStorage')
def test_ssl_mockiavellian_client_server_hostname(
self, ClientStorage, factory
):
client = ssl_client(
certificate=client_cert, key=client_key, authenticate=server_cert,
server_hostname='example.com')
context = ClientStorage.call_args[1]['ssl']
self.assertEqual(ClientStorage.call_args[1]['ssl_server_hostname'],
'example.com')
self.assert_context(
factory, context, (client_cert, client_key, None),
cafile=server_cert,
check_hostname=True,
)
@mock.patch('ssl.create_default_context')
@mock.patch('ZEO.ClientStorage.ClientStorage')
def test_ssl_mockiavellian_client_check_hostname(
self, ClientStorage, factory
):
client = ssl_client(
certificate=client_cert, key=client_key, authenticate=server_cert,
check_hostname=False)
context = ClientStorage.call_args[1]['ssl']
self.assertEqual(ClientStorage.call_args[1]['ssl_server_hostname'],
None)
self.assert_context(
factory, context, (client_cert, client_key, None),
cafile=server_cert,
check_hostname=False,
)
def args(*a, **kw):
return a, kw
def ssl_conf(**ssl_settings):
if ssl_settings:
ssl_conf = '<ssl>\n' + '\n'.join(
'{} {}'.format(name.replace('_', '-'), value)
for name, value in ssl_settings.items()
) + '\n</ssl>\n'
else:
ssl_conf = ''
return ssl_conf
def ssl_client(**ssl_settings):
return storageFromString(
"""%import ZEO
<clientstorage>
server localhost:0
{}
</clientstorage>
""".format(ssl_conf(**ssl_settings))
)
def create_server(**ssl_settings):
with open('conf', 'w') as f:
f.write(
"""
<zeo>
address localhost:0
{}
</zeo>
<mappingstorage>
</mappingstorage>
""".format(ssl_conf(**ssl_settings)))
options = runzeo.ZEOOptions()
options.realize(['-C', 'conf'])
s = runzeo.ZEOServer(options)
s.open_storages()
s.create_server()
return s.server
pwfunc = lambda : '1234'
def test_suite():
return unittest.makeSuite(SSLConfigTest)
# Helpers for other tests:
server_config = """
<zeo>
address 127.0.0.1:0
<ssl>
certificate {}
key {}
authenticate {}
</ssl>
</zeo>
""".format(server_cert, server_key, client_cert)
def client_ssl():
context = ssl.create_default_context(
ssl.Purpose.CLIENT_AUTH, cafile=server_cert)
context.load_cert_chain(client_cert, client_key)
context.verify_mode = ssl.CERT_REQUIRED
context.check_hostname = False
return context
"""SSL configuration support
"""
import os
import sys
def ssl_config(section, server):
import ssl
cafile = capath = None
auth = section.authenticate
if auth:
if os.path.isdir(auth):
capath=auth
else:
cafile=auth
context = ssl.create_default_context(
ssl.Purpose.CLIENT_AUTH, cafile=cafile, capath=capath)
if section.certificate:
password = section.password_function
if password:
module, name = password.rsplit('.', 1)
module = __import__(module, globals(), locals(), ['*'], 0)
password = getattr(module, name)
context.load_cert_chain(section.certificate, section.key, password)
context.verify_mode = ssl.CERT_REQUIRED
if sys.version_info >= (3, 4):
context.verify_flags |= ssl.VERIFY_X509_STRICT | (
context.cert_store_stats()['crl'] and ssl.VERIFY_CRL_CHECK_LEAF)
if server:
context.check_hostname = False
return context
context.check_hostname = section.check_hostname
return context, section.server_hostname
def server_ssl(section):
return ssl_config(section, True)
def client_ssl(section):
return ssl_config(section, False)
class ClientStorageConfig:
def __init__(self, config):
self.config = config
self.name = config.getSectionName()
def open(self):
from ZEO.ClientStorage import ClientStorage
# config.server is a multikey of socket-connection-address values
# where the value is a socket family, address tuple.
config = self.config
addresses = [server.address for server in config.server]
options = {}
if config.blob_cache_size is not None:
options['blob_cache_size'] = config.blob_cache_size
if config.blob_cache_size_check is not None:
options['blob_cache_size_check'] = config.blob_cache_size_check
if config.client_label is not None:
options['client_label'] = config.client_label
ssl = config.ssl
if ssl:
options['ssl'] = ssl[0]
options['ssl_server_hostname'] = ssl[1]
return ClientStorage(
addresses,
blob_dir=config.blob_dir,
shared_blob_dir=config.shared_blob_dir,
storage=config.storage,
cache_size=config.cache_size,
cache=config.cache_path,
name=config.name,
read_only=config.read_only,
read_only_fallback=config.read_only_fallback,
wait_timeout=config.wait_timeout,
**options)
...@@ -20,6 +20,7 @@ deps = ...@@ -20,6 +20,7 @@ deps =
zope.interface zope.interface
zope.testing zope.testing
zope.testrunner zope.testrunner
mock
[testenv:simple] [testenv:simple]
# Test that 'setup.py test' works # Test that 'setup.py test' works
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment