Commit 24afe7ac authored by Guido van Rossum's avatar Guido van Rossum

I set out making wait=1 work for fallback connections, i.e. the

ClientStorage constructor called with both wait=1 and
read_only_fallback=1 should return, indicating its readiness, when a
read-only connection was made.  This is done by calling
connect(sync=1).  Previously this waited for the ConnectThread to
finish, but that thread doesn't finish until it's made a read-write
connection, so a different mechanism is needed.

I ended up doing a major overhaul of the interfaces between
ClientStorage, ConnectionManager, ConnectThread/ConnectWrapper, and
even ManagedConnection.  Changes:

ClientStorage.py:

  ClientStorage:

  - testConnection() now returns just the preferred flag; stubs are
    cheap and I like to have the notifyConnected() signature be the
    same for clients and servers.

  - notifyConnected() now takes a connection (to match the signature
    of this method in StorageServer), and creates a new stub.  It also
    takes care of the reconnect business if the client was already
    connected, rather than the ClientManager.  It stores the
    connection as self._connection so it can close the previous one.
    This is also reset by notifyDisconnected().

zrpc/client.py:

  ConnectionManager:

  - Changed self.thread_lock into a condition variable.  It now also
    protects self.connection.  The condition is notified when
    self.connection is set to a non-None value in connect_done();
    connect(sync=1) waits for it.  The self.connected variable is no
    more; we test "self.connection is not None" instead.

  - Tried to made close() reentrant.  (There's a trick: you can't set
    self.connection to None, conn.close() ends up calling close_conn()
    which does this.)

  - Renamed notify_closed() to close_conn(), for symmetry with the
    StorageServer API.

  - Added an is_connected() method so ConnectThread.try_connect()
    doesn't have to dig inside the manager's guts to find out if the
    manager is connected (important for the disposition of fallback
    wrappers).

  ConnectThread and ConnectWrapper:

  - Follow above changes in the ClientStorage and ConnectionManager
    APIs: don't close the manager's connection when reconnecting, but
    leave that up to notifyConnected(); ConnectWrapper no longer
    manages the stub.

  - ConnectWrapper sets self.sock to None once it's created a
    ManagedConnection -- from there on the connection is is charge of
    closing the socket.

zrpc/connection.py:

  ManagedServerConnection:

  - Changed the order in which close() calls things; super_close()
    should be last.

  ManagedConnection:

  - Ditto, and call the manager's close_conn() instead of
    notify_closed().

tests/testZEO.py:

  - In checkReconnectSwitch(), we can now open the client storage with
    wait=1 and read_only_fallback=1.
parent f8411024
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
############################################################################## ##############################################################################
"""Network ZODB storage client """Network ZODB storage client
$Id: ClientStorage.py,v 1.64 2002/09/20 13:35:07 gvanrossum Exp $ $Id: ClientStorage.py,v 1.65 2002/09/20 17:37:34 gvanrossum Exp $
""" """
# XXX TO DO # XXX TO DO
...@@ -107,6 +107,7 @@ class ClientStorage: ...@@ -107,6 +107,7 @@ class ClientStorage:
self._is_read_only = read_only self._is_read_only = read_only
self._storage = storage self._storage = storage
self._read_only_fallback = read_only_fallback self._read_only_fallback = read_only_fallback
self._connection = None
self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client', self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client',
'supportsUndo':0, 'supportsVersions': 0, 'supportsUndo':0, 'supportsVersions': 0,
...@@ -200,11 +201,9 @@ class ClientStorage: ...@@ -200,11 +201,9 @@ class ClientStorage:
self._server._update() self._server._update()
def testConnection(self, conn): def testConnection(self, conn):
"""Return a pair (stub, preferred). """Test a connection.
Where: This returns: 1 if the connection is an optimal match,
- stub is an RPC stub
- preferred is: 1 if the connection is an optimal match,
0 if it is a suboptimal but acceptable match 0 if it is a suboptimal but acceptable match
It can also raise DisconnectedError or ReadOnlyError. It can also raise DisconnectedError or ReadOnlyError.
...@@ -217,27 +216,33 @@ class ClientStorage: ...@@ -217,27 +216,33 @@ class ClientStorage:
stub = ServerStub.StorageServer(conn) stub = ServerStub.StorageServer(conn)
try: try:
stub.register(str(self._storage), self._is_read_only) stub.register(str(self._storage), self._is_read_only)
return (stub, 1) return 1
except POSException.ReadOnlyError: except POSException.ReadOnlyError:
if not self._read_only_fallback: if not self._read_only_fallback:
raise raise
log2(INFO, "Got ReadOnlyError; trying again with read_only=1") log2(INFO, "Got ReadOnlyError; trying again with read_only=1")
stub.register(str(self._storage), read_only=1) stub.register(str(self._storage), read_only=1)
return (stub, 0) return 0
def notifyConnected(self, stub): def notifyConnected(self, conn):
"""Start using the given RPC stub. """Start using the given connection.
This is called by ConnectionManager after it has decided which This is called by ConnectionManager after it has decided which
connection should be used. The stub is one returned by a connection should be used.
previous testConnection() call.
""" """
if self._connection is not None:
log2(INFO, "Reconnected to storage")
else:
log2(INFO, "Connected to storage") log2(INFO, "Connected to storage")
stub = ServerStub.StorageServer(conn)
self._oids = [] self._oids = []
self._info.update(stub.get_info()) self._info.update(stub.get_info())
self.verify_cache(stub) self.verify_cache(stub)
# XXX The stub should be saved here and set in endVerify() below. # XXX The stub should be saved here and set in endVerify() below.
if self._connection is not None:
self._connection.close()
self._connection = conn
self._server = stub self._server = stub
def verify_cache(self, server): def verify_cache(self, server):
...@@ -257,6 +262,7 @@ class ClientStorage: ...@@ -257,6 +262,7 @@ class ClientStorage:
def notifyDisconnected(self): def notifyDisconnected(self):
log2(PROBLEM, "Disconnected from storage") log2(PROBLEM, "Disconnected from storage")
self._connection = None
self._server = disconnected_stub self._server = disconnected_stub
def __len__(self): def __len__(self):
......
...@@ -499,7 +499,7 @@ class ConnectionTests(StorageTestBase.StorageTestBase): ...@@ -499,7 +499,7 @@ class ConnectionTests(StorageTestBase.StorageTestBase):
# Start a read-only server # Start a read-only server
self._startServer(create=0, index=0, read_only=1) self._startServer(create=0, index=0, read_only=1)
# Start a client in fallback mode # Start a client in fallback mode
self._storage = self.openClientStorage(wait=0, read_only_fallback=1) self._storage = self.openClientStorage(wait=1, read_only_fallback=1)
# Stores should fail here # Stores should fail here
self.assertRaises(ReadOnlyError, self._dostore) self.assertRaises(ReadOnlyError, self._dostore)
......
...@@ -36,13 +36,12 @@ class ConnectionManager: ...@@ -36,13 +36,12 @@ class ConnectionManager:
self.client = client self.client = client
self.tmin = tmin self.tmin = tmin
self.tmax = tmax self.tmax = tmax
self.connected = 0 self.cond = threading.Condition(threading.Lock())
self.connection = None self.connection = None # Protected by self.cond
self.closed = 0 self.closed = 0
# If thread is not None, then there is a helper thread # If thread is not None, then there is a helper thread
# attempting to connect. thread is protected by thread_lock. # attempting to connect.
self.thread = None self.thread = None # Protected by self.cond
self.thread_lock = threading.Lock()
self.trigger = None self.trigger = None
self.thr_async = 0 self.thr_async = 0
ThreadedAsync.register_loop_callback(self.set_async) ThreadedAsync.register_loop_callback(self.set_async)
...@@ -85,21 +84,26 @@ class ConnectionManager: ...@@ -85,21 +84,26 @@ class ConnectionManager:
def close(self): def close(self):
"""Prevent ConnectionManager from opening new connections""" """Prevent ConnectionManager from opening new connections"""
self.closed = 1 self.closed = 1
self.thread_lock.acquire() self.cond.acquire()
try: try:
t = self.thread t = self.thread
self.thread = None
conn = self.connection
finally: finally:
self.thread_lock.release() self.cond.release()
if t is not None: if t is not None:
log("CM.close(): stopping and joining thread") log("CM.close(): stopping and joining thread")
t.stop() t.stop()
t.join(30) t.join(30)
if t.isAlive(): if t.isAlive():
log("CM.close(): self.thread.join() timed out") log("CM.close(): self.thread.join() timed out",
if self.connection: level=zLOG.WARNING)
self.connection.close() if conn is not None:
# This will call close_conn() below which clears self.connection
conn.close()
if self.trigger is not None: if self.trigger is not None:
self.trigger.close() self.trigger.close()
self.trigger = None
def set_async(self, map): def set_async(self, map):
# This is the callback registered with ThreadedAsync. The # This is the callback registered with ThreadedAsync. The
...@@ -131,23 +135,36 @@ class ConnectionManager: ...@@ -131,23 +135,36 @@ class ConnectionManager:
finishes quickly. finishes quickly.
""" """
# XXX will a single attempt take too long? # XXX Will a single attempt take too long?
# XXX Answer: it depends -- normally, you'll connect or get a
# connection refused error very quickly. Packet-eating
# firewalls and other mishaps may cause the connect to take a
# long time to time out though. It's also possible that you
# connect quickly to a slow server, and the attempt includes
# at least one roundtrip to the server (the register() call).
# But that's as fast as you can expect it to be.
self.connect() self.connect()
self.thread_lock.acquire() self.cond.acquire()
try: try:
t = self.thread t = self.thread
conn = self.connection
finally: finally:
self.thread_lock.release() self.cond.release()
if t is not None: if t is not None and conn is None:
event = t.one_attempt event = t.one_attempt
event.wait() event.wait()
return self.connected self.cond.acquire()
try:
conn = self.connection
finally:
self.cond.release()
return conn is not None
def connect(self, sync=0): def connect(self, sync=0):
if self.connected == 1: self.cond.acquire()
return
self.thread_lock.acquire()
try: try:
if self.connection is not None:
return
t = self.thread t = self.thread
if t is None: if t is None:
log("CM.connect(): starting ConnectThread") log("CM.connect(): starting ConnectThread")
...@@ -155,37 +172,51 @@ class ConnectionManager: ...@@ -155,37 +172,51 @@ class ConnectionManager:
self.addrlist, self.addrlist,
self.tmin, self.tmax) self.tmin, self.tmax)
t.start() t.start()
if sync:
while self.connection is None:
self.cond.wait(30)
if self.connection is None:
log("CM.connect(sync=1): still waiting...")
finally: finally:
self.thread_lock.release() self.cond.release()
if sync: if sync:
t.join(30) assert self.connection is not None
while t.isAlive():
log("CM.connect(sync=1): thread join timed out")
t.join(30)
def connect_done(self, conn, preferred): def connect_done(self, conn, preferred):
# Called by ConnectWrapper.notify_client() after notifying the client
log("CM.connect_done(preferred=%s)" % preferred) log("CM.connect_done(preferred=%s)" % preferred)
self.connected = 1 self.cond.acquire()
try:
self.connection = conn self.connection = conn
if preferred: if preferred:
self.thread_lock.acquire()
try:
self.thread = None self.thread = None
self.cond.notifyAll() # Wake up connect(sync=1)
finally: finally:
self.thread_lock.release() self.cond.release()
def notify_closed(self, conn): def close_conn(self, conn):
# Called by the connection when it is closed
self.cond.acquire()
try:
if conn is not self.connection: if conn is not self.connection:
# Closing a non-current connection # Closing a non-current connection
log("CM.notify_closed() non-current", level=zLOG.BLATHER) log("CM.close_conn() non-current", level=zLOG.BLATHER)
return return
log("CM.notify_closed()") log("CM.close_conn()")
self.connected = 0
self.connection = None self.connection = None
finally:
self.cond.release()
self.client.notifyDisconnected() self.client.notifyDisconnected()
if not self.closed: if not self.closed:
self.connect() self.connect()
def is_connected(self):
self.cond.acquire()
try:
return self.connection is not None
finally:
self.cond.release()
# When trying to do a connect on a non-blocking socket, some outcomes # When trying to do a connect on a non-blocking socket, some outcomes
# are expected. Set _CONNECT_IN_PROGRESS to the errno value(s) expected # are expected. Set _CONNECT_IN_PROGRESS to the errno value(s) expected
# when an initial connect can't complete immediately. Set _CONNECT_OK # when an initial connect can't complete immediately. Set _CONNECT_OK
...@@ -207,20 +238,20 @@ class ConnectThread(threading.Thread): ...@@ -207,20 +238,20 @@ class ConnectThread(threading.Thread):
The thread is passed a ConnectionManager and the manager's client The thread is passed a ConnectionManager and the manager's client
as arguments. It calls testConnection() on the client when a as arguments. It calls testConnection() on the client when a
socket connects; that should return a tuple (stub, score) where socket connects; that should return 1 or 0 indicating whether this
stub is an RPC stub, and score is 1 or 0 depending on whether this
is a preferred or a fallback connection. It may also raise an is a preferred or a fallback connection. It may also raise an
exception, in which case the connection is abandoned. exception, in which case the connection is abandoned.
The thread will continue to run, attempting connections, until a The thread will continue to run, attempting connections, until a
preferred stub is seen or until all sockets have been tried. preferred connection is seen or until all sockets have been tried.
As soon as testConnection() returns a preferred stub, or after all As soon as testConnection() finds a preferred connection, or after
sockets have been tried and at least one fallback stub has been all sockets have been tried and at least one fallback connection
seen, notifyConnected(stub) is called on the client and has been seen, notifyConnected(connection) is called on the client
connect_done() on the manager. If this was a preferred stub, the and connect_done() on the manager. If this was a preferred
thread then exits; otherwise, it keeps trying until it gets a connection, the thread then exits; otherwise, it keeps trying
preferred stub, and then reconnects the client using that stub. until it gets a preferred connection, and then reconnects the
client using that connection.
""" """
...@@ -248,6 +279,7 @@ class ConnectThread(threading.Thread): ...@@ -248,6 +279,7 @@ class ConnectThread(threading.Thread):
def run(self): def run(self):
delay = self.tmin delay = self.tmin
success = 0
while not self.stopped: while not self.stopped:
success = self.try_connecting() success = self.try_connecting()
if not self.one_attempt.isSet(): if not self.one_attempt.isSet():
...@@ -315,10 +347,10 @@ class ConnectThread(threading.Thread): ...@@ -315,10 +347,10 @@ class ConnectThread(threading.Thread):
del wrappers[wrap] del wrappers[wrap]
# If we've got wrappers left at this point, they're fallback # If we've got wrappers left at this point, they're fallback
# connections. Try notifying then until one succeeds. # connections. Try notifying them until one succeeds.
for wrap in wrappers.keys(): for wrap in wrappers.keys():
assert wrap.state == "tested" and wrap.preferred == 0 assert wrap.state == "tested" and wrap.preferred == 0
if self.mgr.connected: if self.mgr.is_connected():
wrap.close() wrap.close()
else: else:
wrap.notify_client() wrap.notify_client()
...@@ -356,7 +388,6 @@ class ConnectWrapper: ...@@ -356,7 +388,6 @@ class ConnectWrapper:
self.state = "closed" self.state = "closed"
self.sock = None self.sock = None
self.conn = None self.conn = None
self.stub = None
self.preferred = 0 self.preferred = 0
log("CW: attempt to connect to %s" % repr(addr)) log("CW: attempt to connect to %s" % repr(addr))
try: try:
...@@ -402,8 +433,9 @@ class ConnectWrapper: ...@@ -402,8 +433,9 @@ class ConnectWrapper:
""" """
self.conn = ManagedConnection(self.sock, self.addr, self.conn = ManagedConnection(self.sock, self.addr,
self.client, self.mgr) self.client, self.mgr)
self.sock = None # The socket is now owned by the connection
try: try:
(self.stub, self.preferred) = self.client.testConnection(self.conn) self.preferred = self.client.testConnection(self.conn)
self.state = "tested" self.state = "tested"
except ReadOnlyError: except ReadOnlyError:
log("CW: ReadOnlyError in testConnection (%s)" % repr(self.addr)) log("CW: ReadOnlyError in testConnection (%s)" % repr(self.addr))
...@@ -422,16 +454,12 @@ class ConnectWrapper: ...@@ -422,16 +454,12 @@ class ConnectWrapper:
If this succeeds, call the manager's connect_done(). If this succeeds, call the manager's connect_done().
If the client is already connected, we assume it's a fallbac If the client is already connected, we assume it's a fallback
connection, the new stub must be a preferred stub, and we connection, and the new connection must be a preferred
first disconnect the client. connection. The client will close the old connection.
""" """
if self.mgr.connected:
assert self.preferred
log("CW: reconnecting client to preferred stub")
self.mgr.connection.close()
try: try:
self.client.notifyConnected(self.stub) self.client.notifyConnected(self.conn)
except: except:
log("CW: error in notifyConnected (%s)" % repr(self.addr), log("CW: error in notifyConnected (%s)" % repr(self.addr),
level=zLOG.ERROR, error=sys.exc_info()) level=zLOG.ERROR, error=sys.exc_info())
...@@ -443,7 +471,7 @@ class ConnectWrapper: ...@@ -443,7 +471,7 @@ class ConnectWrapper:
def close(self): def close(self):
"""Close the socket and reset everything.""" """Close the socket and reset everything."""
self.state = "closed" self.state = "closed"
self.stub = self.mgr = self.client = None self.mgr = self.client = None
self.preferred = 0 self.preferred = 0
if self.conn is not None: if self.conn is not None:
# Closing the ZRPC connection will eventually close the # Closing the ZRPC connection will eventually close the
......
...@@ -427,8 +427,8 @@ class ManagedServerConnection(Connection): ...@@ -427,8 +427,8 @@ class ManagedServerConnection(Connection):
def close(self): def close(self):
self.obj.notifyDisconnected() self.obj.notifyDisconnected()
self.__super_close()
self.__mgr.close_conn(self) self.__mgr.close_conn(self)
self.__super_close()
class ManagedConnection(Connection): class ManagedConnection(Connection):
"""Client-side Connection subclass.""" """Client-side Connection subclass."""
...@@ -469,5 +469,5 @@ class ManagedConnection(Connection): ...@@ -469,5 +469,5 @@ class ManagedConnection(Connection):
return self.check_mgr_async() return self.check_mgr_async()
def close(self): def close(self):
self.__mgr.close_conn(self)
self.__super_close() self.__super_close()
self.__mgr.notify_closed(self)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment