Commit 1e6c884c authored by Jim Fulton's avatar Jim Fulton

Removed ThreadedAsync and (last?) vestiges of the old "non-async"

mode.
parent b7d7570b
......@@ -32,6 +32,8 @@ New Features
on datetimes or serials (TIDs). See
src/ZODB/historical_connections.txt.
- Removed the ThreadedAsync module.
Bugs Fixed
----------
......
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Manage the asyncore mainloop in a multi-threaded app.
In a multi-threaded application, only a single thread runs the
asyncore mainloop. This thread (the "mainloop thread") may not start
the mainloop before another thread needs to perform an async action
that requires it. As a result, other threads need to coordinate with
the mainloop thread to find out whether the mainloop is running.
This module implements a callback mechanism that allows other threads
to be notified when the mainloop starts. A thread calls
register_loop_callback() to register interest. When the mainloop
thread calls loop(), each registered callback will be called with the
socket map as its first argument.
"""
import asyncore
import thread
# Zope pokes a non-None value into exit_status when it wants the loop()
# function to exit. Indeed, there appears to be no other way to tell
# Zope3 to shut down.
exit_status = None
_loop_lock = thread.allocate_lock()
_looping = None # changes to socket map when loop() starts
_loop_callbacks = []
def register_loop_callback(callback, args=(), kw=None):
"""Register callback function to be called when mainloop starts.
The callable object callback will be invokved when the mainloop
starts. If the mainloop is currently running, the callback will
be invoked immediately.
The callback will be called with a single argument, the mainloop
socket map, unless the optional args or kw arguments are used.
args defines a tuple of extra arguments to pass after the socket
map. kw defines a dictionary of keyword arguments.
"""
_loop_lock.acquire()
try:
if _looping is not None:
callback(_looping, *args, **(kw or {}))
else:
_loop_callbacks.append((callback, args, kw))
finally:
_loop_lock.release()
def remove_loop_callback(callback):
"""Remove a callback function registered earlier.
This is useful if loop() was never called.
"""
for i, value in enumerate(_loop_callbacks):
if value[0] == callback:
del _loop_callbacks[i]
return
# Because of the exit_status magic, we can't just invoke asyncore.loop(),
# and that's a shame.
# The signature of asyncore.loop changed between Python 2.3 and 2.4, and
# this loop() has 2.4's signature, which added the optional `count` argument.
# Since we physically replace asyncore.loop with this `loop`, and want
# compatibility with both Pythons, we need to support the most recent
# signature. Applications running under 2.3 should (of course) avoid using
# the `count` argument, since 2.3 doesn't have it.
def loop(timeout=30.0, use_poll=False, map=None, count=None):
global _looping
global exit_status
exit_status = None
if map is None:
map = asyncore.socket_map
# This section is taken from Python 2.3's asyncore.loop, and is more
# elaborate than the corresponding section of 2.4's: in 2.4 poll2 and
# poll3 are aliases for the same function, in 2.3 they're different
# functions.
if use_poll:
if hasattr(select, 'poll'):
poll_fun = asyncore.poll3
else:
poll_fun = asyncore.poll2
else:
poll_fun = asyncore.poll
# The loop is about to start: invoke any registered callbacks.
_loop_lock.acquire()
try:
_looping = map
while _loop_callbacks:
cb, args, kw = _loop_callbacks.pop()
cb(map, *args, **(kw or {}))
finally:
_loop_lock.release()
# Run the loop. This is 2.4's logic, with the addition that we stop
# if/when this module's exit_status global is set to a non-None value.
if count is None:
while map and exit_status is None:
poll_fun(timeout, map)
else:
while map and count > 0 and exit_status is None:
poll_fun(timeout, map)
count -= 1
_loop_lock.acquire()
try:
_looping = None
finally:
_loop_lock.release()
# Evil: rebind asyncore.loop to the above loop() function.
#
# Code should explicitly call ThreadedAsync.loop() instead of asyncore.loop().
# Most of ZODB has been fixed, but ripping this out may break 3rd party code.
# Maybe we should issue a warning and let it continue for a while (NOTE: code
# to raise DeprecationWarning was written but got commented out below; don't
# know why it got commented out). Or maybe we should get rid of this
# mechanism entirely, and have each piece that needs one run its own asyncore
# loop in its own thread.
##def deprecated_loop(*args, **kws):
## import warnings
## warnings.warn("""\
##ThreadedAsync.loop() called through sneaky asyncore.loop() rebinding.
##You should change your code to call ThreadedAsync.loop() explicitly.""",
## DeprecationWarning)
## loop(*args, **kws)
##
##asyncore.loop = deprecated_loop
asyncore.loop = loop
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Manage the asyncore mainloop in a multi-threaded app.
$Id$
"""
from LoopCallback import register_loop_callback, loop, remove_loop_callback
......@@ -369,7 +369,6 @@ class ClientStorage(object):
# still be going on. This code must wait until validation
# finishes, but if the connection isn't a zrpc async
# connection it also needs to poll for input.
assert self._connection.is_async()
while 1:
self._ready.wait(30)
if self._ready.isSet():
......@@ -524,8 +523,6 @@ class ClientStorage(object):
# handled in order.
self._info.update(stub.get_info())
assert conn.is_async()
self._handle_extensions()
def _handle_extensions(self):
......
......@@ -34,6 +34,7 @@ Unless -C is specified, -a and -f are required.
# For the forseeable future, it must work under Python 2.1 as well as
# 2.2 and above.
import asyncore
import os
import sys
import signal
......@@ -241,8 +242,7 @@ class ZEOServer:
auth_realm=self.options.auth_realm)
def loop_forever(self):
import ThreadedAsync.LoopCallback
ThreadedAsync.LoopCallback.loop()
asyncore.loop()
def handle_sigterm(self):
log("terminated by SIGTERM")
......
......@@ -13,6 +13,7 @@
##############################################################################
"""Helper file used to launch a ZEO server cross platform"""
import asyncore
import os
import sys
import time
......@@ -24,8 +25,6 @@ import asyncore
import threading
import logging
import ThreadedAsync.LoopCallback
from ZEO.StorageServer import StorageServer
from ZEO.runzeo import ZEOOptions
......@@ -208,8 +207,8 @@ def main():
d.setDaemon(1)
d.start()
# Loop for socket events
log(label, 'entering ThreadedAsync loop')
ThreadedAsync.LoopCallback.loop()
log(label, 'entering asyncore loop')
asyncore.loop()
if __name__ == '__main__':
......
......@@ -21,7 +21,6 @@ import logging
import traceback, time
import ThreadedAsync
from ZEO.zrpc import smac
from ZEO.zrpc.error import ZRPCError, DisconnectedError
from ZEO.zrpc.marshal import Marshaller
......@@ -383,15 +382,7 @@ class Connection(smac.SizedMessageAsyncConnection, object):
ourmap = {}
self.__super_init(sock, addr, map=ourmap)
# A Connection either uses asyncore directly or relies on an
# asyncore mainloop running in a separate thread. If
# thr_async is true, then the mainloop is running in a
# separate thread. If thr_async is true, then the asyncore
# trigger (self.trigger) is used to notify that thread of
# activity on the current thread.
self.thr_async = False
self.trigger = None
self._prepare_async()
self.trigger = trigger()
# The singleton dict is used in synchronous mode when a method
# needs to call into asyncore to try to force some I/O to occur.
......@@ -684,10 +675,7 @@ class Connection(smac.SizedMessageAsyncConnection, object):
if self.closed:
raise DisconnectedError()
msgid = self.send_call(method, args, 0)
if self.is_async():
self.trigger.pull_trigger()
else:
asyncore.poll(0.01, self._singleton)
self.trigger.pull_trigger()
return msgid
def _deferred_wait(self, msgid):
......@@ -728,23 +716,6 @@ class Connection(smac.SizedMessageAsyncConnection, object):
# handle IO, possibly in async mode
def _prepare_async(self):
self.thr_async = False
ThreadedAsync.register_loop_callback(self.set_async)
# TODO: If we are not in async mode, this will cause dead
# Connections to be leaked.
def set_async(self, map):
self.trigger = trigger()
self.thr_async = True
def is_async(self):
# Overridden by ManagedConnection
if self.thr_async:
return 1
else:
return 0
def _pull_trigger(self, tryagain=10):
try:
self.trigger.pull_trigger()
......@@ -757,10 +728,9 @@ class Connection(smac.SizedMessageAsyncConnection, object):
def wait(self, msgid):
"""Invoke asyncore mainloop and wait for reply."""
if __debug__:
self.log("wait(%d), async=%d" % (msgid, self.is_async()),
level=TRACE)
if self.is_async():
self._pull_trigger()
self.log("wait(%d)" % msgid, level=TRACE)
self._pull_trigger()
# Delay used when we call asyncore.poll() directly.
# Start with a 1 msec delay, double until 1 sec.
......@@ -778,7 +748,6 @@ class Connection(smac.SizedMessageAsyncConnection, object):
self.log("wait(%d): reply=%s" %
(msgid, short_repr(reply)), level=TRACE)
return reply
assert self.is_async() # XXX we're such cowards
self.replies_cond.wait()
finally:
self.replies_cond.release()
......@@ -793,65 +762,9 @@ class Connection(smac.SizedMessageAsyncConnection, object):
def poll(self):
"""Invoke asyncore mainloop to get pending message out."""
if __debug__:
self.log("poll(), async=%d" % self.is_async(), level=TRACE)
if self.is_async():
self._pull_trigger()
else:
asyncore.poll(0.0, self._singleton)
def _pending(self, timeout=0):
"""Invoke mainloop until any pending messages are handled."""
if __debug__:
self.log("pending(), async=%d" % self.is_async(), level=TRACE)
if self.is_async():
return
# Inline the asyncore poll() function to know whether any input
# was actually read. Repeat until no input is ready.
# Pending does reads and writes. In the case of server
# startup, we may need to write out zeoVerify() messages.
# Always check for read status, but don't check for write status
# only there is output to do. Only continue in this loop as
# long as there is data to read.
r = r_in = [self._fileno]
x_in = []
while r and not self.closed:
if self.writable():
w_in = [self._fileno]
else:
w_in = []
try:
r, w, x = select.select(r_in, w_in, x_in, timeout)
except select.error, err:
if err[0] == errno.EINTR:
timeout = 0
continue
else:
raise
else:
# Make sure any subsequent select does not block. The
# loop is only intended to make sure all incoming data is
# returned.
# Insecurity: What if the server sends a lot of
# invalidations, such that pending never finishes? Seems
# unlikely, but possible.
timeout = 0
if r:
try:
self.handle_read_event()
except asyncore.ExitNow:
raise
except:
self.handle_error()
if w:
try:
self.handle_write_event()
except asyncore.ExitNow:
raise
except:
self.handle_error()
self.log("poll()", level=TRACE)
self._pull_trigger()
class ManagedServerConnection(Connection):
"""Server-side Connection subclass."""
__super_init = Connection.__init__
......@@ -895,7 +808,6 @@ class ManagedClientConnection(Connection):
self.queued_messages = []
self.__super_init(sock, addr, obj, tag='C', map=client_map)
self.thr_async = True
self.trigger = client_trigger
client_trigger.pull_trigger()
......@@ -951,16 +863,6 @@ class ManagedClientConnection(Connection):
# we're closed.
self.trigger.pull_trigger()
def set_async(self, map):
pass
def _prepare_async(self):
# Don't do the register_loop_callback that the superclass does
pass
def is_async(self):
return True
def close(self):
self.mgr.close_conn(self)
self.__super_close()
......@@ -18,10 +18,9 @@ import types
from ZEO.zrpc.connection import Connection
from ZEO.zrpc.log import log
import logging
import ThreadedAsync.LoopCallback
# Export the main asyncore loop
loop = ThreadedAsync.LoopCallback.loop
loop = asyncore.loop
class Dispatcher(asyncore.dispatcher):
"""A server that accepts incoming RPC connections"""
......
......@@ -790,9 +790,6 @@ def main(args=None):
import Zope2
Zope2.startup()
#from ThreadedAsync.LoopCallback import loop
#threading.Thread(target=loop, args=(), name='asyncore').start()
jobs = JobProducer()
for job, kw, frequency, sleep, repeatp in jobdefs:
Job = globals()[job.capitalize()+'Job']
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment