Commit b3805a2f authored by Jim Fulton's avatar Jim Fulton

just getting started

parent f61b86d4
##############################################################################
#
# Copyright (c) 1996-1998, Digital Creations, Fredericksburg, VA, USA.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# o Redistributions of source code must retain the above copyright
# notice, this list of conditions, and the disclaimer that follows.
#
# o Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions, and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# o Neither the name of Digital Creations nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
#
# THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS AND CONTRIBUTORS *AS IS*
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DIGITAL
# CREATIONS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
# DAMAGE.
#
#
# If you have questions regarding this software, contact:
#
# Digital Creations, L.C.
# 910 Princess Ann Street
# Fredericksburge, Virginia 22401
#
# info@digicool.com
#
# (540) 371-6909
#
##############################################################################
"""Network ZODB storage client
"""
__version__='$Revision: 1.1 $'[11:-2]
import struct, time, os, socket, cPickle, string, Sync, zrpc
now=time.time
from struct import pack, unpack
from ZODB import POSException, BaseStorage
TupleType=type(())
class UnrecognizedResult(POSException.StorageError):
"""A server call returned an unrecognized result
"""
class ClientStorage(BaseStorage.BaseStorage):
def __init__(self, connection, async=0):
if async: self._call=zrpc.async(connection)
else: self._call=zrpc.sync(connection)
info=self._call('get_info')
self._len=info.get('length',0)
self._size=info.get('size',0)
self.__name__=info.get('name', str(connection))
self._supportsUndo=info.get('supportsUndo',0)
self._supportsVersions=info.get('supportsVersions',0)
BaseStorage.BaseStorage.__init__(self,
info.get('name', str(connection)),
)
def registerDB(self, db, limit):
def invalidate(code, args,
invalidate=db.invalidate,
limit=limit,
release=self._commit_lock_release,
):
if code == 'I':
for oid, serial, version in args:
invalidate(oid, version=version)
elif code == 'U':
release()
self._call.setOutOfBand(invalidate)
def __len__(self): return self._len
def abortVersion(self, src, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire()
try: return self._call('abortVersion', src, transaction.id)
finally: self._lock_release()
def close(self):
self._lock_acquire()
try: self._call.close()
finally: self._lock_release()
def commitVersion(self, src, dest, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire()
try: return self._call('commitVersion', src, dest, transaction.id)
finally: self._lock_release()
def getName(self): return self.__name__
def getSize(self): return self._size
def history(self, oid, version, length=1):
self._lock_acquire()
try: return self._call('history', oid, version, length)
finally: self._lock_release()
def load(self, oid, version, _stuff=None):
self._lock_acquire()
try: return self._call('load', oid, version)
finally: self._lock_release()
def modifiedInVersion(self, oid):
self._lock_acquire()
try: return self._call('modifiedInVersion', oid)
finally: self._lock_release()
def new_oid(self, last=None):
self._lock_acquire()
try: return self._call('new_oid')
finally: self._lock_release()
def pack(self, t, rf):
# Note that we ignore the rf argument. The server
# will provide it's own implementation.
self._lock_acquire()
try: return self._call('pack', t)
finally: self._lock_release()
def store(self, oid, serial, data, version, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire()
try: return self._call('store', oid, serial,
data, version, transaction.id)
finally: self._lock_release()
def supportsUndo(self): return self._supportsUndo
def supportsVersions(self): return self._supportsVersions
def tpc_abort(self, transaction):
self._lock_acquire()
try:
if transaction is not self._transaction: return
self._call('tpc_abort', id)
self._transaction=None
self._commit_lock_release()
finally: self._lock_release()
def tpc_begin(self, transaction):
self._lock_acquire()
try:
if self._transaction is transaction: return
while 1:
self._lock_release()
self._commit_lock_acquire()
self._lock_acquire()
if self._call('tpc_begin', id, user, desc, ext) is None:
break
self._transaction=transaction
self._clear_temp()
user=transaction.user
desc=transaction.description
ext=transaction._extension
if ext: ext=dumps(ext,1)
else: ext=""
self._ude=user, desc, ext
t=time.time()
t=apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
self._ts=t=t.laterThan(self._ts)
self._serial=`t`
self._begin(self._serial, user, desc, ext)
finally: self._lock_release()
def tpc_finish(self, transaction, f=None):
self._lock_acquire()
try:
if transaction is not self._transaction: return
if f is not None: f()
u,d,e=self._ude
self._finish(self._serial, u, d, e)
self._clear_temp()
self._ude=None
self._transaction=None
self._commit_lock_release()
finally: self._lock_release()
def _finish(self, tid, u, d, e):
pass
def _finish(self, id, user, desc, ext):
return self._call('tpc_finish', id, user, desc, ext)
def undo(self, transaction_id):
return self._call('undo', transaction_id)
finally: self._lock_release()
def undoLog(self, version, first, last, filter=None):
# Waaaa, we really need to get the filter through
# but how can we send it over the wire?
# I suppose we could try to run the filter in a restricted execution
# env.
# Maybe .... we are really going to want to pass lambdas, hm.
self._lock_acquire()
try: return self._call('undoLog', version, first, last) # Eek!
finally: self._lock_release()
def versionEmpty(self, version):
self._lock_acquire()
try: return self._call('versionEmpty', version)
finally: self._lock_release()
def versions(self, max=None):
self._lock_acquire()
try: return self._call('versionEmpty', max)
finally: self._lock_release()
import asyncore, socket, string, sys, cPickle
from smac import smac
from ZODB import POSException
from ZODB.Transaction import Transaction
import traceback
class StorageServerError(POSException.ServerError): pass
class Server(asyncore.dispatcher):
def __init__(self, connection, storages):
self.host, self.port = connection
self.__storages=storages
self.__connections={}
self.__get_connections=self.__connections.get
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.bind((self.host, self.port))
self.listen(5)
def register_connection(self, connection, storage_id):
storage=self.__storages.get(storage_id, None)
if storage is None:
connection.close()
return None, None
connections=self.__get_connections(storage_id, None)
if connections is None:
self.__connections[storage_id]=connections=[]
connections.append(connection)
def invalidate(self, connection, storage_id, invalidated,
dumps=cPickle.dumps):
for c in self.__connections[storage_id]:
if c is connection: continue
c.message_output('I'+dumps(invalidated))
def writable(self): return 0
def handle_read(self): pass
def readable(self): return 1
def handle_connect (self): pass
def handle_accept(self):
try:
sock, addr = self.accept()
except socket.error:
sys.stderr.write('warning: accept failed\n')
Connection(self, sock, addr)
storage_methods={}
for n in ('get_info', 'abortVersion', 'commitVersion', 'history',
'load', 'modifiedInVersion', 'new_oid', 'pack', 'store',
'tpc_abort', 'tpc_begin', 'tpc_finish', 'undo', 'undoLog',
'versionEmpty'):
storage_methods[n]=1
storage_method=storage_methods.has_key
class Connection(smac):
_transaction=None
__storage=__storage_id=None
def __init__(self, server, sock, addr):
smac.__init__(self, sock, addr)
self.__server=server
self.__storage=server.storage
self.__invalidated=[]
def close(self):
self.__server.unregister_connection(self, self.__storage_id)
smac.close(self)
def message_input(self, message):
if self.__storage is None:
self.__storage, self.__storage_id = (
self.__server.register_connection(self, message))
return
rt='R'
try:
args=cPickle.loads(message)
name, args = args[0], args[1:]
if not storage_method(name):
raise 'Invalid Method Name', name
if hasattr(self, name):
r=apply(getattr(self, name), args)
else:
r=apply(getattr(self.__storage, name), args)
except:
traceback.print_exc()
t, r = sys.exc_info()[:2]
if type(r) is not type(self): r=t,r
rt='E'
r=cPickle.dumps(r,1)
self.message_output(rt+r)
def get_info(self):
storage=self.__storage
return {
'length': len(storage),
'size': storage.getSize(),
'name': storage.getName(),
'supportsUndo': storage.supportsUndo(),
'supportsVersions': storage.supportsVersions(),
}
def store(self, oid, serial, data, version, id):
t=self._transaction
if t is None or id != t.id:
raise POSException.StorageTransactionError(self, id)
newserial=self.__storage.store(oid, data, serial, version, t)
if serial != '\0\0\0\0\0\0\0\0':
self.__invalidated.append(oid, serial, version)
return newserial
def unlock(self):
self.message_output('UN')
def tpc_abort(self, id):
t=self._transaction
if t is None or id != t.id: return
r=self.__storage.tpc_abort(t)
for c in self.__storage.__waiting: c.unlock()
self._transaction=None
self.__invalidated=[]
def tpc_begin(self, id, user, description, ext):
t=self._transaction
if t is not None and id == t.id: return
storage=self.__storage
if storage._transaction is not None:
storage.__waiting.append(self)
return 1
self._transaction=t=Transaction()
t.id=id
t.user=user
t.description=description
storage.tpc_begin(t)
storage.__waiting=[]
self.__invalidated=[]
def tpc_finish(self, id, user, description, ext):
t=self._transaction
if id != t.id: return
t.user=user
t.description=description
r=self.__storage.tpc_finish(t)
for c in self.__storage.__waiting: c.unlock()
self._transaction=None
self.__server.invalidate(self, self.__storage_id, self.__invalidated)
self.__invalidated=[]
if __name__=='__main__':
import ZODB.FileStorage
name, port = sys.argv[1:3]
try: port='',string.atoi(port)
except: pass
Server(port, ZODB.FileStorage.FileStorage(name))
asyncore.loop()
"""Sized message async connections
"""
import asyncore, string, struct
class smac(asyncore.dispatcher):
def __init__(self, sock, addr):
asyncore.dispatcher.__init__(self, sock)
self.addr=addr
self.__state=None
self.__inp=None
self.__l=4
self.__output=output=[]
self.__append=output.append
self.__pop=output.pop
def handle_read(self,
join=string.join, StringType=type('')):
l=self.__l
d=self.recv(l)
inp=self.__inp
if inp is None:
inp=d
elif type(inp) is StringType:
inp=[inp,d]
else:
inp.append(d)
l=l-len(d)
if l <= 0:
if type(inp) is not StringType: inp=join(inp,'')
if self.__state is None:
# waiting for message
self.__l=struct.unpack(">i",inp)[0]
self.__state=1
self.__inp=None
else:
self.__inp=None
self.__l=4
self.__state=None
self.message_input(inp)
else:
self.__l=l
self.__inp=inp
def readable(self): return 1
def writable(self): return not not self.__output
def handle_write(self):
output=self.__output
if output:
v=output[0]
n=self.send(v)
if n < len(v):
output[0]=v[n:]
else:
del output[0]
def handle_close(self): self.close()
def message_output(self, message,
pack=struct.pack, len=len):
self.__append(pack(">i",len(message))+message)
"""But simple rpc mechanisms
"""
from cPickle import dumps, loads
from ThreadLock import allocate_lock
import socket, smac, string, struct
TupleType=type(())
class sync:
"""Synchronous rpc"""
_outOfBand=None
def __init__(self, connection, outOfBand=None):
host, port = connection
s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(host, port)
self.__s=s
self._outOfBand=outOfBand
def setOutOfBand(self, f): self._outOfBand=f
def __call__(self, *args):
args=dumps(args,1)
self._write(args)
while 1:
r=self._read()
c=r[:1]
if c=='R':
return loads(r[1:])
if c=='E':
r=loads(r[1:])
if type(r) is TupleType: raise r[0], r[1]
raise r
oob=self._outOfBand
if oob is not None:
oob(c, loads(r[1:]))
else:
raise UnrecognizedResult, r
def _write(self, data, pack=struct.pack):
send=self.__s.send
h=pack(">i", len(data))
l=len(h)
while l > 0:
sent=send(h)
h=h[sent:]
l=l-sent
l=len(data)
while l > 0:
sent=send(data)
data=data[sent:]
l=l-sent
def _read(self, _st=type(''), join=string.join, unpack=struct.unpack):
recv=self.__s.recv
l=4
data=None
while l > 0:
d=recv(l)
if data is None: data=d
elif type(data) is _st: data=[data, d]
else: data.append(d)
l=l-len(d)
if type(data) is not _st: data=join(data,'')
l,=unpack(">i", data)
data=None
while l > 0:
d=recv(l)
if data is None: data=d
elif type(data) is st: data=[data, d]
else: data.append(d)
l=l-len(d)
if type(data) is not _st: data=join(data,'')
return data
class async(smac.smac, sync):
def __init__(self, connection, outOfBand=None):
host, port = connection
s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(host, port)
self._outOfBand=outOfBand
l=allocate_lock()
self.__la=l.acquire
self.__lr=l.release
self.__r=None
l.acquire()
smac.__init__(self, s, None)
def _write(self, data): self.message_output(data)
def message_input(self, m):
c=m[:1]
if c in 'RE':
self.__r=m
self.__lr()
else:
oob=self._outOfBand
if oob is not None: oob(c, loads(m[1:]))
def _read(self):
self.__la()
return self.__r
"""Sized message async connections
"""
import asyncore, string, struct
class smac(asyncore.dispatcher):
def __init__(self, sock, addr):
asyncore.dispatcher.__init__(self, sock)
self.addr=addr
self.__state=None
self.__inp=None
self.__l=4
self.__output=output=[]
self.__append=output.append
self.__pop=output.pop
def handle_read(self,
join=string.join, StringType=type('')):
l=self.__l
d=self.recv(l)
inp=self.__inp
if inp is None:
inp=d
elif type(inp) is StringType:
inp=[inp,d]
else:
inp.append(d)
l=l-len(d)
if l <= 0:
if type(inp) is not StringType: inp=join(inp,'')
if self.__state is None:
# waiting for message
self.__l=struct.unpack(">i",inp)[0]
self.__state=1
self.__inp=None
else:
self.__inp=None
self.__l=4
self.__state=None
self.message_input(inp)
else:
self.__l=l
self.__inp=inp
def readable(self): return 1
def writable(self): return not not self.__output
def handle_write(self):
output=self.__output
if output:
v=output[0]
n=self.send(v)
if n < len(v):
output[0]=v[n:]
else:
del output[0]
def handle_close(self): self.close()
def message_output(self, message,
pack=struct.pack, len=len):
self.__append(pack(">i",len(message))+message)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment