Commit 52b8c629 authored by Jeremy Hylton's avatar Jeremy Hylton

Merge in zeo-1_0-branch

Fixed a bug in building undo invalidation info that caused client
storages to raise an exception that caused connections to be reset
and that, for some of reason, caused undo's to not propigate to
the clients.

Avoid long delays at the end of a pack.

    When a client calls pack, a separate thread is started to call the
    pack call on the storage.  When the thread finishes, it calls
    message_output() to send a response to the client.  If asyncore is
    currently in a poll call when this happens, the output won't be
    detected until the next poll call.  If there is little I/O on the
    connection, this won't happen until the select call times out
    after 30 seconds.

    The trigger is the standard gimmick for one thread to notify a
    mainloop in another thread that the first thread has some I/O to
    do.  It exits the current select.  The next poll call detects that
    the triggering thread is ready to do I/O and handles it.

Use asyncwrap to call asyncore.loop() and asyncore.poll()
parent 67e905c9
......@@ -83,7 +83,7 @@
#
##############################################################################
__version__ = "$Revision: 1.28 $"[11:-2]
__version__ = "$Revision: 1.29 $"[11:-2]
import asyncore, socket, string, sys, os
from smac import SizedMessageAsyncConnection
......@@ -93,10 +93,12 @@ from cPickle import Unpickler
from ZODB.POSException import TransactionError, UndoError, VersionCommitError
from ZODB.Transaction import Transaction
import traceback
from zLOG import LOG, INFO, ERROR, TRACE
from zLOG import LOG, INFO, ERROR, TRACE, BLATHER
from ZODB.referencesf import referencesf
from thread import start_new_thread
from cStringIO import StringIO
from ZEO import trigger
from ZEO import asyncwrap
class StorageServerError(POSException.StorageError): pass
......@@ -124,7 +126,6 @@ class StorageServer(asyncore.dispatcher):
self.__connections={}
self.__get_connections=self.__connections.get
asyncore.dispatcher.__init__(self)
if type(connection) is type(''):
......@@ -184,7 +185,7 @@ class StorageServer(asyncore.dispatcher):
sock, addr = self.accept()
except socket.error:
sys.stderr.write('warning: accept failed\n')
else:
ZEOConnection(self, sock, addr)
def log_info(self, message, type='info'):
......@@ -234,6 +235,7 @@ class ZEOConnection(SizedMessageAsyncConnection):
self.__server=server
self.__invalidated=[]
self.__closed=None
self._pack_trigger = trigger.trigger()
if __debug__: debug='ZEO Server'
else: debug=0
SizedMessageAsyncConnection.__init__(self, sock, addr, debug=debug)
......@@ -385,15 +387,20 @@ class ZEOConnection(SizedMessageAsyncConnection):
def _pack(self, t, wait=0):
try:
LOG('ZEO Server', BLATHER, 'pack begin')
self.__storage.pack(t, referencesf)
LOG('ZEO Server', BLATHER, 'pack end')
except:
LOG('ZEO Server', ERROR,
'Pack failed for %s' % self.__storage_id,
error=sys.exc_info())
if wait: self.return_error(sys.exc_info()[0], sys.exc_info()[1])
if wait:
self.return_error(sys.exc_info()[0], sys.exc_info()[1])
self._pack_trigger.pull_trigger()
else:
if wait:
self.message_output('RN.')
self._pack_trigger.pull_trigger()
else:
# Broadcast new size statistics
self.__server.invalidate(0, self.__storage_id, (),
......@@ -469,7 +476,7 @@ class ZEOConnection(SizedMessageAsyncConnection):
oids=self.__storage.undo(transaction_id)
if oids:
self.__server.invalidate(
self, self.__storage_id, map(lambda oid: (oid,None,''), oids)
self, self.__storage_id, map(lambda oid: (oid,None), oids)
)
return oids
return ()
......@@ -571,7 +578,10 @@ if __name__=='__main__':
import ZODB.FileStorage
name, port = sys.argv[1:3]
blather(name, port)
try: port='',string.atoi(port)
except: pass
try:
port='', int(port)
except:
pass
StorageServer(port, ZODB.FileStorage.FileStorage(name))
asyncore.loop()
asyncwrap.loop()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment