Commit ee4d7bc7 authored by Kirill Smelkov's avatar Kirill Smelkov

client: Remove support for interoperability with ZEO4 server

As explained in https://github.com/zopefoundation/ZEO/issues/209 and
in recent patch titled "Add tests that demonstrates data corruption when
ZEO5 client is served by ZEO4 server" there is possibility of data
corruption when ZEO5 client loads data from ZEO4 server.

A fix for this is not trivial and would have to forward-port
load-tracking in client from ZEO4 to ZEO5. However, as discussed in
https://github.com/zopefoundation/ZEO/issues/209, we believe that noone
is actually using ZEO5.client-ZEO4.server configuration. Thus, given
that it was already planned to drop ZEO4 support soon, it was decided to
drop support for ZEO4 server instead of fixing it.

In this patch:

- we remove support for testing against ZEO4 server, including removing
  vendored ZEO4 copy.

- remove support for on-client methods that only ZEO4 server would call.
  This includes the sole method `serialnos`.

- remove support for connecting to any server besides one that
  interoperates with protocol '5'. ZEO4 used protocol '4'. It is
  explicitly tested by new test that updated ZEO5 client rejects
  connecting to a server that speaks protocol '4'.

- we do _not_ remove verify_invalidation_queue added by Jim in 2016 via
  5ba506e7 (Fixed a bug handling ZEO4 invalidations during cache
  verification) with the following message:

	ZEO4 servers can send invalidations out of order with
	``getInvalidations`` results, presumably because ``getInvalidations``
	didn't get the commit lock.  ZEO4 clients worked around this (maybe
	not directly) by queuing invalidations during cache verification.
	ZEO5 servers don't send invalidations out of order with
	``getInvalidations`` calls and the ZEO5 client didn't need an
	invalidation queue, except they do need one to work correctly with
	ZEO4 servers. :/

  This feature, even-though it is commented as being ZEO4-only, looks
  too risky to be removed in stable branch, especially taking into
  account that in https://github.com/zopefoundation/ZEO/pull/195
  @d-maurer instead of removing, preserved this queue in a similar form:

  https://github.com/zopefoundation/ZEO/blob/30e271bbe5380a1fe65f3ca776bc70b4b29b58d9/src/ZEO/asyncio/client.py#L90
  https://github.com/zopefoundation/ZEO/blob/30e271bbe5380a1fe65f3ca776bc70b4b29b58d9/src/ZEO/asyncio/client.py#L277-L280
  https://github.com/zopefoundation/ZEO/blob/30e271bbe5380a1fe65f3ca776bc70b4b29b58d9/src/ZEO/asyncio/client.py#L630-L641

  I believe it is better be safe than sorry.
parent aa08949d
...@@ -33,7 +33,6 @@ jobs: ...@@ -33,7 +33,6 @@ jobs:
- ["3.9", "docs"] - ["3.9", "docs"]
- ["3.9", "coverage"] - ["3.9", "coverage"]
- ["2.7", "py27-msgpack1"] - ["2.7", "py27-msgpack1"]
- ["2.7", "py27-zeo4"]
- ["2.7", "py27-zodbmaster"] - ["2.7", "py27-zodbmaster"]
- ["3.7", "py37-msgpack1"] - ["3.7", "py37-msgpack1"]
- ["3.7", "py37-uvloop"] - ["3.7", "py37-uvloop"]
......
...@@ -28,7 +28,6 @@ testenv-deps = [ ...@@ -28,7 +28,6 @@ testenv-deps = [
testenv-setenv = [ testenv-setenv = [
"!py27-!pypy: PYTHONWARNINGS=ignore::ResourceWarning", "!py27-!pypy: PYTHONWARNINGS=ignore::ResourceWarning",
"msgpack1: ZEO_MSGPACK=1", "msgpack1: ZEO_MSGPACK=1",
"zeo4: ZEO4_SERVER=1",
] ]
[coverage] [coverage]
...@@ -50,7 +49,6 @@ additional-rules = [ ...@@ -50,7 +49,6 @@ additional-rules = [
[github-actions] [github-actions]
additional-config = [ additional-config = [
"- [\"2.7\", \"py27-msgpack1\"]", "- [\"2.7\", \"py27-msgpack1\"]",
"- [\"2.7\", \"py27-zeo4\"]",
"- [\"2.7\", \"py27-zodbmaster\"]", "- [\"2.7\", \"py27-zodbmaster\"]",
"- [\"3.7\", \"py37-msgpack1\"]", "- [\"3.7\", \"py37-msgpack1\"]",
"- [\"3.7\", \"py37-uvloop\"]", "- [\"3.7\", \"py37-uvloop\"]",
......
...@@ -4,6 +4,15 @@ Changelog ...@@ -4,6 +4,15 @@ Changelog
5.4.0 (unreleased) 5.4.0 (unreleased)
------------------ ------------------
- Remove support for interoperability with ZEO4 server. It turned out that ZEO5
client, contrary to interoperability with ZEO5 server, implements support for
interoperability with ZEO4 server incorrectly with concurrency bugs that lead
to data corruption. The fix is not trivial and we believe that in 2022 noone
actually uses ZEO5.client-ZEO4.server configuration. That's why support for
ZEO4 server was dropped rather than fixed.
See `issue 209 <https://github.com/zopefoundation/ZEO/issues/209>` for details.
- Remove ``asyncio/mtacceptor`` module. It turned out that multi-threaded ZEO5 - Remove ``asyncio/mtacceptor`` module. It turned out that multi-threaded ZEO5
server has concurrency issues that lead to data corruption. Multi-threaded server has concurrency issues that lead to data corruption. Multi-threaded
server mode was already deprecated and scheduled for removal, so the fix is server mode was already deprecated and scheduled for removal, so the fix is
......
...@@ -219,15 +219,7 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage): ...@@ -219,15 +219,7 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
Wait for server connection, defaulting to true. Wait for server connection, defaulting to true.
credentials credentials
[Experimental] Credentials object for additional authentication to [Experimental] Credentials object for authentication to server.
server.
username
password
realm
[ZEO4 only] Credentials for basic authentication to server.
In ZEO5 support for basic authentication has been dropped in favor
of SSL. Basic authentication support is scheduled to be removed in
`ZEO6`.
server_sync server_sync
Whether sync() should make a server round trip, thus causing client Whether sync() should make a server round trip, thus causing client
...@@ -255,6 +247,9 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage): ...@@ -255,6 +247,9 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
Defaults to false. Defaults to false.
username
password
realm
disconnect_poll disconnect_poll
min_disconnect_poll min_disconnect_poll
max_disconnect_poll max_disconnect_poll
...@@ -266,8 +261,6 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage): ...@@ -266,8 +261,6 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
testConnection() and doAuth() for details). testConnection() and doAuth() for details).
""" """
assert not username or password or realm
if isinstance(addr, int): if isinstance(addr, int):
addr = ('127.0.0.1', addr) addr = ('127.0.0.1', addr)
...@@ -1048,11 +1041,6 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage): ...@@ -1048,11 +1041,6 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
# Below are methods invoked by the StorageServer # Below are methods invoked by the StorageServer
def serialnos(self, args):
"""Server callback to pass a list of changed (oid, serial) pairs.
"""
self._tbuf.serialnos(args)
def info(self, dict): def info(self, dict):
"""Server callback to update the info dictionary.""" """Server callback to update the info dictionary."""
self._info.update(dict) self._info.update(dict)
......
...@@ -91,15 +91,3 @@ class TransactionBuffer(object): ...@@ -91,15 +91,3 @@ class TransactionBuffer(object):
for oid in server_resolved: for oid in server_resolved:
if oid not in seen: if oid not in seen:
yield oid, None, True yield oid, None, True
# Support ZEO4:
def serialnos(self, args):
for oid in args:
if isinstance(oid, bytes):
self.server_resolved.add(oid)
else:
oid, serial = oid
if isinstance(serial, Exception):
self.exception = serial
elif serial == b'rs':
self.server_resolved.add(oid)
...@@ -27,7 +27,6 @@ from ZEO._compat import StringIO ...@@ -27,7 +27,6 @@ from ZEO._compat import StringIO
logger = logging.getLogger('ZEO.tests.forker') logger = logging.getLogger('ZEO.tests.forker')
DEBUG = os.environ.get('ZEO_TEST_SERVER_DEBUG') DEBUG = os.environ.get('ZEO_TEST_SERVER_DEBUG')
ZEO4_SERVER = os.environ.get('ZEO4_SERVER')
class ZEOConfig(object): class ZEOConfig(object):
...@@ -102,13 +101,7 @@ def runner(config, qin, qout, timeout=None, ...@@ -102,13 +101,7 @@ def runner(config, qin, qout, timeout=None,
try: try:
import threading import threading
from . import runzeo
if ZEO4_SERVER:
# XXX: test dependency. In practice this is
# probably ok
from ZEO.tests.ZEO4 import runzeo
else:
from . import runzeo
options = runzeo.ZEOOptions() options = runzeo.ZEOOptions()
options.realize(['-C', config]) options.realize(['-C', config])
...@@ -118,10 +111,7 @@ def runner(config, qin, qout, timeout=None, ...@@ -118,10 +111,7 @@ def runner(config, qin, qout, timeout=None,
server.clear_socket() server.clear_socket()
server.create_server() server.create_server()
logger.debug('SERVER CREATED') logger.debug('SERVER CREATED')
if ZEO4_SERVER: qout.put(server.server.acceptor.addr)
qout.put(server.server.addr)
else:
qout.put(server.server.acceptor.addr)
logger.debug('ADDRESS SENT') logger.debug('ADDRESS SENT')
thread = threading.Thread( thread = threading.Thread(
target=server.server.loop, kwargs=dict(timeout=.2), target=server.server.loop, kwargs=dict(timeout=.2),
......
...@@ -86,7 +86,7 @@ class Protocol(base.Protocol): ...@@ -86,7 +86,7 @@ class Protocol(base.Protocol):
# One place where special care was required was in cache setup on # One place where special care was required was in cache setup on
# connect. See finish connect below. # connect. See finish connect below.
protocols = b'309', b'310', b'3101', b'4', b'5' protocols = b'5',
def __init__(self, loop, def __init__(self, loop,
addr, client, storage_key, read_only, connect_poll=1, addr, client, storage_key, read_only, connect_poll=1,
...@@ -313,11 +313,11 @@ class Protocol(base.Protocol): ...@@ -313,11 +313,11 @@ class Protocol(base.Protocol):
# syncronously, as that would lead to DEADLOCK! # syncronously, as that would lead to DEADLOCK!
client_methods = ( client_methods = (
'invalidateTransaction', 'serialnos', 'info', 'invalidateTransaction', 'info',
'receiveBlobStart', 'receiveBlobChunk', 'receiveBlobStop', 'receiveBlobStart', 'receiveBlobChunk', 'receiveBlobStop',
# plus: notify_connected, notify_disconnected # plus: notify_connected, notify_disconnected
) )
client_delegated = client_methods[2:] client_delegated = client_methods[1:]
def heartbeat(self, write=True): def heartbeat(self, write=True):
if write: if write:
...@@ -434,10 +434,10 @@ class Client(object): ...@@ -434,10 +434,10 @@ class Client(object):
self.protocols = () self.protocols = ()
self.disconnected(None) self.disconnected(None)
# Work around odd behavior of ZEO4 server. It may send # Protection against potentially odd behavior of a ZEO server: if it
# invalidations for transactions later than the result of # may send invalidations for transactions later than the result of
# getInvalidations. While we support ZEO 4 servers, we'll # getInvalidations, without queueing, client's cache might get out of
# need to keep an invalidation queue. :( # sync wrt data on the server.
self.verify_invalidation_queue = [] self.verify_invalidation_queue = []
def new_addrs(self, addrs): def new_addrs(self, addrs):
...@@ -598,7 +598,7 @@ class Client(object): ...@@ -598,7 +598,7 @@ class Client(object):
self.cache.setLastTid(server_tid) self.cache.setLastTid(server_tid)
self.ready = True self.ready = True
# Gaaaa, ZEO 4 work around. See comment in __init__. :( # See comment in __init__. :(
for tid, oids in self.verify_invalidation_queue: for tid, oids in self.verify_invalidation_queue:
if tid > server_tid: if tid > server_tid:
self.invalidateTransaction(tid, oids) self.invalidateTransaction(tid, oids)
...@@ -769,24 +769,6 @@ class Client(object): ...@@ -769,24 +769,6 @@ class Client(object):
else: else:
self.verify_invalidation_queue.append((tid, oids)) self.verify_invalidation_queue.append((tid, oids))
def serialnos(self, serials):
# Method called by ZEO4 storage servers.
# Before delegating, check for errors (likely ConflictErrors)
# and invalidate the oids they're associated with. In the
# past, this was done by the client, but now we control the
# cache and this is our last chance, as the client won't call
# back into us when there's an error.
for oid in serials:
if isinstance(oid, bytes):
self.cache.invalidate(oid, None)
else:
oid, serial = oid
if isinstance(serial, Exception) or serial == b'rs':
self.cache.invalidate(oid, None)
self.client.serialnos(serials)
@property @property
def protocol_version(self): def protocol_version(self):
return self.protocol.protocol_version return self.protocol.protocol_version
......
...@@ -112,8 +112,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner): ...@@ -112,8 +112,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
transport = loop.transport transport = loop.transport
if finish_start: if finish_start:
protocol.data_received(sized(self.enc + b'3101')) protocol.data_received(sized(self.enc + b'5'))
self.assertEqual(self.pop(2, False), self.enc + b'3101') self.assertEqual(self.pop(2, False), self.enc + b'5')
self.respond(1, None) self.respond(1, None)
self.respond(2, 'a'*8) self.respond(2, 'a'*8)
self.pop(4) self.pop(4)
...@@ -323,8 +323,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner): ...@@ -323,8 +323,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
# This time we'll send a lower protocol version. The client # This time we'll send a lower protocol version. The client
# will send it back, because it's lower than the client's # will send it back, because it's lower than the client's
# protocol: # protocol:
protocol.data_received(sized(self.enc + b'310')) protocol.data_received(sized(self.enc + b'5'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'310') self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'5')
self.assertEqual(self.pop(), (1, False, 'register', ('TEST', False))) self.assertEqual(self.pop(), (1, False, 'register', ('TEST', False)))
self.assertFalse(wrapper.notify_connected.called) self.assertFalse(wrapper.notify_connected.called)
...@@ -359,8 +359,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner): ...@@ -359,8 +359,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
cache.store(b'2'*8, b'a'*8, None, '2 data') cache.store(b'2'*8, b'a'*8, None, '2 data')
self.assertFalse(client.connected.done() or transport.data) self.assertFalse(client.connected.done() or transport.data)
protocol.data_received(sized(self.enc + b'3101')) protocol.data_received(sized(self.enc + b'5'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'3101') self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'5')
self.respond(1, None) self.respond(1, None)
self.respond(2, b'e'*8) self.respond(2, b'e'*8)
self.pop(4) self.pop(4)
...@@ -395,8 +395,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner): ...@@ -395,8 +395,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
self.assertTrue(cache) self.assertTrue(cache)
self.assertFalse(client.connected.done() or transport.data) self.assertFalse(client.connected.done() or transport.data)
protocol.data_received(sized(self.enc + b'3101')) protocol.data_received(sized(self.enc + b'5'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'3101') self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'5')
self.respond(1, None) self.respond(1, None)
self.respond(2, b'e'*8) self.respond(2, b'e'*8)
self.pop(4) self.pop(4)
...@@ -447,8 +447,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner): ...@@ -447,8 +447,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
self.assertEqual(sorted(loop.connecting), addrs[:1]) self.assertEqual(sorted(loop.connecting), addrs[:1])
protocol = loop.protocol protocol = loop.protocol
transport = loop.transport transport = loop.transport
protocol.data_received(sized(self.enc + b'3101')) protocol.data_received(sized(self.enc + b'5'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'3101') self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'5')
self.respond(1, None) self.respond(1, None)
# Now, when the first connection fails, it won't be retried, # Now, when the first connection fails, it won't be retried,
...@@ -465,8 +465,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner): ...@@ -465,8 +465,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
wrapper, cache, loop, client, protocol, transport = self.start() wrapper, cache, loop, client, protocol, transport = self.start()
cache.store(b'4'*8, b'a'*8, None, '4 data') cache.store(b'4'*8, b'a'*8, None, '4 data')
cache.setLastTid('b'*8) cache.setLastTid('b'*8)
protocol.data_received(sized(self.enc + b'3101')) protocol.data_received(sized(self.enc + b'5'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'3101') self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'5')
self.respond(1, None) self.respond(1, None)
self.respond(2, 'a'*8) self.respond(2, 'a'*8)
self.pop() self.pop()
...@@ -479,8 +479,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner): ...@@ -479,8 +479,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
self.assertFalse(transport is loop.transport) self.assertFalse(transport is loop.transport)
protocol = loop.protocol protocol = loop.protocol
transport = loop.transport transport = loop.transport
protocol.data_received(sized(self.enc + b'3101')) protocol.data_received(sized(self.enc + b'5'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'3101') self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'5')
self.respond(1, None) self.respond(1, None)
self.respond(2, 'b'*8) self.respond(2, 'b'*8)
self.pop(4) self.pop(4)
...@@ -499,8 +499,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner): ...@@ -499,8 +499,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
# We'll treat the first address as read-only and we'll let it connect: # We'll treat the first address as read-only and we'll let it connect:
loop.connect_connecting(addrs[0]) loop.connect_connecting(addrs[0])
protocol, transport = loop.protocol, loop.transport protocol, transport = loop.protocol, loop.transport
protocol.data_received(sized(self.enc + b'3101')) protocol.data_received(sized(self.enc + b'5'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'3101') self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'5')
# We see that the client tried a writable connection: # We see that the client tried a writable connection:
self.assertEqual(self.pop(), self.assertEqual(self.pop(),
(1, False, 'register', ('TEST', False))) (1, False, 'register', ('TEST', False)))
...@@ -531,9 +531,9 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner): ...@@ -531,9 +531,9 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
# We connect the second address: # We connect the second address:
loop.connect_connecting(addrs[1]) loop.connect_connecting(addrs[1])
loop.protocol.data_received(sized(self.enc + b'3101')) loop.protocol.data_received(sized(self.enc + b'5'))
self.assertEqual(self.unsized(loop.transport.pop(2)), self.assertEqual(self.unsized(loop.transport.pop(2)),
self.enc + b'3101') self.enc + b'5')
self.assertEqual(self.parse(loop.transport.pop()), self.assertEqual(self.parse(loop.transport.pop()),
(1, False, 'register', ('TEST', False))) (1, False, 'register', ('TEST', False)))
self.assertTrue(self.is_read_only()) self.assertTrue(self.is_read_only())
...@@ -567,8 +567,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner): ...@@ -567,8 +567,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
def test_invalidations_while_verifying(self): def test_invalidations_while_verifying(self):
# While we're verifying, invalidations are ignored # While we're verifying, invalidations are ignored
wrapper, cache, loop, client, protocol, transport = self.start() wrapper, cache, loop, client, protocol, transport = self.start()
protocol.data_received(sized(self.enc + b'3101')) protocol.data_received(sized(self.enc + b'5'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'3101') self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'5')
self.respond(1, None) self.respond(1, None)
self.pop(4) self.pop(4)
self.send('invalidateTransaction', b'b'*8, [b'1'*8], called=False) self.send('invalidateTransaction', b'b'*8, [b'1'*8], called=False)
...@@ -586,8 +586,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner): ...@@ -586,8 +586,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
# Similarly, invalidations aren't processed while reconnecting: # Similarly, invalidations aren't processed while reconnecting:
protocol.data_received(sized(self.enc + b'3101')) protocol.data_received(sized(self.enc + b'5'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'3101') self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'5')
self.respond(1, None) self.respond(1, None)
self.pop(4) self.pop(4)
self.send('invalidateTransaction', b'd'*8, [b'1'*8], called=False) self.send('invalidateTransaction', b'd'*8, [b'1'*8], called=False)
......
======================
Copy of ZEO 4 server
======================
This copy was made by first converting the ZEO 4 server code to use
relative imports. The code was tested with ZEO 4 before copying. It
was unchanged aside from the relative imports.
The ZEO 4 server is used for tests if the ZEO4_SERVER environment
variable is set to a non-empty value.
This diff is collapsed.
##############################################################################
#
# Copyright (c) 2003 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
_auth_modules = {}
def get_module(name):
if name == 'sha':
from auth_sha import StorageClass, SHAClient, Database
return StorageClass, SHAClient, Database
elif name == 'digest':
from .auth_digest import StorageClass, DigestClient, DigestDatabase
return StorageClass, DigestClient, DigestDatabase
else:
return _auth_modules.get(name)
def register_module(name, storage_class, client, db):
if name in _auth_modules:
raise TypeError("%s is already registred" % name)
_auth_modules[name] = storage_class, client, db
##############################################################################
#
# Copyright (c) 2003 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Digest authentication for ZEO
This authentication mechanism follows the design of HTTP digest
authentication (RFC 2069). It is a simple challenge-response protocol
that does not send passwords in the clear, but does not offer strong
security. The RFC discusses many of the limitations of this kind of
protocol.
Guard the password database as if it contained plaintext passwords.
It stores the hash of a username and password. This does not expose
the plaintext password, but it is sensitive nonetheless. An attacker
with the hash can impersonate the real user. This is a limitation of
the simple digest scheme.
HTTP is a stateless protocol, and ZEO is a stateful protocol. The
security requirements are quite different as a result. The HTTP
protocol uses a nonce as a challenge. The ZEO protocol requires a
separate session key that is used for message authentication. We
generate a second nonce for this purpose; the hash of nonce and
user/realm/password is used as the session key.
TODO: I'm not sure if this is a sound approach; SRP would be preferred.
"""
import os
import random
import struct
import time
from .base import Database, Client
from ..StorageServer import ZEOStorage
from ZEO.Exceptions import AuthError
from ..hash import sha1
def get_random_bytes(n=8):
try:
b = os.urandom(n)
except NotImplementedError:
L = [chr(random.randint(0, 255)) for i in range(n)]
b = b"".join(L)
return b
def hexdigest(s):
return sha1(s.encode()).hexdigest()
class DigestDatabase(Database):
def __init__(self, filename, realm=None):
Database.__init__(self, filename, realm)
# Initialize a key used to build the nonce for a challenge.
# We need one key for the lifetime of the server, so it
# is convenient to store in on the database.
self.noncekey = get_random_bytes(8)
def _store_password(self, username, password):
dig = hexdigest("%s:%s:%s" % (username, self.realm, password))
self._users[username] = dig
def session_key(h_up, nonce):
# The hash itself is a bit too short to be a session key.
# HMAC wants a 64-byte key. We don't want to use h_up
# directly because it would never change over time. Instead
# use the hash plus part of h_up.
return (sha1(("%s:%s" % (h_up, nonce)).encode('latin-1')).digest() +
h_up.encode('utf-8')[:44])
class StorageClass(ZEOStorage):
def set_database(self, database):
assert isinstance(database, DigestDatabase)
self.database = database
self.noncekey = database.noncekey
def _get_time(self):
# Return a string representing the current time.
t = int(time.time())
return struct.pack("i", t)
def _get_nonce(self):
# RFC 2069 recommends a nonce of the form
# H(client-IP ":" time-stamp ":" private-key)
dig = sha1()
dig.update(str(self.connection.addr).encode('latin-1'))
dig.update(self._get_time())
dig.update(self.noncekey)
return dig.hexdigest()
def auth_get_challenge(self):
"""Return realm, challenge, and nonce."""
self._challenge = self._get_nonce()
self._key_nonce = self._get_nonce()
return self.auth_realm, self._challenge, self._key_nonce
def auth_response(self, resp):
# verify client response
user, challenge, response = resp
# Since zrpc is a stateful protocol, we just store the nonce
# we sent to the client. It will need to generate a new
# nonce for a new connection anyway.
if self._challenge != challenge:
raise ValueError("invalid challenge")
# lookup user in database
h_up = self.database.get_password(user)
# regeneration resp from user, password, and nonce
check = hexdigest("%s:%s" % (h_up, challenge))
if check == response:
self.connection.setSessionKey(session_key(h_up, self._key_nonce))
return self._finish_auth(check == response)
extensions = [auth_get_challenge, auth_response]
class DigestClient(Client):
extensions = ["auth_get_challenge", "auth_response"]
def start(self, username, realm, password):
_realm, challenge, nonce = self.stub.auth_get_challenge()
if _realm != realm:
raise AuthError("expected realm %r, got realm %r"
% (_realm, realm))
h_up = hexdigest("%s:%s:%s" % (username, realm, password))
resp_dig = hexdigest("%s:%s" % (h_up, challenge))
result = self.stub.auth_response((username, challenge, resp_dig))
if result:
return session_key(h_up, nonce)
else:
return None
##############################################################################
#
# Copyright (c) 2003 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Base classes for defining an authentication protocol.
Database -- abstract base class for password database
Client -- abstract base class for authentication client
"""
from __future__ import print_function
from __future__ import print_function
import os
from ..hash import sha1
class Client(object):
# Subclass should override to list the names of methods that
# will be called on the server.
extensions = []
def __init__(self, stub):
self.stub = stub
for m in self.extensions:
setattr(self.stub, m, self.stub.extensionMethod(m))
def sort(L):
"""Sort a list in-place and return it."""
L.sort()
return L
class Database(object):
"""Abstracts a password database.
This class is used both in the authentication process (via
get_password()) and by client scripts that manage the password
database file.
The password file is a simple, colon-separated text file mapping
usernames to password hashes. The hashes are SHA hex digests
produced from the password string.
"""
realm = None
def __init__(self, filename, realm=None):
"""Creates a new Database
filename: a string containing the full pathname of
the password database file. Must be readable by the user
running ZEO. Must be writeable by any client script that
accesses the database.
realm: the realm name (a string)
"""
self._users = {}
self.filename = filename
self.load()
if realm:
if self.realm and self.realm != realm:
raise ValueError("Specified realm %r differs from database "
"realm %r" % (realm or '', self.realm))
else:
self.realm = realm
def save(self, fd=None):
filename = self.filename
needs_closed = False
if not fd:
fd = open(filename, 'w')
needs_closed = True
try:
if self.realm:
print("realm", self.realm, file=fd)
for username in sorted(self._users.keys()):
print("%s: %s" % (username, self._users[username]), file=fd)
finally:
if needs_closed:
fd.close()
def load(self):
filename = self.filename
if not filename:
return
if not os.path.exists(filename):
return
with open(filename) as fd:
L = fd.readlines()
if not L:
return
if L[0].startswith("realm "):
line = L.pop(0).strip()
self.realm = line[len("realm "):]
for line in L:
username, hash = line.strip().split(":", 1)
self._users[username] = hash.strip()
def _store_password(self, username, password):
self._users[username] = self.hash(password)
def get_password(self, username):
"""Returns password hash for specified username.
Callers must check for LookupError, which is raised in
the case of a non-existent user specified."""
if username not in self._users:
raise LookupError("No such user: %s" % username)
return self._users[username]
def hash(self, s):
return sha1(s.encode()).hexdigest()
def add_user(self, username, password):
if username in self._users:
raise LookupError("User %s already exists" % username)
self._store_password(username, password)
def del_user(self, username):
if username not in self._users:
raise LookupError("No such user: %s" % username)
del self._users[username]
def change_password(self, username, password):
if username not in self._users:
raise LookupError("No such user: %s" % username)
self._store_password(username, password)
"""HMAC (Keyed-Hashing for Message Authentication) Python module.
Implements the HMAC algorithm as described by RFC 2104.
"""
from six.moves import map
def _strxor(s1, s2):
"""Utility method. XOR the two strings s1 and s2 (must have same length).
"""
return "".join(map(lambda x, y: chr(ord(x) ^ ord(y)), s1, s2))
# The size of the digests returned by HMAC depends on the underlying
# hashing module used.
digest_size = None
class HMAC(object):
"""RFC2104 HMAC class.
This supports the API for Cryptographic Hash Functions (PEP 247).
"""
def __init__(self, key, msg=None, digestmod=None):
"""Create a new HMAC object.
key: key for the keyed hash object.
msg: Initial input for the hash, if provided.
digestmod: A module supporting PEP 247. Defaults to the md5 module.
"""
if digestmod is None:
import md5
digestmod = md5
self.digestmod = digestmod
self.outer = digestmod.new()
self.inner = digestmod.new()
self.digest_size = digestmod.digest_size
blocksize = 64
ipad = "\x36" * blocksize
opad = "\x5C" * blocksize
if len(key) > blocksize:
key = digestmod.new(key).digest()
key = key + chr(0) * (blocksize - len(key))
self.outer.update(_strxor(key, opad))
self.inner.update(_strxor(key, ipad))
if msg is not None:
self.update(msg)
# def clear(self):
# raise NotImplementedError("clear() method not available in HMAC.")
def update(self, msg):
"""Update this hashing object with the string msg.
"""
self.inner.update(msg)
def copy(self):
"""Return a separate copy of this hashing object.
An update to this copy won't affect the original object.
"""
other = HMAC("")
other.digestmod = self.digestmod
other.inner = self.inner.copy()
other.outer = self.outer.copy()
return other
def digest(self):
"""Return the hash value of this hashing object.
This returns a string containing 8-bit data. The object is
not altered in any way by this function; you can continue
updating the object after calling this function.
"""
h = self.outer.copy()
h.update(self.inner.digest())
return h.digest()
def hexdigest(self):
"""Like digest(), but returns a string of hexadecimal digits instead.
"""
return "".join([hex(ord(x))[2:].zfill(2)
for x in tuple(self.digest())])
def new(key, msg=None, digestmod=None):
"""Create a new hashing object and return it.
key: The starting key for the hash.
msg: if available, will immediately be hashed into the object's starting
state.
You can now feed arbitrary strings into the object using its update()
method, and can ask for the hash value at any time by calling its digest()
method.
"""
return HMAC(key, msg, digestmod)
<component>
<sectiontype name="zeo">
<description>
The content of a ZEO section describe operational parameters
of a ZEO server except for the storage(s) to be served.
</description>
<key name="address" datatype="socket-binding-address"
required="yes">
<description>
The address at which the server should listen. This can be in
the form 'host:port' to signify a TCP/IP connection or a
pathname string to signify a Unix domain socket connection (at
least one '/' is required). A hostname may be a DNS name or a
dotted IP address. If the hostname is omitted, the platform's
default behavior is used when binding the listening socket (''
is passed to socket.bind() as the hostname portion of the
address).
</description>
</key>
<key name="read-only" datatype="boolean"
required="no"
default="false">
<description>
Flag indicating whether the server should operate in read-only
mode. Defaults to false. Note that even if the server is
operating in writable mode, individual storages may still be
read-only. But if the server is in read-only mode, no write
operations are allowed, even if the storages are writable. Note
that pack() is considered a read-only operation.
</description>
</key>
<key name="invalidation-queue-size" datatype="integer"
required="no"
default="100">
<description>
The storage server keeps a queue of the objects modified by the
last N transactions, where N == invalidation_queue_size. This
queue is used to speed client cache verification when a client
disconnects for a short period of time.
</description>
</key>
<key name="invalidation-age" datatype="float" required="no">
<description>
The maximum age of a client for which quick-verification
invalidations will be provided by iterating over the served
storage. This option should only be used if the served storage
supports efficient iteration from a starting point near the
end of the transaction history (e.g. end of file).
</description>
</key>
<key name="monitor-address" datatype="socket-binding-address"
required="no">
<description>
The address at which the monitor server should listen. If
specified, a monitor server is started. The monitor server
provides server statistics in a simple text format. This can
be in the form 'host:port' to signify a TCP/IP connection or a
pathname string to signify a Unix domain socket connection (at
least one '/' is required). A hostname may be a DNS name or a
dotted IP address. If the hostname is omitted, the platform's
default behavior is used when binding the listening socket (''
is passed to socket.bind() as the hostname portion of the
address).
</description>
</key>
<key name="transaction-timeout" datatype="integer"
required="no">
<description>
The maximum amount of time to wait for a transaction to commit
after acquiring the storage lock, specified in seconds. If the
transaction takes too long, the client connection will be closed
and the transaction aborted.
</description>
</key>
<key name="authentication-protocol" required="no">
<description>
The name of the protocol used for authentication. The
only protocol provided with ZEO is "digest," but extensions
may provide other protocols.
</description>
</key>
<key name="authentication-database" required="no">
<description>
The path of the database containing authentication credentials.
</description>
</key>
<key name="authentication-realm" required="no">
<description>
The authentication realm of the server. Some authentication
schemes use a realm to identify the logical set of usernames
that are accepted by this server.
</description>
</key>
<key name="pid-filename" datatype="existing-dirpath"
required="no">
<description>
The full path to the file in which to write the ZEO server's Process ID
at startup. If omitted, $INSTANCE/var/ZEO.pid is used.
</description>
<metadefault>$INSTANCE/var/ZEO.pid (or $clienthome/ZEO.pid)</metadefault>
</key>
<!-- DM 2006-06-12: added option -->
<key name="drop-cache-rather-verify" datatype="boolean"
required="no" default="false">
<description>
indicates that the cache should be dropped rather than
verified when the verification optimization is not
available (e.g. when the ZEO server restarted).
</description>
</key>
</sectiontype>
</component>
##############################################################################
#
# Copyright (c) 2008 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""In Python 2.6, the "sha" and "md5" modules have been deprecated
in favor of using hashlib for both. This class allows for compatibility
between versions."""
try:
import hashlib
sha1 = hashlib.sha1
new = sha1
except ImportError:
import sha
sha1 = sha.new
new = sha1
digest_size = sha.digest_size
##############################################################################
#
# Copyright (c) 2003 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Monitor behavior of ZEO server and record statistics.
"""
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
import asyncore
import socket
import time
import logging
zeo_version = 'unknown'
try:
import pkg_resources
except ImportError:
pass
else:
zeo_dist = pkg_resources.working_set.find(
pkg_resources.Requirement.parse('ZODB3')
)
if zeo_dist is not None:
zeo_version = zeo_dist.version
class StorageStats(object):
"""Per-storage usage statistics."""
def __init__(self, connections=None):
self.connections = connections
self.loads = 0
self.stores = 0
self.commits = 0
self.aborts = 0
self.active_txns = 0
self.verifying_clients = 0
self.lock_time = None
self.conflicts = 0
self.conflicts_resolved = 0
self.start = time.ctime()
@property
def clients(self):
return len(self.connections)
def parse(self, s):
# parse the dump format
lines = s.split("\n")
for line in lines:
field, value = line.split(":", 1)
if field == "Server started":
self.start = value
elif field == "Clients":
# Hack because we use this both on the server and on
# the client where there are no connections.
self.connections = [0] * int(value)
elif field == "Clients verifying":
self.verifying_clients = int(value)
elif field == "Active transactions":
self.active_txns = int(value)
elif field == "Commit lock held for":
# This assumes
self.lock_time = time.time() - int(value)
elif field == "Commits":
self.commits = int(value)
elif field == "Aborts":
self.aborts = int(value)
elif field == "Loads":
self.loads = int(value)
elif field == "Stores":
self.stores = int(value)
elif field == "Conflicts":
self.conflicts = int(value)
elif field == "Conflicts resolved":
self.conflicts_resolved = int(value)
def dump(self, f):
print("Server started:", self.start, file=f)
print("Clients:", self.clients, file=f)
print("Clients verifying:", self.verifying_clients, file=f)
print("Active transactions:", self.active_txns, file=f)
if self.lock_time:
howlong = time.time() - self.lock_time
print("Commit lock held for:", int(howlong), file=f)
print("Commits:", self.commits, file=f)
print("Aborts:", self.aborts, file=f)
print("Loads:", self.loads, file=f)
print("Stores:", self.stores, file=f)
print("Conflicts:", self.conflicts, file=f)
print("Conflicts resolved:", self.conflicts_resolved, file=f)
class StatsClient(asyncore.dispatcher):
def __init__(self, sock, addr):
asyncore.dispatcher.__init__(self, sock)
self.buf = []
self.closed = 0
def close(self):
self.closed = 1
# The socket is closed after all the data is written.
# See handle_write().
def write(self, s):
self.buf.append(s)
def writable(self):
return len(self.buf)
def readable(self):
return 0
def handle_write(self):
s = "".join(self.buf)
self.buf = []
n = self.socket.send(s.encode('ascii'))
if n < len(s):
self.buf.append(s[:n])
if self.closed and not self.buf:
asyncore.dispatcher.close(self)
class StatsServer(asyncore.dispatcher):
StatsConnectionClass = StatsClient
def __init__(self, addr, stats):
asyncore.dispatcher.__init__(self)
self.addr = addr
self.stats = stats
if type(self.addr) == tuple:
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
else:
self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.set_reuse_addr()
logger = logging.getLogger('ZEO.monitor')
logger.info("listening on %s", repr(self.addr))
self.bind(self.addr)
self.listen(5)
def writable(self):
return 0
def readable(self):
return 1
def handle_accept(self):
try:
sock, addr = self.accept()
except socket.error:
return
f = self.StatsConnectionClass(sock, addr)
self.dump(f)
f.close()
def dump(self, f):
print("ZEO monitor server version %s" % zeo_version, file=f)
print(time.ctime(), file=f)
print(file=f)
L = sorted(self.stats.keys())
for k in L:
stats = self.stats[k]
print("Storage:", k, file=f)
stats.dump(f)
print(file=f)
This diff is collapsed.
<schema>
<!-- note that zeoctl.xml is a closely related schema which should
match this schema, but should require the "runner" section -->
<description>
This schema describes the configuration of the ZEO storage server
process.
</description>
<!-- Use the storage types defined by ZODB. -->
<import package="ZODB"/>
<!-- Use the ZEO server information structure. -->
<import package="ZEO.tests.ZEO4"/>
<import package="ZConfig.components.logger"/>
<!-- runner control -->
<import package="zdaemon"/>
<section type="zeo" name="*" required="yes" attribute="zeo" />
<section type="runner" name="*" required="no" attribute="runner" />
<multisection name="*" type="ZODB.storage"
attribute="storages"
required="yes">
<description>
One or more storages that are provided by the ZEO server. The
section names are used as the storage names, and must be unique
within each ZEO storage server. Traditionally, these names
represent small integers starting at '1'.
</description>
</multisection>
<section name="*" type="eventlog" attribute="eventlog" required="no" />
</schema>
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# zrpc is a package with the following modules
# client -- manages connection creation to remote server
# connection -- object dispatcher
# log -- logging helper
# error -- exceptions raised by zrpc
# marshal -- internal, handles basic protocol issues
# server -- manages incoming connections from remote clients
# smac -- sized message async connections
# trigger -- medusa's trigger
# zrpc is not an advertised subpackage of ZEO; its interfaces are internal
# This file is a slightly modified copy of Python 2.3's Lib/hmac.py.
# This file is under the Python Software Foundation (PSF) license.
"""HMAC (Keyed-Hashing for Message Authentication) Python module.
Implements the HMAC algorithm as described by RFC 2104.
"""
from six.moves import map
def _strxor(s1, s2):
"""Utility method. XOR the two strings s1 and s2 (must have same length).
"""
return "".join(map(lambda x, y: chr(ord(x) ^ ord(y)), s1, s2))
# The size of the digests returned by HMAC depends on the underlying
# hashing module used.
digest_size = None
class HMAC(object):
"""RFC2104 HMAC class.
This supports the API for Cryptographic Hash Functions (PEP 247).
"""
def __init__(self, key, msg=None, digestmod=None):
"""Create a new HMAC object.
key: key for the keyed hash object.
msg: Initial input for the hash, if provided.
digestmod: A module supporting PEP 247. Defaults to the md5 module.
"""
if digestmod is None:
import md5
digestmod = md5
self.digestmod = digestmod
self.outer = digestmod.new()
self.inner = digestmod.new()
# Python 2.1 and 2.2 differ about the correct spelling
try:
self.digest_size = digestmod.digestsize
except AttributeError:
self.digest_size = digestmod.digest_size
blocksize = 64
ipad = "\x36" * blocksize
opad = "\x5C" * blocksize
if len(key) > blocksize:
key = digestmod.new(key).digest()
key = key + chr(0) * (blocksize - len(key))
self.outer.update(_strxor(key, opad))
self.inner.update(_strxor(key, ipad))
if msg is not None:
self.update(msg)
# def clear(self):
# raise NotImplementedError("clear() method not available in HMAC.")
def update(self, msg):
"""Update this hashing object with the string msg.
"""
self.inner.update(msg)
def copy(self):
"""Return a separate copy of this hashing object.
An update to this copy won't affect the original object.
"""
other = HMAC("")
other.digestmod = self.digestmod
other.inner = self.inner.copy()
other.outer = self.outer.copy()
return other
def digest(self):
"""Return the hash value of this hashing object.
This returns a string containing 8-bit data. The object is
not altered in any way by this function; you can continue
updating the object after calling this function.
"""
h = self.outer.copy()
h.update(self.inner.digest())
return h.digest()
def hexdigest(self):
"""Like digest(), but returns a string of hexadecimal digits instead.
"""
return "".join([hex(ord(x))[2:].zfill(2)
for x in tuple(self.digest())])
def new(key, msg=None, digestmod=None):
"""Create a new hashing object and return it.
key: The starting key for the hash.
msg: if available, will immediately be hashed into the object's starting
state.
You can now feed arbitrary strings into the object using its update()
method, and can ask for the hash value at any time by calling its digest()
method.
"""
return HMAC(key, msg, digestmod)
This diff is collapsed.
This diff is collapsed.
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from ZODB import POSException
from ZEO.Exceptions import ClientDisconnected
class ZRPCError(POSException.StorageError):
pass
class DisconnectedError(ZRPCError, ClientDisconnected):
"""The database storage is disconnected from the storage server.
The error occurred because a problem in the low-level RPC connection,
or because the connection was closed.
"""
# This subclass is raised when zrpc catches the error.
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import os
import threading
import logging
from ZODB.loglevels import BLATHER
LOG_THREAD_ID = 0 # Set this to 1 during heavy debugging
logger = logging.getLogger('ZEO.zrpc')
_label = "%s" % os.getpid()
def new_label():
global _label
_label = str(os.getpid())
def log(message, level=BLATHER, label=None, exc_info=False):
label = label or _label
if LOG_THREAD_ID:
label = label + ':' + threading.currentThread().getName()
logger.log(level, '(%s) %s' % (label, message), exc_info=exc_info)
REPR_LIMIT = 60
def short_repr(obj):
"Return an object repr limited to REPR_LIMIT bytes."
# Some of the objects being repr'd are large strings. A lot of memory
# would be wasted to repr them and then truncate, so they are treated
# specially in this function.
# Also handle short repr of a tuple containing a long string.
# This strategy works well for arguments to StorageServer methods.
# The oid is usually first and will get included in its entirety.
# The pickle is near the beginning, too, and you can often fit the
# module name in the pickle.
if isinstance(obj, str):
if len(obj) > REPR_LIMIT:
r = repr(obj[:REPR_LIMIT])
else:
r = repr(obj)
if len(r) > REPR_LIMIT:
r = r[:REPR_LIMIT-4] + '...' + r[-1]
return r
elif isinstance(obj, (list, tuple)):
elts = []
size = 0
for elt in obj:
r = short_repr(elt)
elts.append(r)
size += len(r)
if size > REPR_LIMIT:
break
if isinstance(obj, tuple):
r = "(%s)" % (", ".join(elts))
else:
r = "[%s]" % (", ".join(elts))
else:
r = repr(obj)
if len(r) > REPR_LIMIT:
return r[:REPR_LIMIT] + '...'
else:
return r
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
...@@ -35,15 +35,6 @@ logger = logging.getLogger('ZEO.tests.forker') ...@@ -35,15 +35,6 @@ logger = logging.getLogger('ZEO.tests.forker')
DEBUG = _forker.DEBUG DEBUG = _forker.DEBUG
ZEO4_SERVER = _forker.ZEO4_SERVER
skip_if_testing_client_against_zeo4 = (
(lambda func: None)
if ZEO4_SERVER else
(lambda func: func)
)
ZEOConfig = _forker.ZEOConfig ZEOConfig = _forker.ZEOConfig
......
This diff is collapsed.
...@@ -102,9 +102,8 @@ test_classes = [FileStorageConnectionTests, ...@@ -102,9 +102,8 @@ test_classes = [FileStorageConnectionTests,
FileStorageTimeoutTests, FileStorageTimeoutTests,
MappingStorageConnectionTests, MappingStorageConnectionTests,
MappingStorageTimeoutTests, MappingStorageTimeoutTests,
SSLConnectionTests,
] ]
if not forker.ZEO4_SERVER:
test_classes.append(SSLConnectionTests)
def invalidations_while_connecting(): def invalidations_while_connecting():
......
...@@ -58,7 +58,7 @@ class FakeServer(object): ...@@ -58,7 +58,7 @@ class FakeServer(object):
class FakeConnection(object): class FakeConnection(object):
protocol_version = b'Z4' protocol_version = b'Z5'
addr = 'test' addr = 'test'
def call_soon_threadsafe(f, *a): def call_soon_threadsafe(f, *a):
......
This diff is collapsed.
...@@ -8,11 +8,9 @@ import unittest ...@@ -8,11 +8,9 @@ import unittest
import ZEO.StorageServer import ZEO.StorageServer
from . import forker
from .threaded import threaded_server_tests from .threaded import threaded_server_tests
@unittest.skipIf(forker.ZEO4_SERVER, "ZEO4 servers don't support SSL")
class ClientAuthTests(setupstack.TestCase): class ClientAuthTests(setupstack.TestCase):
def setUp(self): def setUp(self):
......
...@@ -12,7 +12,6 @@ from ZODB.broken import find_global ...@@ -12,7 +12,6 @@ from ZODB.broken import find_global
import ZEO import ZEO
from . import forker
from .utils import StorageServer from .utils import StorageServer
...@@ -22,7 +21,6 @@ class Var(object): ...@@ -22,7 +21,6 @@ class Var(object):
return True return True
@unittest.skipIf(forker.ZEO4_SERVER, "ZEO4 servers don't support SSL")
class ClientSideConflictResolutionTests(zope.testing.setupstack.TestCase): class ClientSideConflictResolutionTests(zope.testing.setupstack.TestCase):
def test_server_side(self): def test_server_side(self):
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
...@@ -25,7 +25,6 @@ deps = ...@@ -25,7 +25,6 @@ deps =
setenv = setenv =
!py27-!pypy: PYTHONWARNINGS=ignore::ResourceWarning !py27-!pypy: PYTHONWARNINGS=ignore::ResourceWarning
msgpack1: ZEO_MSGPACK=1 msgpack1: ZEO_MSGPACK=1
zeo4: ZEO4_SERVER=1
commands = commands =
# Run unit tests first. # Run unit tests first.
zope-testrunner -u --test-path=src -a 1000 {posargs:-vc} zope-testrunner -u --test-path=src -a 1000 {posargs:-vc}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment