Commit 84eed196 authored by Jim Fulton's avatar Jim Fulton

ZEO clients (``ClientStorage`` instances) now work in forked processes,

including those created via ``multiprocessing.Process`` instances.

This entailed giving each client storage it's own networking thread.
parent 6ba3a6bf
...@@ -18,6 +18,9 @@ New Features ...@@ -18,6 +18,9 @@ New Features
raise a StorageTransactionError when invalid transactions are passed raise a StorageTransactionError when invalid transactions are passed
to tpc_begin, tpc_vote, or tpc_finish. to tpc_begin, tpc_vote, or tpc_finish.
- ZEO clients (``ClientStorage`` instances) now work in forked processes,
including those created via ``multiprocessing.Process`` instances.
- Broken objects now provide the IBroken interface. - Broken objects now provide the IBroken interface.
Bugs Fixed Bugs Fixed
......
...@@ -52,8 +52,6 @@ import zope.testing.setupstack ...@@ -52,8 +52,6 @@ import zope.testing.setupstack
logger = logging.getLogger('ZEO.tests.testZEO') logger = logging.getLogger('ZEO.tests.testZEO')
ZEO.zrpc.connection.start_client_thread()
class DummyDB: class DummyDB:
def invalidate(self, *args): def invalidate(self, *args):
pass pass
...@@ -389,14 +387,17 @@ class HeartbeatTests(ZEO.tests.ConnectionTests.CommonSetupTearDown): ...@@ -389,14 +387,17 @@ class HeartbeatTests(ZEO.tests.ConnectionTests.CommonSetupTearDown):
def setUp(self): def setUp(self):
# Crank down the select frequency # Crank down the select frequency
self.__old_client_timeout = ZEO.zrpc.connection.client_timeout self.__old_client_timeout = ZEO.zrpc.client.client_timeout
ZEO.zrpc.connection.client_timeout = 0.1 ZEO.zrpc.client.client_timeout = self.__client_timeout
ZEO.zrpc.connection.client_trigger.pull_trigger()
ZEO.tests.ConnectionTests.CommonSetupTearDown.setUp(self) ZEO.tests.ConnectionTests.CommonSetupTearDown.setUp(self)
__client_timeouts = 0
def __client_timeout(self):
self.__client_timeouts += 1
return .1
def tearDown(self): def tearDown(self):
ZEO.zrpc.connection.client_timeout = self.__old_client_timeout ZEO.zrpc.client.client_timeout = self.__old_client_timeout
ZEO.zrpc.connection.client_trigger.pull_trigger()
ZEO.tests.ConnectionTests.CommonSetupTearDown.tearDown(self) ZEO.tests.ConnectionTests.CommonSetupTearDown.tearDown(self)
def getConfig(self, path, create, read_only): def getConfig(self, path, create, read_only):
...@@ -405,11 +406,11 @@ class HeartbeatTests(ZEO.tests.ConnectionTests.CommonSetupTearDown): ...@@ -405,11 +406,11 @@ class HeartbeatTests(ZEO.tests.ConnectionTests.CommonSetupTearDown):
def checkHeartbeatWithServerClose(self): def checkHeartbeatWithServerClose(self):
# This is a minimal test that mainly tests that the heartbeat # This is a minimal test that mainly tests that the heartbeat
# function does no harm. # function does no harm.
client_timeout_count = ZEO.zrpc.connection.client_timeout_count
self._storage = self.openClientStorage() self._storage = self.openClientStorage()
time.sleep(1) # allow some time for the select loop to fire a few times client_timeouts = self.__client_timeouts
self.assert_(ZEO.zrpc.connection.client_timeout_count forker.wait_until('got a timeout',
> client_timeout_count) lambda : self.__client_timeouts > client_timeouts
)
self._dostore() self._dostore()
if hasattr(os, 'kill'): if hasattr(os, 'kill'):
...@@ -419,23 +420,10 @@ class HeartbeatTests(ZEO.tests.ConnectionTests.CommonSetupTearDown): ...@@ -419,23 +420,10 @@ class HeartbeatTests(ZEO.tests.ConnectionTests.CommonSetupTearDown):
else: else:
self.shutdownServer() self.shutdownServer()
for i in range(91): forker.wait_until('disconnected',
# wait for disconnection lambda : not self._storage.is_connected()
if not self._storage.is_connected(): )
break
time.sleep(0.1)
else:
raise AssertionError("Didn't detect server shutdown in 5 seconds")
def checkHeartbeatWithClientClose(self):
# This is a minimal test that mainly tests that the heartbeat
# function does no harm.
client_timeout_count = ZEO.zrpc.connection.client_timeout_count
self._storage = self.openClientStorage()
self._storage.close() self._storage.close()
time.sleep(1) # allow some time for the select loop to fire a few times
self.assert_(ZEO.zrpc.connection.client_timeout_count
> client_timeout_count)
class ZRPCConnectionTests(ZEO.tests.ConnectionTests.CommonSetupTearDown): class ZRPCConnectionTests(ZEO.tests.ConnectionTests.CommonSetupTearDown):
...@@ -451,26 +439,26 @@ class ZRPCConnectionTests(ZEO.tests.ConnectionTests.CommonSetupTearDown): ...@@ -451,26 +439,26 @@ class ZRPCConnectionTests(ZEO.tests.ConnectionTests.CommonSetupTearDown):
def writable(self): def writable(self):
raise SystemError("I'm evil") raise SystemError("I'm evil")
log = [] import zope.testing.loggingsupport
ZEO.zrpc.connection.client_logger.critical = ( handler = zope.testing.loggingsupport.InstalledHandler(
lambda m, *a, **kw: log.append((m % a, kw)) 'ZEO.zrpc.client')
)
ZEO.zrpc.connection.client_map[None] = Evil() self._storage._rpc_mgr.map[None] = Evil()
try: try:
ZEO.zrpc.connection.client_trigger.pull_trigger() self._storage._rpc_mgr.trigger.pull_trigger()
except DisconnectedError: except DisconnectedError:
pass pass
time.sleep(.1) forker.wait_until(
self.failIf(self._storage.is_connected()) 'disconnected',
self.assertEqual(len(ZEO.zrpc.connection.client_map), 1) lambda : not self._storage.is_connected()
del ZEO.zrpc.connection.client_logger.critical )
self.assertEqual(log[0][0], 'The ZEO client loop failed.')
self.assert_('exc_info' in log[0][1]) log = str(handler)
self.assertEqual(log[1][0], "Couldn't close a dispatcher.") handler.uninstall()
self.assert_('exc_info' in log[1][1]) self.assert_("ZEO client loop failed" in log)
self.assert_("Couldn't close a dispatcher." in log)
def checkExceptionLogsAtError(self): def checkExceptionLogsAtError(self):
# Test the exceptions are logged at error # Test the exceptions are logged at error
...@@ -1201,9 +1189,12 @@ def open_convenience(): ...@@ -1201,9 +1189,12 @@ def open_convenience():
def client_asyncore_thread_has_name(): def client_asyncore_thread_has_name():
""" """
>>> addr, _ = start_server()
>>> db = ZEO.DB(addr)
>>> len([t for t in threading.enumerate() >>> len([t for t in threading.enumerate()
... if t.getName() == 'ZEO.zrpc.connection']) ... if ' zeo client networking thread' in t.getName()])
1 1
>>> db.close()
""" """
def runzeo_without_configfile(): def runzeo_without_configfile():
...@@ -1260,6 +1251,37 @@ Invalidations could cause errors when closing client storages, ...@@ -1260,6 +1251,37 @@ Invalidations could cause errors when closing client storages,
>>> thread.join(1) >>> thread.join(1)
""" """
if sys.version_info >= (2, 6):
import multiprocessing
def work_with_multiprocessing_process(name, addr, q):
conn = ZEO.connection(addr)
q.put((name, conn.root.x))
conn.close()
def work_with_multiprocessing():
"""Client storage should work with multi-processing.
>>> import StringIO
>>> sys.stdin = StringIO.StringIO()
>>> addr, _ = start_server()
>>> conn = ZEO.connection(addr)
>>> conn.root.x = 1
>>> transaction.commit()
>>> q = multiprocessing.Queue()
>>> processes = [multiprocessing.Process(
... target=work_with_multiprocessing_process,
... args=(i, addr, q))
... for i in range(3)]
>>> _ = [p.start() for p in processes]
>>> sorted(q.get(timeout=60) for p in processes)
[(0, 1), (1, 1), (2, 1)]
>>> _ = [p.join(30) for p in processes]
>>> conn.close()
"""
slow_test_classes = [ slow_test_classes = [
BlobAdaptedFileStorageTests, BlobWritableCacheTests, BlobAdaptedFileStorageTests, BlobWritableCacheTests,
DemoStorageTests, FileStorageTests, MappingStorageTests, DemoStorageTests, FileStorageTests, MappingStorageTests,
......
...@@ -11,11 +11,15 @@ ...@@ -11,11 +11,15 @@
# FOR A PARTICULAR PURPOSE # FOR A PARTICULAR PURPOSE
# #
############################################################################## ##############################################################################
import asyncore
import errno import errno
import logging
import select import select
import socket import socket
import sys
import threading import threading
import time import time
import traceback
import types import types
import logging import logging
...@@ -24,15 +28,113 @@ from ZODB.loglevels import BLATHER ...@@ -24,15 +28,113 @@ from ZODB.loglevels import BLATHER
from ZEO.zrpc.log import log from ZEO.zrpc.log import log
import ZEO.zrpc.trigger import ZEO.zrpc.trigger
from ZEO.zrpc.connection import ManagedClientConnection, start_client_thread from ZEO.zrpc.connection import ManagedClientConnection
def client_timeout():
return 30.0
def client_loop(map):
read = asyncore.read
write = asyncore.write
_exception = asyncore._exception
while map:
try:
# The next two lines intentionally don't use
# iterators. Other threads can close dispatchers, causeing
# the socket map to shrink.
r = e = map.keys()
w = [fd for (fd, obj) in map.items() if obj.writable()]
try:
r, w, e = select.select(r, w, e, client_timeout())
except select.error, err:
if err[0] != errno.EINTR:
if err[0] == errno.EBADF:
# If a connection is closed while we are
# calling select on it, we can get a bad
# file-descriptor error. We'll check for this
# case by looking for entries in r and w that
# are not in the socket map.
if [fd for fd in r if fd not in map]:
continue
if [fd for fd in w if fd not in map]:
continue
raise
else:
continue
if not map:
break
if not (r or w or e):
# The line intentionally doesn't use iterators. Other
# threads can close dispatchers, causeing the socket
# map to shrink.
for obj in map.values():
if isinstance(obj, ManagedClientConnection):
# Send a heartbeat message as a reply to a
# non-existent message id.
try:
obj.send_reply(-1, None)
except DisconnectedError:
pass
continue
for fd in r:
obj = map.get(fd)
if obj is None:
continue
read(obj)
for fd in w:
obj = map.get(fd)
if obj is None:
continue
write(obj)
for fd in e:
obj = map.get(fd)
if obj is None:
continue
_exception(obj)
except:
if map:
try:
logging.getLogger(__name__+'.client_loop').critical(
'A ZEO client loop failed.',
exc_info=sys.exc_info())
except:
pass
for fd, obj in map.items():
if not hasattr(obj, 'mgr'):
continue
try:
obj.mgr.client.close()
except:
map.pop(fd, None)
try:
logging.getLogger(__name__+'.client_loop'
).critical(
"Couldn't close a dispatcher.",
exc_info=sys.exc_info())
except:
pass
class ConnectionManager(object): class ConnectionManager(object):
"""Keeps a connection up over time""" """Keeps a connection up over time"""
def __init__(self, addrs, client, tmin=1, tmax=180): def __init__(self, addrs, client, tmin=1, tmax=180):
start_client_thread()
self.addrlist = self._parse_addrs(addrs)
self.client = client self.client = client
self._start_asyncore_loop()
self.addrlist = self._parse_addrs(addrs)
self.tmin = min(tmin, tmax) self.tmin = min(tmin, tmax)
self.tmax = tmax self.tmax = tmax
self.cond = threading.Condition(threading.Lock()) self.cond = threading.Condition(threading.Lock())
...@@ -42,6 +144,15 @@ class ConnectionManager(object): ...@@ -42,6 +144,15 @@ class ConnectionManager(object):
# attempting to connect. # attempting to connect.
self.thread = None # Protected by self.cond self.thread = None # Protected by self.cond
def _start_asyncore_loop(self):
self.map = {}
self.trigger = ZEO.zrpc.trigger.trigger(self.map)
self.loop_thread = threading.Thread(
name="%s zeo client networking thread" % self.client.__name__,
target=client_loop, args=(self.map,))
self.loop_thread.setDaemon(True)
self.loop_thread.start()
def __repr__(self): def __repr__(self):
return "<%s for %s>" % (self.__class__.__name__, self.addrlist) return "<%s for %s>" % (self.__class__.__name__, self.addrlist)
...@@ -84,7 +195,6 @@ class ConnectionManager(object): ...@@ -84,7 +195,6 @@ class ConnectionManager(object):
try: try:
t = self.thread t = self.thread
self.thread = None self.thread = None
conn = self.connection
finally: finally:
self.cond.release() self.cond.release()
if t is not None: if t is not None:
...@@ -94,9 +204,21 @@ class ConnectionManager(object): ...@@ -94,9 +204,21 @@ class ConnectionManager(object):
if t.isAlive(): if t.isAlive():
log("CM.close(): self.thread.join() timed out", log("CM.close(): self.thread.join() timed out",
level=logging.WARNING) level=logging.WARNING)
if conn is not None:
# This will call close_conn() below which clears self.connection for fd, obj in self.map.items():
conn.close() if obj is not self.trigger:
try:
obj.close()
except:
logging.getLogger(__name__+'.'+self.__class__.__name__
).critical(
"Couldn't close a dispatcher.",
exc_info=sys.exc_info())
self.map.clear()
self.trigger.pull_trigger()
self.loop_thread.join(9)
self.trigger.close()
def attempt_connect(self): def attempt_connect(self):
"""Attempt a connection to the server without blocking too long. """Attempt a connection to the server without blocking too long.
......
...@@ -21,10 +21,11 @@ import logging ...@@ -21,10 +21,11 @@ import logging
import traceback, time import traceback, time
import ZEO.zrpc.trigger
from ZEO.zrpc import smac from ZEO.zrpc import smac
from ZEO.zrpc.error import ZRPCError, DisconnectedError from ZEO.zrpc.error import ZRPCError, DisconnectedError
from ZEO.zrpc.marshal import Marshaller, ServerMarshaller from ZEO.zrpc.marshal import Marshaller, ServerMarshaller
from ZEO.zrpc.trigger import trigger
from ZEO.zrpc.log import short_repr, log from ZEO.zrpc.log import short_repr, log
from ZODB.loglevels import BLATHER, TRACE from ZODB.loglevels import BLATHER, TRACE
import ZODB.POSException import ZODB.POSException
...@@ -35,142 +36,6 @@ exception_type_type = type(Exception) ...@@ -35,142 +36,6 @@ exception_type_type = type(Exception)
debug_zrpc = False debug_zrpc = False
##############################################################################
# Dedicated Client select loop:
client_timeout = 30.0
client_timeout_count = 0 # for testing
client_map = {}
client_trigger = trigger(client_map)
client_logger = logging.getLogger('ZEO.zrpc.client_loop')
client_exit_event = threading.Event()
client_running = False
def client_exit():
global client_running
if client_running:
client_running = False
client_trigger.pull_trigger()
client_exit_event.wait(99)
atexit.register(client_exit)
def client_loop():
global client_running
client_running = True
client_exit_event.clear()
map = client_map
read = asyncore.read
write = asyncore.write
_exception = asyncore._exception
loop_failures = 0
while client_running and map:
try:
# The next two lines intentionally don't use
# iterators. Other threads can close dispatchers, causeing
# the socket map to shrink.
r = e = client_map.keys()
w = [fd for (fd, obj) in map.items() if obj.writable()]
try:
r, w, e = select.select(r, w, e, client_timeout)
except select.error, err:
if err[0] != errno.EINTR:
if err[0] == errno.EBADF:
# If a connection is closed while we are
# calling select on it, we can get a bad
# file-descriptor error. We'll check for this
# case by looking for entries in r and w that
# are not in the socket map.
if [fd for fd in r if fd not in map]:
continue
if [fd for fd in w if fd not in map]:
continue
raise
else:
continue
if not client_running:
break
if not (r or w or e):
# The line intentionally doesn't use iterators. Other
# threads can close dispatchers, causeing the socket
# map to shrink.
for obj in map.values():
if isinstance(obj, Connection):
# Send a heartbeat message as a reply to a
# non-existent message id.
try:
obj.send_reply(-1, None)
except DisconnectedError:
pass
global client_timeout_count
client_timeout_count += 1
continue
for fd in r:
obj = map.get(fd)
if obj is None:
continue
read(obj)
for fd in w:
obj = map.get(fd)
if obj is None:
continue
write(obj)
for fd in e:
obj = map.get(fd)
if obj is None:
continue
_exception(obj)
except:
if map:
try:
client_logger.critical('The ZEO client loop failed.',
exc_info=sys.exc_info())
except:
pass
for fd, obj in map.items():
if obj is client_trigger:
continue
try:
obj.mgr.client.close()
except:
map.pop(fd, None)
try:
client_logger.critical(
"Couldn't close a dispatcher.",
exc_info=sys.exc_info())
except:
pass
client_exit_event.set()
client_thread_lock = threading.Lock()
client_thread = None
def start_client_thread():
client_thread_lock.acquire()
try:
global client_thread
if client_thread is None:
client_thread = threading.Thread(target=client_loop, name=__name__)
client_thread.setDaemon(True)
client_thread.start()
finally:
client_thread_lock.release()
#
##############################################################################
class Delay: class Delay:
"""Used to delay response to client for synchronous calls. """Used to delay response to client for synchronous calls.
...@@ -679,7 +544,7 @@ class ManagedServerConnection(Connection): ...@@ -679,7 +544,7 @@ class ManagedServerConnection(Connection):
unlogged_exception_types = (ZODB.POSException.POSKeyError, ) unlogged_exception_types = (ZODB.POSException.POSKeyError, )
# Servers use a shared server trigger that uses the asyncore socket map # Servers use a shared server trigger that uses the asyncore socket map
trigger = trigger() trigger = ZEO.zrpc.trigger.trigger()
call_from_thread = trigger.pull_trigger call_from_thread = trigger.pull_trigger
def __init__(self, sock, addr, obj, mgr): def __init__(self, sock, addr, obj, mgr):
...@@ -724,9 +589,6 @@ class ManagedClientConnection(Connection): ...@@ -724,9 +589,6 @@ class ManagedClientConnection(Connection):
__super_init = Connection.__init__ __super_init = Connection.__init__
base_message_output = Connection.message_output base_message_output = Connection.message_output
trigger = client_trigger
call_from_thread = trigger.pull_trigger
def __init__(self, sock, addr, mgr): def __init__(self, sock, addr, mgr):
self.mgr = mgr self.mgr = mgr
...@@ -753,8 +615,10 @@ class ManagedClientConnection(Connection): ...@@ -753,8 +615,10 @@ class ManagedClientConnection(Connection):
self.replies_cond = threading.Condition() self.replies_cond = threading.Condition()
self.replies = {} self.replies = {}
self.__super_init(sock, addr, None, tag='C', map=client_map) self.__super_init(sock, addr, None, tag='C', map=mgr.map)
client_trigger.pull_trigger() self.trigger = mgr.trigger
self.call_from_thread = self.trigger.pull_trigger
self.call_from_thread()
def close(self): def close(self):
Connection.close(self) Connection.close(self)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment