Commit 695f2ef4 authored by Guido van Rossum's avatar Guido van Rossum

Another bunch of small cleanups and fixes.

Open the cache later to avoid scanning it twice if a connection is
made right away.

ClientStorage.close() is now idempotent.

ClientStorage stores its addr argument as self._addr so the test suite
doesn't have to dig it out of the rpc manager to reopen the storage as
readonly.

Renamed some of the callbacks into the client for clarity:
begin -> beginVerify
end -> endVerify
invalidate -> invalidateVerify
Invalidate -> invalidateTrans
parent 365c17b6
......@@ -13,7 +13,7 @@
##############################################################################
"""Network ZODB storage client
$Id: ClientStorage.py,v 1.51 2002/08/16 22:55:44 jeremy Exp $
$Id: ClientStorage.py,v 1.52 2002/08/28 16:37:09 gvanrossum Exp $
"""
import cPickle
......@@ -70,12 +70,14 @@ class ClientStorage:
min_disconnect_poll=5, max_disconnect_poll=300,
wait=0, read_only=0):
self._addr = addr # For tests
self._server = disconnected_stub
self._is_read_only = read_only
self._storage = storage
self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client',
'supportsUndo':0, 'supportsVersions': 0}
'supportsUndo':0, 'supportsVersions': 0,
'supportsTransactionalUndo': 0}
self._tbuf = TransactionBuffer()
self._db = None
......@@ -92,7 +94,6 @@ class ClientStorage:
client = client or os.environ.get('ZEO_CLIENT')
self._cache = ClientCache.ClientCache(storage, cache_size,
client=client, var=var)
self._cache.open() # XXX open now? or later?
self._rpc_mgr = ConnectionManager(addr, self,
tmin=min_disconnect_poll,
......@@ -107,26 +108,30 @@ class ClientStorage:
if not self._rpc_mgr.attempt_connect():
self._rpc_mgr.connect()
# If we're connected at this point, the cache is opened as a
# side effect of verify_cache(). If not, open it now.
if not self.is_connected():
self._cache.open()
def _basic_init(self, name):
"""Handle initialization activites of BaseStorage"""
# XXX does anything depend on attr being __name__
self.__name__ = name
self.__name__ = name # A standard convention among storages
# A ClientStorage only allows one client to commit at a time.
# Mutual exclusion is achieved using tpc_cond, which
# A ClientStorage only allows one thread to commit at a time.
# Mutual exclusion is achieved using _tpc_cond, which
# protects _transaction. A thread that wants to assign to
# self._transaction must acquire tpc_cond first. A thread
# self._transaction must acquire _tpc_cond first. A thread
# that decides it's done with a transaction (whether via success
# or failure) must set _transaction to None and do
# tpc_cond.notify() before releasing tpc_cond..
self.tpc_cond = threading.Condition()
# _tpc_cond.notify() before releasing _tpc_cond.
self._tpc_cond = threading.Condition()
self._transaction = None
# Prevent multiple new_oid calls from going out. The _oids
# variable should only be modified while holding the
# oid_cond.
self.oid_cond = threading.Condition()
# _oid_lock.
self._oid_lock = threading.Lock()
commit_lock = threading.Lock()
self._commit_lock_acquire = commit_lock.acquire
......@@ -139,12 +144,18 @@ class ClientStorage:
def close(self):
if self._tbuf is not None:
self._tbuf.close()
self._tbuf = None
if self._cache is not None:
self._cache.close()
self._rpc_mgr.close()
self._cache = None
if self._rpc_mgr is not None:
self._rpc_mgr.close()
self._rpc_mgr = None
def registerDB(self, db, limit):
"""Register that the storage is controlled by the given DB."""
# This is called by ZODB.DB (and by some tests).
# The storage isn't really ready to use until after this call.
log2(INFO, "registerDB(%s, %s)" % (repr(db), repr(limit)))
self._db = db
......@@ -154,26 +165,27 @@ class ClientStorage:
else:
return 1
def notifyConnected(self, c):
log2(INFO, "Connected to storage via %s" % repr(c))
def notifyConnected(self, conn):
log2(INFO, "Connected to storage via %s" % repr(conn))
# check the protocol version here?
stub = ServerStub.StorageServer(c)
stub = ServerStub.StorageServer(conn)
self._oids = []
# XXX Why is this synchronous? If it were async, verification
# would start faster.
stub.register(str(self._storage), self._is_read_only)
self._info.update(stub.get_info())
self.verify_cache(stub)
# Don't make the server available to clients until after
# validating the cache
# XXX The stub should be saved here and set in end() below.
self._server = stub
def verify_cache(self, server):
# XXX beginZeoVerify ends up calling back to beginVerify() below.
# That whole exchange is rather unnecessary.
server.beginZeoVerify()
self._cache.verify(server.zeoVerify)
server.endZeoVerify()
......@@ -206,37 +218,27 @@ class ClientStorage:
return self._info['supportsVersions']
def supportsTransactionalUndo(self):
try:
return self._info['supportsTransactionalUndo']
except KeyError:
return 0
return self._info['supportsTransactionalUndo']
def isReadOnly(self):
return self._is_read_only
def _check_trans(self, trans, exc=None):
def _check_trans(self, trans):
if self._is_read_only:
raise POSException.ReadOnlyError()
if self._transaction is not trans:
if exc is None:
return 0
else:
raise exc(self._transaction, trans)
return 1
raise POSException.StorageTransactionError(self._transaction,
trans)
def abortVersion(self, src, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
self._check_trans(transaction,
POSException.StorageTransactionError)
self._check_trans(transaction)
oids = self._server.abortVersion(src, self._serial)
for oid in oids:
self._tbuf.invalidate(oid, src)
return oids
def commitVersion(self, src, dest, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
self._check_trans(transaction,
POSException.StorageTransactionError)
self._check_trans(transaction)
oids = self._server.commitVersion(src, dest, self._serial)
if dest:
# just invalidate our version data
......@@ -280,13 +282,12 @@ class ClientStorage:
if self._is_read_only:
raise POSException.ReadOnlyError()
# avoid multiple oid requests to server at the same time
self.oid_cond.acquire()
self._oid_lock.acquire()
if not self._oids:
self._oids = self._server.new_oids()
self._oids.reverse()
self.oid_cond.notifyAll()
oid = self._oids.pop()
self.oid_cond.release()
self._oid_lock.release()
return oid
def pack(self, t=None, rf=None, wait=0, days=0):
......@@ -309,9 +310,7 @@ class ClientStorage:
return r
def store(self, oid, serial, data, version, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
self._check_trans(transaction, POSException.StorageTransactionError)
self._check_trans(transaction)
self._server.storea(oid, serial, data, version, self._serial)
self._tbuf.store(oid, version, data)
return self._check_serials()
......@@ -323,17 +322,17 @@ class ClientStorage:
return self._check_serials()
def tpc_begin(self, transaction, tid=None, status=' '):
self.tpc_cond.acquire()
self._tpc_cond.acquire()
while self._transaction is not None:
# It is allowable for a client to call two tpc_begins in a
# row with the same transaction, and the second of these
# must be ignored.
if self._transaction == transaction:
self.tpc_cond.release()
self._tpc_cond.release()
return
self.tpc_cond.wait()
self._tpc_cond.wait()
self._transaction = transaction
self.tpc_cond.release()
self._tpc_cond.release()
if tid is None:
self._ts = get_timestamp(self._ts)
......@@ -360,11 +359,11 @@ class ClientStorage:
def end_transaction(self):
# the right way to set self._transaction to None
# calls notify() on tpc_cond in case there are waiting threads
self.tpc_cond.acquire()
# calls notify() on _tpc_cond in case there are waiting threads
self._tpc_cond.acquire()
self._transaction = None
self.tpc_cond.notify()
self.tpc_cond.release()
self._tpc_cond.notify()
self._tpc_cond.release()
def tpc_abort(self, transaction):
if transaction is not self._transaction:
......@@ -424,9 +423,7 @@ class ClientStorage:
self._tbuf.clear()
def transactionalUndo(self, trans_id, trans):
if self._is_read_only:
raise POSException.ReadOnlyError()
self._check_trans(trans, POSException.StorageTransactionError)
self._check_trans(trans)
oids = self._server.transactionalUndo(trans_id, self._serial)
for oid in oids:
self._tbuf.invalidate(oid, '')
......@@ -464,18 +461,20 @@ class ClientStorage:
def info(self, dict):
self._info.update(dict)
def begin(self):
def beginVerify(self):
self._tfile = tempfile.TemporaryFile(suffix=".inv")
self._pickler = cPickle.Pickler(self._tfile, 1)
self._pickler.fast = 1 # Don't use the memo
def invalidate(self, args):
# Queue an invalidate for the end the transaction
def invalidateVerify(self, args):
# Invalidation as result of verify_cache().
# Queue an invalidate for the end the verification procedure.
if self._pickler is None:
# XXX This should never happen
return
self._pickler.dump(args)
def end(self):
def endVerify(self):
if self._pickler is None:
return
self._pickler.dump((0,0))
......@@ -492,7 +491,8 @@ class ClientStorage:
self._db.invalidate(oid, version=version)
f.close()
def Invalidate(self, args):
def invalidateTrans(self, args):
# Invalidation as a result of a transaction.
for oid, version in args:
self._cache.invalidate(oid, version=version)
try:
......
......@@ -18,19 +18,16 @@ class ClientStorage:
self.rpc = rpc
def beginVerify(self):
self.rpc.callAsync('begin')
self.rpc.callAsync('beginVerify')
# XXX must rename the two invalidate messages. I can never
# remember which is which
def invalidate(self, args):
self.rpc.callAsync('invalidate', args)
def Invalidate(self, args):
self.rpc.callAsync('Invalidate', args)
def invalidateVerify(self, args):
self.rpc.callAsync('invalidateVerify', args)
def endVerify(self):
self.rpc.callAsync('end')
self.rpc.callAsync('endVerify')
def invalidateTrans(self, args):
self.rpc.callAsync('invalidateTrans', args)
def serialnos(self, arg):
self.rpc.callAsync('serialnos', arg)
......
......@@ -86,11 +86,11 @@ class StorageServer:
l = self.connections[storage_id] = []
l.append(proxy)
def invalidate(self, conn, storage_id, invalidated=(), info=0):
def invalidate(self, conn, storage_id, invalidated=(), info=None):
for p in self.connections.get(storage_id, ()):
if invalidated and p is not conn:
p.client.Invalidate(invalidated)
else:
p.client.invalidateTrans(invalidated)
elif info is not None:
p.client.info(info)
def close_server(self):
......@@ -108,11 +108,9 @@ class StorageServer:
pass
def close(self, conn):
removed = 0
for sid, cl in self.connections.items():
if conn.obj in cl:
cl.remove(conn.obj)
removed = 1
class ZEOStorage:
"""Proxy to underlying storage for a single remote client."""
......@@ -130,7 +128,7 @@ class ZEOStorage:
# any pending transaction. Not sure if this is the clearest way.
if self._transaction is not None:
self.__storage.tpc_abort(self._transaction)
self._transaction is None
self._transaction = None
self._conn.close()
def notifyConnected(self, conn):
......@@ -240,9 +238,9 @@ class ZEOStorage:
except: # except what?
return None
if os != s:
self.client.invalidate((oid, ''))
self.client.invalidateVerify((oid, ''))
elif osv != sv:
self.client.invalidate((oid, v))
self.client.invalidateVerify((oid, v))
def endZeoVerify(self):
self.client.endVerify()
......@@ -257,6 +255,8 @@ class ZEOStorage:
t.start()
if wait is not None:
return wait
else:
return None
def _pack(self, t, delay):
try:
......@@ -277,7 +277,8 @@ class ZEOStorage:
def new_oids(self, n=100):
"""Return a sequence of n new oids, where n defaults to 100"""
if n < 0:
if n <= 0:
# Always return at least one
n = 1
return [self.__storage.new_oid() for i in range(n)]
......@@ -374,6 +375,7 @@ class ZEOStorage:
return d
else:
self.restart()
return None
def _handle_waiting(self):
while self.__storage._waiting:
......@@ -495,7 +497,7 @@ class ImmediateCommitStrategy:
self.invalidated.append((oid, version))
try:
nil = dump(newserial, 1)
dump(newserial, 1)
except:
msg = "Couldn't pickle storage exception: %s" % repr(newserial)
slog(self.storage, msg, zLOG.ERROR)
......
......@@ -119,10 +119,10 @@ class GenericTests(StorageTestBase.StorageTestBase,
# XXX Needed to support ReadOnlyStorage tests. Ought to be a
# cleaner way.
# Is this the only way to get the address?
addr = self._storage._rpc_mgr.addr[0][1]
addr = self._storage._addr
self._storage.close()
self._storage = ZEO.ClientStorage.ClientStorage(addr, read_only=1,
self._storage = ZEO.ClientStorage.ClientStorage(addr,
read_only=read_only,
wait=1)
def checkLargeUpdate(self):
......@@ -130,7 +130,7 @@ class GenericTests(StorageTestBase.StorageTestBase,
self._dostore(data=obj)
def checkZEOInvalidation(self):
addr = self._storage._rpc_mgr.addr[0][1]
addr = self._storage._addr
storage2 = ZEO.ClientStorage.ClientStorage(addr, wait=1,
min_disconnect_poll=0.1)
try:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment