Commit e6893692 authored by Guido van Rossum's avatar Guido van Rossum

Simple-minded cleanup pass.

- Remove unused imports.

- Got rid of \ continuation lines.

- Remove leading underscores from all names of methods and instance
  variables of ZEOStorage.  There was no usage consistency and I don't
  see any other reason to maintain the existing names.
parent 374c7e96
......@@ -32,11 +32,10 @@ from ZEO.zrpc.server import Dispatcher
from ZEO.zrpc.connection import ManagedServerConnection, Delay, MTDelay
import zLOG
from ZODB.POSException import StorageError, StorageTransactionError, \
TransactionError, ReadOnlyError
from ZODB.POSException import StorageError, StorageTransactionError
from ZODB.POSException import TransactionError, ReadOnlyError
from ZODB.referencesf import referencesf
from ZODB.Transaction import Transaction
from ZODB.TmpStore import TmpStore
# We create a special fast pickler! This allows us
# to create slightly more efficient pickles and
......@@ -121,62 +120,63 @@ class ZEOStorage:
def __init__(self, server):
self.server = server
self.client = None
self._conn = None # the connection associated with client
self.__storage = None
self.__storage_id = "uninitialized"
self._transaction = None
self.conn = None # the connection associated with client
self.storage = None
self.storage_id = "uninitialized"
self.transaction = None
def notifyConnected(self, conn):
self._conn = conn
self.conn = conn
self.client = ClientStub.ClientStorage(conn)
def notifyDisconnected(self):
# When this storage closes, we must ensure that it aborts
# any pending transaction. Not sure if this is the cleanest way.
if self._transaction is not None:
self._log("disconnected during transaction %s" % self._transaction)
self.tpc_abort(self._transaction.id)
if self.transaction is not None:
self.log("disconnected during transaction %s" % self.transaction)
self.tpc_abort(self.transaction.id)
else:
self._log("disconnected")
self.log("disconnected")
def __repr__(self):
tid = self._transaction and repr(self._transaction.id)
if self.__storage:
stid = self.__storage._transaction and \
repr(self.__storage._transaction.id)
tid = self.transaction and repr(self.transaction.id)
if self.storage:
stid = (self.storage._transaction and
repr(self.storage._transaction.id))
else:
stid = None
name = self.__class__.__name__
return "<%s %X trans=%s s_trans=%s>" % (name, id(self), tid, stid)
def _log(self, msg, level=zLOG.INFO, error=None):
name = getattr(self.__storage, '__name__', None)
def log(self, msg, level=zLOG.INFO, error=None):
name = getattr(self.storage, '__name__', None)
if name is None:
name = str(self.__storage)
name = str(self.storage)
zLOG.LOG("%s:%s" % (_label, name), level, msg, error=error)
def setup_delegation(self):
"""Delegate several methods to the storage"""
self.versionEmpty = self.__storage.versionEmpty
self.versions = self.__storage.versions
self.history = self.__storage.history
self.load = self.__storage.load
self.loadSerial = self.__storage.loadSerial
self.modifiedInVersion = self.__storage.modifiedInVersion
def _check_tid(self, tid, exc=None):
self.versionEmpty = self.storage.versionEmpty
self.versions = self.storage.versions
self.history = self.storage.history
self.load = self.storage.load
self.loadSerial = self.storage.loadSerial
self.modifiedInVersion = self.storage.modifiedInVersion
def check_tid(self, tid, exc=None):
caller = sys._getframe().f_back.f_code.co_name
if self._transaction is None:
self._log("no current transaction: %s()" % caller, zLOG.PROBLEM)
if self.transaction is None:
self.log("no current transaction: %s()" % caller, zLOG.PROBLEM)
if exc is not None:
raise exc(None, tid)
else:
return 0
if self._transaction.id != tid:
self._log("%s(%s) invalid; current transaction = %s" % \
(caller, repr(tid), repr(self._transaction.id)), zLOG.PROBLEM)
if self.transaction.id != tid:
self.log("%s(%s) invalid; current transaction = %s" %
(caller, repr(tid), repr(self.transaction.id)),
zLOG.PROBLEM)
if exc is not None:
raise exc(self._transaction.id, tid)
raise exc(self.transaction.id, tid)
else:
return 0
return 1
......@@ -186,44 +186,44 @@ class ZEOStorage:
This method must be the first one called by the client.
"""
self._log("register(%r, %s)" % (storage_id, read_only))
self.log("register(%r, %s)" % (storage_id, read_only))
storage = self.server.storages.get(storage_id)
if storage is None:
self._log("unknown storage_id: %s" % storage_id)
self.log("unknown storage_id: %s" % storage_id)
raise ValueError, "unknown storage: %s" % storage_id
if not read_only and (self.server.read_only or storage.isReadOnly()):
raise ReadOnlyError()
self.__storage_id = storage_id
self.__storage = storage
self.storage_id = storage_id
self.storage = storage
self.setup_delegation()
self.server.register_connection(storage_id, self)
self._log("registered storage %r: %s" % (storage_id, storage))
self.log("registered storage %r: %s" % (storage_id, storage))
def get_info(self):
return {'length': len(self.__storage),
'size': self.__storage.getSize(),
'name': self.__storage.getName(),
'supportsUndo': self.__storage.supportsUndo(),
'supportsVersions': self.__storage.supportsVersions(),
return {'length': len(self.storage),
'size': self.storage.getSize(),
'name': self.storage.getName(),
'supportsUndo': self.storage.supportsUndo(),
'supportsVersions': self.storage.supportsVersions(),
'supportsTransactionalUndo':
self.__storage.supportsTransactionalUndo(),
self.storage.supportsTransactionalUndo(),
}
def get_size_info(self):
return {'length': len(self.__storage),
'size': self.__storage.getSize(),
return {'length': len(self.storage),
'size': self.storage.getSize(),
}
def zeoLoad(self, oid):
v = self.__storage.modifiedInVersion(oid)
v = self.storage.modifiedInVersion(oid)
if v:
pv, sv = self.__storage.load(oid, v)
pv, sv = self.storage.load(oid, v)
else:
pv = sv = None
try:
p, s = self.__storage.load(oid, '')
p, s = self.storage.load(oid, '')
except KeyError:
if sv:
# Created in version, no non-version data
......@@ -250,29 +250,29 @@ class ZEOStorage:
def pack(self, time, wait=None):
if wait is not None:
return run_in_thread(self._pack, time)
return run_in_thread(self.pack_impl, time)
else:
# If the client isn't waiting for a reply, start a thread
# and forget about it.
t = threading.Thread(target=self._pack, args=(time,))
t = threading.Thread(target=self.pack_impl, args=(time,))
t.start()
return None
def _pack(self, time):
self.__storage.pack(time, referencesf)
def pack_impl(self, time):
self.storage.pack(time, referencesf)
# Broadcast new size statistics
self.server.invalidate(0, self.__storage_id, (), self.get_size_info())
self.server.invalidate(0, self.storage_id, (), self.get_size_info())
def new_oids(self, n=100):
"""Return a sequence of n new oids, where n defaults to 100"""
if n <= 0:
n = 1
return [self.__storage.new_oid() for i in range(n)]
return [self.storage.new_oid() for i in range(n)]
def undo(self, transaction_id):
oids = self.__storage.undo(transaction_id)
oids = self.storage.undo(transaction_id)
if oids:
self.server.invalidate(self, self.__storage_id,
self.server.invalidate(self, self.storage_id,
map(lambda oid: (oid, ''), oids))
return oids
return ()
......@@ -280,26 +280,26 @@ class ZEOStorage:
# undoLog and undoInfo are potentially slow methods
def undoInfo(self, first, last, spec):
return run_in_thread(self.__storage.undoInfo, first, last, spec)
return run_in_thread(self.storage.undoInfo, first, last, spec)
def undoLog(self, first, last):
return run_in_thread(self.__storage.undoLog, first, last)
return run_in_thread(self.storage.undoLog, first, last)
def tpc_begin(self, id, user, description, ext, tid, status):
if self._transaction is not None:
if self._transaction.id == id:
self._log("duplicate tpc_begin(%s)" % repr(id))
if self.transaction is not None:
if self.transaction.id == id:
self.log("duplicate tpc_begin(%s)" % repr(id))
return
else:
raise StorageTransactionError("Multiple simultaneous tpc_begin"
" requests from one client.")
# (This doesn't require a lock because we're using asyncore)
if self.__storage._transaction is None:
self.strategy = ImmediateCommitStrategy(self.__storage,
if self.storage._transaction is None:
self.strategy = ImmediateCommitStrategy(self.storage,
self.client)
else:
self.strategy = DelayedCommitStrategy(self.__storage,
self.strategy = DelayedCommitStrategy(self.storage,
self.wait)
t = Transaction()
......@@ -309,56 +309,56 @@ class ZEOStorage:
t._extension = ext
self.strategy.tpc_begin(t, tid, status)
self._transaction = t
self.transaction = t
def tpc_finish(self, id):
if not self._check_tid(id):
if not self.check_tid(id):
return
invalidated = self.strategy.tpc_finish()
if invalidated:
self.server.invalidate(self, self.__storage_id,
self.server.invalidate(self, self.storage_id,
invalidated, self.get_size_info())
self._transaction = None
self.transaction = None
self.strategy = None
self._handle_waiting()
self.handle_waiting()
def tpc_abort(self, id):
if not self._check_tid(id):
if not self.check_tid(id):
return
strategy = self.strategy
strategy.tpc_abort()
self._transaction = None
self.transaction = None
self.strategy = None
# When ZEOStorage.notifyDisconnected() calls self.tpc_abort(),
# it is possible that self.strategy is DelayedCommitStrategy.
# In that case, ZEOStorage.tpc_abort() should *not* call
# self._handle_waiting(), otherwise there could be two
# self.handle_waiting(), otherwise there could be two
# ZEOStorage instances whose strategy is
# ImmediateCommitStrategy!
if isinstance(strategy, ImmediateCommitStrategy):
self._handle_waiting()
self.handle_waiting()
# XXX else, should we remove ourselves from storage._waiting???
# XXX handle new serialnos
def storea(self, oid, serial, data, version, id):
self._check_tid(id, exc=StorageTransactionError)
self.check_tid(id, exc=StorageTransactionError)
self.strategy.store(oid, serial, data, version)
def vote(self, id):
self._check_tid(id, exc=StorageTransactionError)
self.check_tid(id, exc=StorageTransactionError)
return self.strategy.tpc_vote()
def abortVersion(self, src, id):
self._check_tid(id, exc=StorageTransactionError)
self.check_tid(id, exc=StorageTransactionError)
return self.strategy.abortVersion(src)
def commitVersion(self, src, dest, id):
self._check_tid(id, exc=StorageTransactionError)
self.check_tid(id, exc=StorageTransactionError)
return self.strategy.commitVersion(src, dest)
def transactionalUndo(self, trans_id, id):
self._check_tid(id, exc=StorageTransactionError)
self.check_tid(id, exc=StorageTransactionError)
return self.strategy.transactionalUndo(trans_id)
# When a delayed transaction is restarted, the dance is
......@@ -372,35 +372,35 @@ class ZEOStorage:
# client will be blocked until it finishes.
def wait(self):
if self.__storage._transaction:
if self.storage._transaction:
d = Delay()
self.__storage._waiting.append((d, self))
self._log("Transaction blocked waiting for storage. "
"Clients waiting: %d." % len(self.__storage._waiting))
self.storage._waiting.append((d, self))
self.log("Transaction blocked waiting for storage. "
"Clients waiting: %d." % len(self.storage._waiting))
return d
else:
self.restart()
return None
def _handle_waiting(self):
while self.__storage._waiting:
delay, zeo_storage = self.__storage._waiting.pop(0)
if self._restart(zeo_storage, delay):
if self.__storage._waiting:
n = len(self.__storage._waiting)
self._log("Blocked transaction restarted. "
def handle_waiting(self):
while self.storage._waiting:
delay, zeo_storage = self.storage._waiting.pop(0)
if self.restart_other(zeo_storage, delay):
if self.storage._waiting:
n = len(self.storage._waiting)
self.log("Blocked transaction restarted. "
"Clients waiting: %d" % n)
else:
self._log("Blocked transaction restarted.")
self.log("Blocked transaction restarted.")
return
def _restart(self, zeo_storage, delay):
def restart_other(self, zeo_storage, delay):
# Return True if the server restarted.
# call the restart() method on the appropriate server.
try:
zeo_storage.restart(delay)
except:
self._log("Unexpected error handling waiting transaction",
self.log("Unexpected error handling waiting transaction",
level=zLOG.WARNING, error=sys.exc_info())
zeo_storage._conn.close()
return 0
......@@ -410,7 +410,7 @@ class ZEOStorage:
def restart(self, delay=None):
old_strategy = self.strategy
assert isinstance(old_strategy, DelayedCommitStrategy)
self.strategy = ImmediateCommitStrategy(self.__storage,
self.strategy = ImmediateCommitStrategy(self.storage,
self.client)
resp = old_strategy.restart(self.strategy)
if delay is not None:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment