Commit 01d74173 authored by Jim Fulton's avatar Jim Fulton

Fixed more tests

parent 1cc3ceb5
......@@ -98,6 +98,8 @@ class ClientStorage(object):
wait=True,
drop_cache_rather_verify=True,
username=None, password=None, realm=None,
# For tests:
_client_factory=ZEO.asyncio.client.ClientThread,
):
"""ClientStorage constructor.
......@@ -256,18 +258,24 @@ class ClientStorage(object):
blob_cache_size * blob_cache_size_check // 100)
self._check_blob_size()
self._server = ZEO.asyncio.client.ClientThread(
self._server = _client_factory(
addr, self, cache, storage,
ZEO.asyncio.client.Fallback if read_only_fallback else read_only,
wait_timeout or 30,
)
self._server.start()
self._call = self._server.call
self._async = self._server.async
self._async_iter = self._server.async_iter
self._commit_lock = threading.Lock()
try:
self._server.start(wait=wait)
except Exception:
# No point in keeping the server going of the storage creation fails
self._server.close()
raise
def new_addr(self, addr):
self._addr = addr
self._server.new_addrs(addr)
......@@ -763,6 +771,12 @@ class ClientStorage(object):
# all, yet you want to be sure that other abort logic is
# executed regardless.
try:
# It's tempting to make an asynchronous call here, but
# it's useful for it to be synchronous because, if we
# failed due to a disconnect, synchronous calls will
# wait a little while in hopes of reconnecting. If
# we're able to reconnect and retry the transaction,
# ten it might succeed!
self._call('tpc_abort', id(txn))
except ClientDisconnected:
logger.debug("%s ClientDisconnected in tpc_abort() ignored",
......
......@@ -11,7 +11,7 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import concurrent.futures
import os
import time
import socket
......@@ -174,6 +174,7 @@ class CommonSetupTearDown(StorageTestBase):
var='.',
cache_size=cache_size,
wait=wait,
wait_timeout=1,
min_disconnect_poll=0.1,
read_only=read_only,
read_only_fallback=read_only_fallback,
......@@ -454,7 +455,7 @@ class ConnectionTests(CommonSetupTearDown):
def checkBadMessage1(self):
# not even close to a real message
self._bad_message("salty")
self._bad_message(b"salty")
def checkBadMessage2(self):
# just like a real message, but with an unpicklable argument
......@@ -474,22 +475,36 @@ class ConnectionTests(CommonSetupTearDown):
self._storage = self.openClientStorage()
self._dostore()
# break into the internals to send a bogus message
zrpc_conn = self._storage._server.rpc
zrpc_conn.message_output(msg)
generation = self._storage._connection_generation
future = concurrent.futures.Future()
def write():
try:
self._storage._server.client.protocol._write(msg)
except Exception as exc:
future.set_exception(exc)
else:
future.set_result(None)
# break into the internals to send a bogus message
self._storage._server.loop.call_soon_threadsafe(write)
future.result()
# If we manage to call _dostore before the server disconnects
# us, we'll get a ClientDisconnected error. When we retry, it
# will succeed. It will succeed because:
# - _dostore calls tpc_abort
# - tpc_abort makes a synchronous call to the server to abort
# the transaction
# - when disconnected, synchronous calls are blocked for a little
# while while reconnecting (or they timeout of it takes too long).
try:
self._dostore()
except ClientDisconnected:
pass
else:
self._storage.close()
self.fail("Server did not disconnect after bogus message")
self._storage.close()
self._dostore()
self._storage = self.openClientStorage()
self._dostore()
self._storage.close()
self.assertTrue(self._storage._connection_generation > generation)
# Test case for multiple storages participating in a single
# transaction. This is not really a connection test, but it needs
......
......@@ -14,6 +14,8 @@
import doctest
import unittest
import ZEO.asyncio.testing
class FakeStorageBase:
def __getattr__(self, name):
......@@ -52,7 +54,7 @@ class FakeServer:
def test_server_record_iternext():
"""
On the server, record_iternext calls are simply delegated to the
underlying storage.
......@@ -71,7 +73,7 @@ underlying storage.
2
3
4
The storage info also reflects the fact that record_iternext is supported.
>>> zeo.get_info()['supports_record_iternext']
......@@ -85,9 +87,8 @@ The storage info also reflects the fact that record_iternext is supported.
"""
def test_client_record_iternext():
"""\
"""Test client storage delegation to the network client
The client simply delegates record_iternext calls to it's server stub.
......@@ -96,21 +97,25 @@ stuff. I'd rather do a lame test than a really lame test, so here goes.
First, fake out the connection manager so we can make a connection:
>>> import ZEO.ClientStorage
>>> from ZEO.ClientStorage import ClientStorage
>>> oldConnectionManagerClass = ClientStorage.ConnectionManagerClass
>>> class FauxConnectionManagerClass:
... def __init__(*a, **k):
... pass
... def attempt_connect(self):
... return True
>>> ClientStorage.ConnectionManagerClass = FauxConnectionManagerClass
>>> client = ClientStorage('', wait=False)
>>> ClientStorage.ConnectionManagerClass = oldConnectionManagerClass
>>> import ZEO
>>> class Client(ZEO.asyncio.testing.ClientRunner):
...
... def record_iternext(self, next=None):
... if next == None:
... next = '0'
... next = str(int(next) + 1)
... oid = next
... if next == '4':
... next = None
...
... return oid, oid*8, 'data ' + oid, next
>>> client = ZEO.client(
... '', wait=False, _client_factory=Client)
Now we'll have our way with it's private _server attr:
>>> client._server = FakeStorage()
>>> next = None
>>> while 1:
... oid, serial, data, next = client.record_iternext(next)
......@@ -124,35 +129,6 @@ Now we'll have our way with it's private _server attr:
"""
def test_server_stub_record_iternext():
"""\
The server stub simply delegates record_iternext calls to it's rpc.
There's really no decent way to test ZEO without running to much crazy
stuff. I'd rather do a lame test than a really lame test, so here goes.
>>> class FauxRPC:
... storage = FakeStorage()
... def call(self, meth, *args):
... return getattr(self.storage, meth)(*args)
... peer_protocol_version = 1
>>> import ZEO.ServerStub
>>> stub = ZEO.ServerStub.StorageServer(FauxRPC())
>>> next = None
>>> while 1:
... oid, serial, data, next = stub.record_iternext(next)
... print(oid)
... if next is None:
... break
1
2
3
4
"""
def history_to_version_compatible_storage():
"""
Some storages work under ZODB <= 3.8 and ZODB >= 3.9.
......@@ -163,7 +139,7 @@ def history_to_version_compatible_storage():
... return oid,version,size
A ZEOStorage such as the following should support this type of storage:
>>> class OurFakeServer(FakeServer):
... storages = {'1':VersionCompatibleStorage()}
>>> import ZEO.StorageServer
......@@ -181,7 +157,7 @@ def history_to_version_compatible_storage():
>>> from ZEO.StorageServer import ZEOStorage308Adapter
>>> zeo = ZEOStorage308Adapter(VersionCompatibleStorage())
The history method should still return the parameters it was called with:
>>> zeo.history('oid','',99)
......@@ -193,4 +169,3 @@ def test_suite():
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment