Commit e0064795 authored by Amos Latteier's avatar Amos Latteier

Updated to a newer version of select_trigger.py It seems to work OK with win32...

Updated to a newer version of select_trigger.py It seems to work OK with win32 now that Jim changed the way ZServer shutsdown medusa. Also this new select trigger should resolve the sort of permissions problems reported by Alex Rice since it uses a pipe, not a socket file in the /tmp directory.
parent cbbac009
# -*- Mode: Python; tab-width: 4 -*-
VERSION_STRING = "$Id: select_trigger.py,v 1.4 1999/06/15 18:50:16 amos Exp $"
import asyncore
import asynchat
......@@ -7,108 +9,170 @@ import os
import socket
import string
import thread
class trigger (asyncore.dispatcher):
"Wake up a call to select() running in the main thread"
# This is useful in a context where you are using Medusa's I/O
# subsystem to deliver data, but the data is generated by another
# thread. Normally, if Medusa is in the middle of a call to
# select(), new output data generated by another thread will have
# to sit until the call to select() either times out or returns.
# If the trigger is 'pulled' by another thread, it should immediately
# generate a READ event on the trigger object, which will force the
# select() invocation to return.
# A common use for this facility: letting Medusa manage I/O for a
# large number of connections; but routing each request through a
# thread chosen from a fixed-size thread pool. When a thread is
# acquired, a transaction is performed, but output data is
# accumulated into buffers that will be emptied more efficiently
# by Medusa. [picture a server that can process database queries
# rapidly, but doesn't want to tie up threads waiting to send data
# to low-bandwidth connections]
# The other major feature provided by this class is the ability to
# move work back into the main thread: if you call pull_trigger()
# with a thunk argument, when select() wakes up and receives the
# event it will call your thunk from within that thread. The main
# purpose of this is to remove the need to wrap thread locks around
# Medusa's data structures, which normally do not need them. [To see
# why this is true, imagine this scenario: A thread tries to push some
# new data onto a channel's outgoing data queue at the same time that
# the main thread is trying to remove some]
def __init__ (self):
# Although SOCK_DGRAM => UDP => unreliable, this should not be
# the case for AF_UNIX sockets, or for sockets bound to a
# loopback interface.
if os.name == 'posix':
addr = '/tmp/.select-trigger.%d' % os.getpid()
family = socket.AF_UNIX
else:
addr = ('127.4.4.4', 44444)
family = socket.AF_INET
# non-blocking socket, read from by medusa
self.create_socket (family, socket.SOCK_DGRAM)
# 'blocking' socket, written to by child threads.
self.trigger = socket.socket (family, socket.SOCK_DGRAM)
self.bind (addr)
self.lock = thread.allocate_lock()
self.thunks = []
def readable (self):
return 1
def writable (self):
return 0
if os.name == 'posix':
class trigger (asyncore.file_dispatcher):
"Wake up a call to select() running in the main thread"
# This is useful in a context where you are using Medusa's I/O
# subsystem to deliver data, but the data is generated by another
# thread. Normally, if Medusa is in the middle of a call to
# select(), new output data generated by another thread will have
# to sit until the call to select() either times out or returns.
# If the trigger is 'pulled' by another thread, it should immediately
# generate a READ event on the trigger object, which will force the
# select() invocation to return.
# A common use for this facility: letting Medusa manage I/O for a
# large number of connections; but routing each request through a
# thread chosen from a fixed-size thread pool. When a thread is
# acquired, a transaction is performed, but output data is
# accumulated into buffers that will be emptied more efficiently
# by Medusa. [picture a server that can process database queries
# rapidly, but doesn't want to tie up threads waiting to send data
# to low-bandwidth connections]
# The other major feature provided by this class is the ability to
# move work back into the main thread: if you call pull_trigger()
# with a thunk argument, when select() wakes up and receives the
# event it will call your thunk from within that thread. The main
# purpose of this is to remove the need to wrap thread locks around
# Medusa's data structures, which normally do not need them. [To see
# why this is true, imagine this scenario: A thread tries to push some
# new data onto a channel's outgoing data queue at the same time that
# the main thread is trying to remove some]
def __init__ (self):
r, w = os.pipe()
self.trigger = w
asyncore.file_dispatcher.__init__ (self, r)
self.lock = thread.allocate_lock()
self.thunks = []
def __repr__ (self):
return '<select-trigger (pipe) at %x>' % id(self)
def pull_trigger (self, thunk=None):
if thunk:
def readable (self):
return 1
def writable (self):
return 0
def handle_connect (self):
pass
def pull_trigger (self, thunk=None):
# print 'PULL_TRIGGER: ', len(self.thunks)
if thunk:
try:
self.lock.acquire()
self.thunks.append (thunk)
finally:
self.lock.release()
os.write (self.trigger, 'x')
def handle_read (self):
self.recv (8192)
try:
self.lock.acquire()
self.thunks.append (thunk)
for thunk in self.thunks:
try:
thunk()
except:
(file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
self.thunks = []
finally:
self.lock.release()
msg = str(id(thunk))
else:
msg = 'Bang!'
self.trigger.sendto (msg, self.addr)
def handle_connect(self): pass
def handle_read (self):
what, where = self.recvfrom (128)
if what != 'Bang!':
thunk_id = int (what)
for thunk in self.thunks:
if id(thunk) == thunk_id:
self.lock.release()
else:
class trigger (asyncore.dispatcher):
address = ('127.9.9.9', 19999)
def __init__ (self):
a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
# tricky: get a pair of connected sockets
a.bind (self.address)
a.listen (1)
w.setblocking (0)
try:
w.connect (self.address)
except:
pass
r, addr = a.accept()
a.close()
w.setblocking (1)
self.trigger = w
asyncore.dispatcher.__init__ (self, r)
self.lock = thread.allocate_lock()
self.thunks = []
self._trigger_connected = 0
def __repr__ (self):
return '<select-trigger (loopback) at %x>' % id(self)
def readable (self):
return 1
def writable (self):
return 0
def handle_connect (self):
pass
def pull_trigger (self, thunk=None):
if thunk:
try:
self.lock.acquire()
self.thunks.append (thunk)
finally:
self.lock.release()
self.trigger.send ('x')
def handle_read (self):
self.recv (8192)
try:
self.lock.acquire()
for thunk in self.thunks:
try:
self.lock.acquire()
self.thunks.remove(thunk)
finally:
self.lock.release()
thunk()
thunk_id = 0
if thunk_id:
print 'Warning: Lost Thunk! (id=%x)' % thunk_id
thunk()
except:
(file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
self.thunks = []
finally:
self.lock.release()
the_trigger = None
class trigger_file:
"A 'triggered' file object"
def __init__ (self, trigger, parent):
self.trigger = trigger
buffer_size = 4096
def __init__ (self, parent):
global the_trigger
if the_trigger is None:
the_trigger = trigger()
self.parent = parent
self.buffer = ''
def write (self, data):
self.trigger.pull_trigger (
lambda d=data,s=self: s.parent.push (d)
)
self.buffer = self.buffer + data
if len(self.buffer) > self.buffer_size:
d, self.buffer = self.buffer, ''
the_trigger.pull_trigger (
lambda d=d,p=self.parent: p.push (d)
)
def writeline (self, line):
self.write (line+'\r\n')
......@@ -122,22 +186,31 @@ class trigger_file:
)
def flush (self):
pass
if self.buffer:
d, self.buffer = self.buffer, ''
the_trigger.pull_trigger (
lambda p=self.parent,d=d: p.push (d)
)
def softspace (self, *args):
pass
def close (self):
# You might have the do a self.parent.push (None)
# in a derived class.
pass
# in a derived class, you may want to call trigger_close() instead.
self.flush()
self.parent = None
def trigger_close (self):
d, self.buffer = self.buffer, ''
p, self.parent = self.parent, None
the_trigger.pull_trigger (
lambda p=p,d=d: (p.push(d), p.close_when_done())
)
if __name__ == '__main__':
import time
the_trigger = trigger()
def thread_function (output_file, i, n):
print 'entering thread_function'
while n:
......@@ -162,8 +235,12 @@ if __name__ == '__main__':
def found_terminator (self):
data, self.buffer = self.buffer, ''
if not data:
asyncore.close_all()
print "done"
return
n = string.atoi (string.split (data)[0])
tf = trigger_file (the_trigger, self)
tf = trigger_file (self)
self.count = self.count + 1
thread.start_new_thread (thread_function, (tf, self.count, n))
......@@ -182,4 +259,7 @@ if __name__ == '__main__':
thread_server()
#asyncore.loop(1.0, use_poll=1)
asyncore.loop ()
try:
asyncore.loop ()
except:
asyncore.close_all()
# -*- Mode: Python; tab-width: 4 -*-
VERSION_STRING = "$Id: select_trigger.py,v 1.4 1999/06/15 18:50:16 amos Exp $"
import asyncore
import asynchat
......@@ -7,108 +9,170 @@ import os
import socket
import string
import thread
class trigger (asyncore.dispatcher):
"Wake up a call to select() running in the main thread"
# This is useful in a context where you are using Medusa's I/O
# subsystem to deliver data, but the data is generated by another
# thread. Normally, if Medusa is in the middle of a call to
# select(), new output data generated by another thread will have
# to sit until the call to select() either times out or returns.
# If the trigger is 'pulled' by another thread, it should immediately
# generate a READ event on the trigger object, which will force the
# select() invocation to return.
# A common use for this facility: letting Medusa manage I/O for a
# large number of connections; but routing each request through a
# thread chosen from a fixed-size thread pool. When a thread is
# acquired, a transaction is performed, but output data is
# accumulated into buffers that will be emptied more efficiently
# by Medusa. [picture a server that can process database queries
# rapidly, but doesn't want to tie up threads waiting to send data
# to low-bandwidth connections]
# The other major feature provided by this class is the ability to
# move work back into the main thread: if you call pull_trigger()
# with a thunk argument, when select() wakes up and receives the
# event it will call your thunk from within that thread. The main
# purpose of this is to remove the need to wrap thread locks around
# Medusa's data structures, which normally do not need them. [To see
# why this is true, imagine this scenario: A thread tries to push some
# new data onto a channel's outgoing data queue at the same time that
# the main thread is trying to remove some]
def __init__ (self):
# Although SOCK_DGRAM => UDP => unreliable, this should not be
# the case for AF_UNIX sockets, or for sockets bound to a
# loopback interface.
if os.name == 'posix':
addr = '/tmp/.select-trigger.%d' % os.getpid()
family = socket.AF_UNIX
else:
addr = ('127.4.4.4', 44444)
family = socket.AF_INET
# non-blocking socket, read from by medusa
self.create_socket (family, socket.SOCK_DGRAM)
# 'blocking' socket, written to by child threads.
self.trigger = socket.socket (family, socket.SOCK_DGRAM)
self.bind (addr)
self.lock = thread.allocate_lock()
self.thunks = []
def readable (self):
return 1
def writable (self):
return 0
if os.name == 'posix':
class trigger (asyncore.file_dispatcher):
"Wake up a call to select() running in the main thread"
# This is useful in a context where you are using Medusa's I/O
# subsystem to deliver data, but the data is generated by another
# thread. Normally, if Medusa is in the middle of a call to
# select(), new output data generated by another thread will have
# to sit until the call to select() either times out or returns.
# If the trigger is 'pulled' by another thread, it should immediately
# generate a READ event on the trigger object, which will force the
# select() invocation to return.
# A common use for this facility: letting Medusa manage I/O for a
# large number of connections; but routing each request through a
# thread chosen from a fixed-size thread pool. When a thread is
# acquired, a transaction is performed, but output data is
# accumulated into buffers that will be emptied more efficiently
# by Medusa. [picture a server that can process database queries
# rapidly, but doesn't want to tie up threads waiting to send data
# to low-bandwidth connections]
# The other major feature provided by this class is the ability to
# move work back into the main thread: if you call pull_trigger()
# with a thunk argument, when select() wakes up and receives the
# event it will call your thunk from within that thread. The main
# purpose of this is to remove the need to wrap thread locks around
# Medusa's data structures, which normally do not need them. [To see
# why this is true, imagine this scenario: A thread tries to push some
# new data onto a channel's outgoing data queue at the same time that
# the main thread is trying to remove some]
def __init__ (self):
r, w = os.pipe()
self.trigger = w
asyncore.file_dispatcher.__init__ (self, r)
self.lock = thread.allocate_lock()
self.thunks = []
def __repr__ (self):
return '<select-trigger (pipe) at %x>' % id(self)
def pull_trigger (self, thunk=None):
if thunk:
def readable (self):
return 1
def writable (self):
return 0
def handle_connect (self):
pass
def pull_trigger (self, thunk=None):
# print 'PULL_TRIGGER: ', len(self.thunks)
if thunk:
try:
self.lock.acquire()
self.thunks.append (thunk)
finally:
self.lock.release()
os.write (self.trigger, 'x')
def handle_read (self):
self.recv (8192)
try:
self.lock.acquire()
self.thunks.append (thunk)
for thunk in self.thunks:
try:
thunk()
except:
(file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
self.thunks = []
finally:
self.lock.release()
msg = str(id(thunk))
else:
msg = 'Bang!'
self.trigger.sendto (msg, self.addr)
def handle_connect(self): pass
def handle_read (self):
what, where = self.recvfrom (128)
if what != 'Bang!':
thunk_id = int (what)
for thunk in self.thunks:
if id(thunk) == thunk_id:
self.lock.release()
else:
class trigger (asyncore.dispatcher):
address = ('127.9.9.9', 19999)
def __init__ (self):
a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
# tricky: get a pair of connected sockets
a.bind (self.address)
a.listen (1)
w.setblocking (0)
try:
w.connect (self.address)
except:
pass
r, addr = a.accept()
a.close()
w.setblocking (1)
self.trigger = w
asyncore.dispatcher.__init__ (self, r)
self.lock = thread.allocate_lock()
self.thunks = []
self._trigger_connected = 0
def __repr__ (self):
return '<select-trigger (loopback) at %x>' % id(self)
def readable (self):
return 1
def writable (self):
return 0
def handle_connect (self):
pass
def pull_trigger (self, thunk=None):
if thunk:
try:
self.lock.acquire()
self.thunks.append (thunk)
finally:
self.lock.release()
self.trigger.send ('x')
def handle_read (self):
self.recv (8192)
try:
self.lock.acquire()
for thunk in self.thunks:
try:
self.lock.acquire()
self.thunks.remove(thunk)
finally:
self.lock.release()
thunk()
thunk_id = 0
if thunk_id:
print 'Warning: Lost Thunk! (id=%x)' % thunk_id
thunk()
except:
(file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
self.thunks = []
finally:
self.lock.release()
the_trigger = None
class trigger_file:
"A 'triggered' file object"
def __init__ (self, trigger, parent):
self.trigger = trigger
buffer_size = 4096
def __init__ (self, parent):
global the_trigger
if the_trigger is None:
the_trigger = trigger()
self.parent = parent
self.buffer = ''
def write (self, data):
self.trigger.pull_trigger (
lambda d=data,s=self: s.parent.push (d)
)
self.buffer = self.buffer + data
if len(self.buffer) > self.buffer_size:
d, self.buffer = self.buffer, ''
the_trigger.pull_trigger (
lambda d=d,p=self.parent: p.push (d)
)
def writeline (self, line):
self.write (line+'\r\n')
......@@ -122,22 +186,31 @@ class trigger_file:
)
def flush (self):
pass
if self.buffer:
d, self.buffer = self.buffer, ''
the_trigger.pull_trigger (
lambda p=self.parent,d=d: p.push (d)
)
def softspace (self, *args):
pass
def close (self):
# You might have the do a self.parent.push (None)
# in a derived class.
pass
# in a derived class, you may want to call trigger_close() instead.
self.flush()
self.parent = None
def trigger_close (self):
d, self.buffer = self.buffer, ''
p, self.parent = self.parent, None
the_trigger.pull_trigger (
lambda p=p,d=d: (p.push(d), p.close_when_done())
)
if __name__ == '__main__':
import time
the_trigger = trigger()
def thread_function (output_file, i, n):
print 'entering thread_function'
while n:
......@@ -162,8 +235,12 @@ if __name__ == '__main__':
def found_terminator (self):
data, self.buffer = self.buffer, ''
if not data:
asyncore.close_all()
print "done"
return
n = string.atoi (string.split (data)[0])
tf = trigger_file (the_trigger, self)
tf = trigger_file (self)
self.count = self.count + 1
thread.start_new_thread (thread_function, (tf, self.count, n))
......@@ -182,4 +259,7 @@ if __name__ == '__main__':
thread_server()
#asyncore.loop(1.0, use_poll=1)
asyncore.loop ()
try:
asyncore.loop ()
except:
asyncore.close_all()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment