Commit 6f9966c8 authored by Jim Fulton's avatar Jim Fulton

Fixed undo support. (Note that this requires a ZODB change. See undoInfo.

Renamed some classes to make their intent clearer.

Fixed a silly typo in zrpc.
parent f96f0a43
...@@ -42,7 +42,7 @@ ...@@ -42,7 +42,7 @@
###################################################################### ######################################################################
"""Network ZODB storage client """Network ZODB storage client
""" """
__version__='$Revision: 1.5 $'[11:-2] __version__='$Revision: 1.6 $'[11:-2]
import struct, time, os, socket, cPickle, string, Sync, zrpc, ClientCache import struct, time, os, socket, cPickle, string, Sync, zrpc, ClientCache
import tempfile import tempfile
...@@ -75,7 +75,7 @@ class ClientStorage(BaseStorage.BaseStorage): ...@@ -75,7 +75,7 @@ class ClientStorage(BaseStorage.BaseStorage):
loop(timeout, use_poll) loop(timeout, use_poll)
asyncore.loop=loop asyncore.loop=loop
self._call=zrpc.sync(connection) self._call=zrpc.syncRPC(connection)
self.__begin='tpc_begin_sync' self.__begin='tpc_begin_sync'
self._call._write(str(storage)) self._call._write(str(storage))
...@@ -101,7 +101,7 @@ class ClientStorage(BaseStorage.BaseStorage): ...@@ -101,7 +101,7 @@ class ClientStorage(BaseStorage.BaseStorage):
BaseStorage.BaseStorage.__init__(self, name) BaseStorage.BaseStorage.__init__(self, name)
def becomeAsync(self): def becomeAsync(self):
self._call=zrpc.async(self._call) self._call=zrpc.asyncRPC(self._call)
self.__begin='tpc_begin' self.__begin='tpc_begin'
def registerDB(self, db, limit): def registerDB(self, db, limit):
...@@ -276,20 +276,25 @@ class ClientStorage(BaseStorage.BaseStorage): ...@@ -276,20 +276,25 @@ class ClientStorage(BaseStorage.BaseStorage):
def undo(self, transaction_id): def undo(self, transaction_id):
self._lock_acquire() self._lock_acquire()
try: return self._call('undo', transaction_id) try:
oids=self._call('undo', transaction_id)
cinvalidate=self._cache.invalidate
for oid in oids: cinvalidate(oid,'')
return oids
finally: self._lock_release() finally: self._lock_release()
def undoLog(self, version, first, last, filter=None):
# Waaaa, we really need to get the filter through
# but how can we send it over the wire?
# I suppose we could try to run the filter in a restricted execution def undoInfo(self, first, last, specification):
# env. self._lock_acquire()
try:
return self._call('undoInfo', first, last, specification)
finally: self._lock_release()
# Maybe .... we are really going to want to pass lambdas, hm. def undoLog(self, first, last, filter=None):
if filter is not None: return ()
self._lock_acquire() self._lock_acquire()
try: return self._call('undoLog', version, first, last) # Eek! try: return self._call('undoLog', first, last) # Eek!
finally: self._lock_release() finally: self._lock_release()
def versionEmpty(self, version): def versionEmpty(self, version):
......
...@@ -41,10 +41,10 @@ ...@@ -41,10 +41,10 @@
# SUCH DAMAGE. # SUCH DAMAGE.
###################################################################### ######################################################################
__version__ = "$Revision: 1.5 $"[11:-2] __version__ = "$Revision: 1.6 $"[11:-2]
import asyncore, socket, string, sys, cPickle import asyncore, socket, string, sys, cPickle
from smac import smac from smac import SizedMessageAsyncConnection
from ZODB import POSException from ZODB import POSException
from ZODB.Transaction import Transaction from ZODB.Transaction import Transaction
import traceback import traceback
...@@ -129,20 +129,20 @@ storage_methods={} ...@@ -129,20 +129,20 @@ storage_methods={}
for n in ('get_info', 'abortVersion', 'commitVersion', 'history', for n in ('get_info', 'abortVersion', 'commitVersion', 'history',
'load', 'modifiedInVersion', 'new_oid', 'pack', 'store', 'load', 'modifiedInVersion', 'new_oid', 'pack', 'store',
'tpc_abort', 'tpc_begin', 'tpc_begin_sync', 'tpc_finish', 'undo', 'tpc_abort', 'tpc_begin', 'tpc_begin_sync', 'tpc_finish', 'undo',
'undoLog', 'versionEmpty', 'undoLog', 'undoInfo', 'versionEmpty',
'zeoLoad', 'zeoVerify', 'zeoLoad', 'zeoVerify',
): ):
storage_methods[n]=1 storage_methods[n]=1
storage_method=storage_methods.has_key storage_method=storage_methods.has_key
_noreturn=[] _noreturn=[]
class Connection(smac): class Connection(SizedMessageAsyncConnection):
_transaction=None _transaction=None
__storage=__storage_id=None __storage=__storage_id=None
def __init__(self, server, sock, addr): def __init__(self, server, sock, addr):
smac.__init__(self, sock, addr) SizedMessageAsyncConnection.__init__(self, sock, addr)
self.__server=server self.__server=server
self.__invalidated=[] self.__invalidated=[]
self.__closed=None self.__closed=None
...@@ -155,7 +155,7 @@ class Connection(smac): ...@@ -155,7 +155,7 @@ class Connection(smac):
self.__server.unregister_connection(self, self.__storage_id) self.__server.unregister_connection(self, self.__storage_id)
self.__closed=1 self.__closed=1
smac.close(self) SizedMessageAsyncConnection.close(self)
def message_input(self, message): def message_input(self, message):
if __debug__: if __debug__:
...@@ -238,6 +238,12 @@ class Connection(smac): ...@@ -238,6 +238,12 @@ class Connection(smac):
self.__invalidated.append(oid, serial, version) self.__invalidated.append(oid, serial, version)
return newserial return newserial
def undo(self, transaction_id):
oids=self.__storage.undo(transaction_id)
self.__server.invalidate(
self, self.__storage_id, map(lambda oid: (oid,None,''), oids))
return oids
def tpc_abort(self, id): def tpc_abort(self, id):
t=self._transaction t=self._transaction
if t is None or id != t.id: return if t is None or id != t.id: return
......
...@@ -43,12 +43,12 @@ ...@@ -43,12 +43,12 @@
"""Sized message async connections """Sized message async connections
""" """
__version__ = "$Revision: 1.5 $"[11:-2] __version__ = "$Revision: 1.6 $"[11:-2]
import asyncore, string, struct, zLOG import asyncore, string, struct, zLOG
from zLOG import LOG, INFO, ERROR from zLOG import LOG, INFO, ERROR
class smac(asyncore.dispatcher): class SizedMessageAsyncConnection(asyncore.dispatcher):
def __init__(self, sock, addr): def __init__(self, sock, addr):
asyncore.dispatcher.__init__(self, sock) asyncore.dispatcher.__init__(self, sock)
...@@ -102,7 +102,8 @@ class smac(asyncore.dispatcher): ...@@ -102,7 +102,8 @@ class smac(asyncore.dispatcher):
else: else:
del output[0] del output[0]
def handle_close(self): self.close() def handle_close(self):
self.close()
def message_output(self, message, def message_output(self, message,
pack=struct.pack, len=len): pack=struct.pack, len=len):
...@@ -115,6 +116,6 @@ class smac(asyncore.dispatcher): ...@@ -115,6 +116,6 @@ class smac(asyncore.dispatcher):
def log_info(self, message, type='info'): def log_info(self, message, type='info'):
if type=='error': type=ERROR if type=='error': type=ERROR
else: type=INFO else: type=INFO
LOG('ZEO Server', type, message) LOG('ZEO', type, message)
log=log_info log=log_info
...@@ -43,18 +43,18 @@ ...@@ -43,18 +43,18 @@
"""Simple rpc mechanisms """Simple rpc mechanisms
""" """
__version__ = "$Revision: 1.4 $"[11:-2] __version__ = "$Revision: 1.5 $"[11:-2]
from cPickle import dumps, loads from cPickle import dumps, loads
from thread import allocate_lock from thread import allocate_lock
from smac import smac from smac import SizedMessageAsyncConnection
import socket, string, struct import socket, string, struct
TupleType=type(()) TupleType=type(())
Wakeup=None Wakeup=None
class sync: class syncRPC:
"""Synchronous rpc""" """Synchronous rpc"""
_outOfBand=None _outOfBand=None
...@@ -67,7 +67,18 @@ class sync: ...@@ -67,7 +67,18 @@ class sync:
self._sync__q=[] self._sync__q=[]
self._outOfBand=outOfBand self._outOfBand=outOfBand
def setOutOfBand(self, f): self._outOfBand=f def setOutOfBand(self, f):
"""Define a call-back function for handling out-of-band communication
Normal communications from the server consists of call returns
and exception returns. The server may also send asynchronous
messages to the client. For the client to recieve these
messages, it must register an out-of-band callback
function. The function will be called with a single-character
message code and a message argument.
"""
self._outOfBand=f
def close(self): self._sync__s.close() def close(self): self._sync__s.close()
...@@ -129,7 +140,7 @@ class sync: ...@@ -129,7 +140,7 @@ class sync:
while l > 0: while l > 0:
d=recv(l) d=recv(l)
if data is None: data=d if data is None: data=d
elif type(data) is st: data=[data, d] elif type(data) is _st: data=[data, d]
else: data.append(d) else: data.append(d)
l=l-len(d) l=l-len(d)
if type(data) is not _st: data=join(data,'') if type(data) is not _st: data=join(data,'')
...@@ -137,14 +148,14 @@ class sync: ...@@ -137,14 +148,14 @@ class sync:
return data return data
class async(smac, sync): class asyncRPC(SizedMessageAsyncConnection, syncRPC):
def __init__(self, connection, outOfBand=None): def __init__(self, connection, outOfBand=None):
try: try:
host, port = connection host, port = connection
except: except:
s=connection._sync__s s=connection._sync__s
smac.__init__(self, s, None) SizedMessageAsyncConnection.__init__(self, s, None)
self._outOfBand=connection._outOfBand self._outOfBand=connection._outOfBand
for m in connection._sync__q: for m in connection._sync__q:
self.message_output(m) self.message_output(m)
...@@ -152,7 +163,7 @@ class async(smac, sync): ...@@ -152,7 +163,7 @@ class async(smac, sync):
else: else:
s=socket.socket(socket.AF_INET, socket.SOCK_STREAM) s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(host, port) s.connect(host, port)
smac.__init__(self, s, None) SizedMessageAsyncConnection.__init__(self, s, None)
self._outOfBand=outOfBand self._outOfBand=outOfBand
l=allocate_lock() l=allocate_lock()
......
...@@ -43,12 +43,12 @@ ...@@ -43,12 +43,12 @@
"""Sized message async connections """Sized message async connections
""" """
__version__ = "$Revision: 1.5 $"[11:-2] __version__ = "$Revision: 1.6 $"[11:-2]
import asyncore, string, struct, zLOG import asyncore, string, struct, zLOG
from zLOG import LOG, INFO, ERROR from zLOG import LOG, INFO, ERROR
class smac(asyncore.dispatcher): class SizedMessageAsyncConnection(asyncore.dispatcher):
def __init__(self, sock, addr): def __init__(self, sock, addr):
asyncore.dispatcher.__init__(self, sock) asyncore.dispatcher.__init__(self, sock)
...@@ -102,7 +102,8 @@ class smac(asyncore.dispatcher): ...@@ -102,7 +102,8 @@ class smac(asyncore.dispatcher):
else: else:
del output[0] del output[0]
def handle_close(self): self.close() def handle_close(self):
self.close()
def message_output(self, message, def message_output(self, message,
pack=struct.pack, len=len): pack=struct.pack, len=len):
...@@ -115,6 +116,6 @@ class smac(asyncore.dispatcher): ...@@ -115,6 +116,6 @@ class smac(asyncore.dispatcher):
def log_info(self, message, type='info'): def log_info(self, message, type='info'):
if type=='error': type=ERROR if type=='error': type=ERROR
else: type=INFO else: type=INFO
LOG('ZEO Server', type, message) LOG('ZEO', type, message)
log=log_info log=log_info
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment