Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Z
ZODB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Kirill Smelkov
ZODB
Commits
3c63869c
Commit
3c63869c
authored
Feb 18, 2008
by
Jim Fulton
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Removed ThreadedAsync and (last?) vestiges of the old "non-async"
mode.
parent
990ce5f8
Changes
9
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
16 additions
and
286 deletions
+16
-286
src/CHANGES.txt
src/CHANGES.txt
+2
-0
src/ThreadedAsync/LoopCallback.py
src/ThreadedAsync/LoopCallback.py
+0
-147
src/ThreadedAsync/__init__.py
src/ThreadedAsync/__init__.py
+0
-19
src/ZEO/ClientStorage.py
src/ZEO/ClientStorage.py
+0
-3
src/ZEO/runzeo.py
src/ZEO/runzeo.py
+2
-2
src/ZEO/tests/zeoserver.py
src/ZEO/tests/zeoserver.py
+3
-4
src/ZEO/zrpc/connection.py
src/ZEO/zrpc/connection.py
+8
-106
src/ZEO/zrpc/server.py
src/ZEO/zrpc/server.py
+1
-2
src/ZODB/scripts/zodbload.py
src/ZODB/scripts/zodbload.py
+0
-3
No files found.
src/CHANGES.txt
View file @
3c63869c
...
...
@@ -32,6 +32,8 @@ New Features
on datetimes or serials (TIDs). See
src/ZODB/historical_connections.txt.
- Removed the ThreadedAsync module.
Bugs Fixed
----------
...
...
src/ThreadedAsync/LoopCallback.py
deleted
100644 → 0
View file @
990ce5f8
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Manage the asyncore mainloop in a multi-threaded app.
In a multi-threaded application, only a single thread runs the
asyncore mainloop. This thread (the "mainloop thread") may not start
the mainloop before another thread needs to perform an async action
that requires it. As a result, other threads need to coordinate with
the mainloop thread to find out whether the mainloop is running.
This module implements a callback mechanism that allows other threads
to be notified when the mainloop starts. A thread calls
register_loop_callback() to register interest. When the mainloop
thread calls loop(), each registered callback will be called with the
socket map as its first argument.
"""
import
asyncore
import
thread
# Zope pokes a non-None value into exit_status when it wants the loop()
# function to exit. Indeed, there appears to be no other way to tell
# Zope3 to shut down.
exit_status
=
None
_loop_lock
=
thread
.
allocate_lock
()
_looping
=
None
# changes to socket map when loop() starts
_loop_callbacks
=
[]
def
register_loop_callback
(
callback
,
args
=
(),
kw
=
None
):
"""Register callback function to be called when mainloop starts.
The callable object callback will be invokved when the mainloop
starts. If the mainloop is currently running, the callback will
be invoked immediately.
The callback will be called with a single argument, the mainloop
socket map, unless the optional args or kw arguments are used.
args defines a tuple of extra arguments to pass after the socket
map. kw defines a dictionary of keyword arguments.
"""
_loop_lock
.
acquire
()
try
:
if
_looping
is
not
None
:
callback
(
_looping
,
*
args
,
**
(
kw
or
{}))
else
:
_loop_callbacks
.
append
((
callback
,
args
,
kw
))
finally
:
_loop_lock
.
release
()
def
remove_loop_callback
(
callback
):
"""Remove a callback function registered earlier.
This is useful if loop() was never called.
"""
for
i
,
value
in
enumerate
(
_loop_callbacks
):
if
value
[
0
]
==
callback
:
del
_loop_callbacks
[
i
]
return
# Because of the exit_status magic, we can't just invoke asyncore.loop(),
# and that's a shame.
# The signature of asyncore.loop changed between Python 2.3 and 2.4, and
# this loop() has 2.4's signature, which added the optional `count` argument.
# Since we physically replace asyncore.loop with this `loop`, and want
# compatibility with both Pythons, we need to support the most recent
# signature. Applications running under 2.3 should (of course) avoid using
# the `count` argument, since 2.3 doesn't have it.
def
loop
(
timeout
=
30.0
,
use_poll
=
False
,
map
=
None
,
count
=
None
):
global
_looping
global
exit_status
exit_status
=
None
if
map
is
None
:
map
=
asyncore
.
socket_map
# This section is taken from Python 2.3's asyncore.loop, and is more
# elaborate than the corresponding section of 2.4's: in 2.4 poll2 and
# poll3 are aliases for the same function, in 2.3 they're different
# functions.
if
use_poll
:
if
hasattr
(
select
,
'poll'
):
poll_fun
=
asyncore
.
poll3
else
:
poll_fun
=
asyncore
.
poll2
else
:
poll_fun
=
asyncore
.
poll
# The loop is about to start: invoke any registered callbacks.
_loop_lock
.
acquire
()
try
:
_looping
=
map
while
_loop_callbacks
:
cb
,
args
,
kw
=
_loop_callbacks
.
pop
()
cb
(
map
,
*
args
,
**
(
kw
or
{}))
finally
:
_loop_lock
.
release
()
# Run the loop. This is 2.4's logic, with the addition that we stop
# if/when this module's exit_status global is set to a non-None value.
if
count
is
None
:
while
map
and
exit_status
is
None
:
poll_fun
(
timeout
,
map
)
else
:
while
map
and
count
>
0
and
exit_status
is
None
:
poll_fun
(
timeout
,
map
)
count
-=
1
_loop_lock
.
acquire
()
try
:
_looping
=
None
finally
:
_loop_lock
.
release
()
# Evil: rebind asyncore.loop to the above loop() function.
#
# Code should explicitly call ThreadedAsync.loop() instead of asyncore.loop().
# Most of ZODB has been fixed, but ripping this out may break 3rd party code.
# Maybe we should issue a warning and let it continue for a while (NOTE: code
# to raise DeprecationWarning was written but got commented out below; don't
# know why it got commented out). Or maybe we should get rid of this
# mechanism entirely, and have each piece that needs one run its own asyncore
# loop in its own thread.
##def deprecated_loop(*args, **kws):
## import warnings
## warnings.warn("""\
##ThreadedAsync.loop() called through sneaky asyncore.loop() rebinding.
##You should change your code to call ThreadedAsync.loop() explicitly.""",
## DeprecationWarning)
## loop(*args, **kws)
##
##asyncore.loop = deprecated_loop
asyncore
.
loop
=
loop
src/ThreadedAsync/__init__.py
deleted
100644 → 0
View file @
990ce5f8
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Manage the asyncore mainloop in a multi-threaded app.
$Id$
"""
from
LoopCallback
import
register_loop_callback
,
loop
,
remove_loop_callback
src/ZEO/ClientStorage.py
View file @
3c63869c
...
...
@@ -369,7 +369,6 @@ class ClientStorage(object):
# still be going on. This code must wait until validation
# finishes, but if the connection isn't a zrpc async
# connection it also needs to poll for input.
assert
self
.
_connection
.
is_async
()
while
1
:
self
.
_ready
.
wait
(
30
)
if
self
.
_ready
.
isSet
():
...
...
@@ -524,8 +523,6 @@ class ClientStorage(object):
# handled in order.
self
.
_info
.
update
(
stub
.
get_info
())
assert
conn
.
is_async
()
self
.
_handle_extensions
()
def
_handle_extensions
(
self
):
...
...
src/ZEO/runzeo.py
View file @
3c63869c
...
...
@@ -34,6 +34,7 @@ Unless -C is specified, -a and -f are required.
# For the forseeable future, it must work under Python 2.1 as well as
# 2.2 and above.
import
asyncore
import
os
import
sys
import
signal
...
...
@@ -241,8 +242,7 @@ class ZEOServer:
auth_realm
=
self
.
options
.
auth_realm
)
def
loop_forever
(
self
):
import
ThreadedAsync.LoopCallback
ThreadedAsync
.
LoopCallback
.
loop
()
asyncore
.
loop
()
def
handle_sigterm
(
self
):
log
(
"terminated by SIGTERM"
)
...
...
src/ZEO/tests/zeoserver.py
View file @
3c63869c
...
...
@@ -13,6 +13,7 @@
##############################################################################
"""Helper file used to launch a ZEO server cross platform"""
import
asyncore
import
os
import
sys
import
time
...
...
@@ -24,8 +25,6 @@ import asyncore
import
threading
import
logging
import
ThreadedAsync.LoopCallback
from
ZEO.StorageServer
import
StorageServer
from
ZEO.runzeo
import
ZEOOptions
...
...
@@ -208,8 +207,8 @@ def main():
d
.
setDaemon
(
1
)
d
.
start
()
# Loop for socket events
log
(
label
,
'entering
ThreadedAsync
loop'
)
ThreadedAsync
.
LoopCallback
.
loop
()
log
(
label
,
'entering
asyncore
loop'
)
asyncore
.
loop
()
if
__name__
==
'__main__'
:
...
...
src/ZEO/zrpc/connection.py
View file @
3c63869c
...
...
@@ -21,7 +21,6 @@ import logging
import
traceback
,
time
import
ThreadedAsync
from
ZEO.zrpc
import
smac
from
ZEO.zrpc.error
import
ZRPCError
,
DisconnectedError
from
ZEO.zrpc.marshal
import
Marshaller
...
...
@@ -383,15 +382,7 @@ class Connection(smac.SizedMessageAsyncConnection, object):
ourmap
=
{}
self
.
__super_init
(
sock
,
addr
,
map
=
ourmap
)
# A Connection either uses asyncore directly or relies on an
# asyncore mainloop running in a separate thread. If
# thr_async is true, then the mainloop is running in a
# separate thread. If thr_async is true, then the asyncore
# trigger (self.trigger) is used to notify that thread of
# activity on the current thread.
self
.
thr_async
=
False
self
.
trigger
=
None
self
.
_prepare_async
()
self
.
trigger
=
trigger
()
# The singleton dict is used in synchronous mode when a method
# needs to call into asyncore to try to force some I/O to occur.
...
...
@@ -684,10 +675,7 @@ class Connection(smac.SizedMessageAsyncConnection, object):
if
self
.
closed
:
raise
DisconnectedError
()
msgid
=
self
.
send_call
(
method
,
args
,
0
)
if
self
.
is_async
():
self
.
trigger
.
pull_trigger
()
else
:
asyncore
.
poll
(
0.01
,
self
.
_singleton
)
self
.
trigger
.
pull_trigger
()
return
msgid
def
_deferred_wait
(
self
,
msgid
):
...
...
@@ -728,23 +716,6 @@ class Connection(smac.SizedMessageAsyncConnection, object):
# handle IO, possibly in async mode
def
_prepare_async
(
self
):
self
.
thr_async
=
False
ThreadedAsync
.
register_loop_callback
(
self
.
set_async
)
# TODO: If we are not in async mode, this will cause dead
# Connections to be leaked.
def
set_async
(
self
,
map
):
self
.
trigger
=
trigger
()
self
.
thr_async
=
True
def
is_async
(
self
):
# Overridden by ManagedConnection
if
self
.
thr_async
:
return
1
else
:
return
0
def
_pull_trigger
(
self
,
tryagain
=
10
):
try
:
self
.
trigger
.
pull_trigger
()
...
...
@@ -757,10 +728,9 @@ class Connection(smac.SizedMessageAsyncConnection, object):
def
wait
(
self
,
msgid
):
"""Invoke asyncore mainloop and wait for reply."""
if
__debug__
:
self
.
log
(
"wait(%d), async=%d"
%
(
msgid
,
self
.
is_async
()),
level
=
TRACE
)
if
self
.
is_async
():
self
.
_pull_trigger
()
self
.
log
(
"wait(%d)"
%
msgid
,
level
=
TRACE
)
self
.
_pull_trigger
()
# Delay used when we call asyncore.poll() directly.
# Start with a 1 msec delay, double until 1 sec.
...
...
@@ -778,7 +748,6 @@ class Connection(smac.SizedMessageAsyncConnection, object):
self
.
log
(
"wait(%d): reply=%s"
%
(
msgid
,
short_repr
(
reply
)),
level
=
TRACE
)
return
reply
assert
self
.
is_async
()
# XXX we're such cowards
self
.
replies_cond
.
wait
()
finally
:
self
.
replies_cond
.
release
()
...
...
@@ -793,65 +762,9 @@ class Connection(smac.SizedMessageAsyncConnection, object):
def
poll
(
self
):
"""Invoke asyncore mainloop to get pending message out."""
if
__debug__
:
self
.
log
(
"poll(), async=%d"
%
self
.
is_async
(),
level
=
TRACE
)
if
self
.
is_async
():
self
.
_pull_trigger
()
else
:
asyncore
.
poll
(
0.0
,
self
.
_singleton
)
def
_pending
(
self
,
timeout
=
0
):
"""Invoke mainloop until any pending messages are handled."""
if
__debug__
:
self
.
log
(
"pending(), async=%d"
%
self
.
is_async
(),
level
=
TRACE
)
if
self
.
is_async
():
return
# Inline the asyncore poll() function to know whether any input
# was actually read. Repeat until no input is ready.
# Pending does reads and writes. In the case of server
# startup, we may need to write out zeoVerify() messages.
# Always check for read status, but don't check for write status
# only there is output to do. Only continue in this loop as
# long as there is data to read.
r
=
r_in
=
[
self
.
_fileno
]
x_in
=
[]
while
r
and
not
self
.
closed
:
if
self
.
writable
():
w_in
=
[
self
.
_fileno
]
else
:
w_in
=
[]
try
:
r
,
w
,
x
=
select
.
select
(
r_in
,
w_in
,
x_in
,
timeout
)
except
select
.
error
,
err
:
if
err
[
0
]
==
errno
.
EINTR
:
timeout
=
0
continue
else
:
raise
else
:
# Make sure any subsequent select does not block. The
# loop is only intended to make sure all incoming data is
# returned.
# Insecurity: What if the server sends a lot of
# invalidations, such that pending never finishes? Seems
# unlikely, but possible.
timeout
=
0
if
r
:
try
:
self
.
handle_read_event
()
except
asyncore
.
ExitNow
:
raise
except
:
self
.
handle_error
()
if
w
:
try
:
self
.
handle_write_event
()
except
asyncore
.
ExitNow
:
raise
except
:
self
.
handle_error
()
self
.
log
(
"poll()"
,
level
=
TRACE
)
self
.
_pull_trigger
()
class
ManagedServerConnection
(
Connection
):
"""Server-side Connection subclass."""
__super_init
=
Connection
.
__init__
...
...
@@ -895,7 +808,6 @@ class ManagedClientConnection(Connection):
self
.
queued_messages
=
[]
self
.
__super_init
(
sock
,
addr
,
obj
,
tag
=
'C'
,
map
=
client_map
)
self
.
thr_async
=
True
self
.
trigger
=
client_trigger
client_trigger
.
pull_trigger
()
...
...
@@ -951,16 +863,6 @@ class ManagedClientConnection(Connection):
# we're closed.
self
.
trigger
.
pull_trigger
()
def
set_async
(
self
,
map
):
pass
def
_prepare_async
(
self
):
# Don't do the register_loop_callback that the superclass does
pass
def
is_async
(
self
):
return
True
def
close
(
self
):
self
.
mgr
.
close_conn
(
self
)
self
.
__super_close
()
src/ZEO/zrpc/server.py
View file @
3c63869c
...
...
@@ -18,10 +18,9 @@ import types
from
ZEO.zrpc.connection
import
Connection
from
ZEO.zrpc.log
import
log
import
logging
import
ThreadedAsync.LoopCallback
# Export the main asyncore loop
loop
=
ThreadedAsync
.
LoopCallback
.
loop
loop
=
asyncore
.
loop
class
Dispatcher
(
asyncore
.
dispatcher
):
"""A server that accepts incoming RPC connections"""
...
...
src/ZODB/scripts/zodbload.py
View file @
3c63869c
...
...
@@ -790,9 +790,6 @@ def main(args=None):
import
Zope2
Zope2
.
startup
()
#from ThreadedAsync.LoopCallback import loop
#threading.Thread(target=loop, args=(), name='asyncore').start()
jobs
=
JobProducer
()
for
job
,
kw
,
frequency
,
sleep
,
repeatp
in
jobdefs
:
Job
=
globals
()[
job
.
capitalize
()
+
'Job'
]
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment