Commit ec80a2e8 authored by Guido van Rossum's avatar Guido van Rossum

A bunch of renamings and other small changes to make the code a little

more understandable.

Got rid of the Connected() exception -- connect(), now renamed to
try_connect(), returns a 1/0 result instead of raising Connected() to
indicate success.
parent 9f0ad321
......@@ -29,8 +29,8 @@ from ZEO.zrpc.connection import ManagedConnection
class ConnectionManager:
"""Keeps a connection up over time"""
def __init__(self, addr, client, tmin=1, tmax=180):
self.set_addr(addr)
def __init__(self, addrs, client, tmin=1, tmax=180):
self.addrlist = self._parse_addrs(addrs)
self.client = client
self.tmin = tmin
self.tmax = tmax
......@@ -46,26 +46,27 @@ class ConnectionManager:
ThreadedAsync.register_loop_callback(self.set_async)
def __repr__(self):
return "<%s for %s>" % (self.__class__.__name__, self.addr)
return "<%s for %s>" % (self.__class__.__name__, self.addrlist)
def set_addr(self, addr):
"Set one or more addresses to use for server."
def _parse_addrs(self, addrs):
# Return a list of (addr_type, addr) pairs.
# For backwards compatibility (and simplicity?) the
# constructor accepts a single address in the addr argument --
# constructor accepts a single address in the addrs argument --
# a string for a Unix domain socket or a 2-tuple with a
# hostname and port. It can also accept a list of such addresses.
addr_type = self._guess_type(addr)
addr_type = self._guess_type(addrs)
if addr_type is not None:
self.addr = [(addr_type, addr)]
return [(addr_type, addrs)]
else:
self.addr = []
for a in addr:
addr_type = self._guess_type(a)
addrlist = []
for addr in addrs:
addr_type = self._guess_type(addr)
if addr_type is None:
raise ValueError, "unknown address in list: %s" % repr(a)
self.addr.append((addr_type, a))
addrlist.append((addr_type, addr))
return addrlist
def _guess_type(self, addr):
if isinstance(addr, types.StringType):
......@@ -146,7 +147,8 @@ class ConnectionManager:
t = self.thread
if t is None:
log("starting thread to connect to server")
self.thread = t = ConnectThread(self, self.client, self.addr,
self.thread = t = ConnectThread(self, self.client,
self.addrlist,
self.tmin, self.tmax)
t.start()
finally:
......@@ -175,11 +177,6 @@ class ConnectionManager:
if not self.closed:
self.connect()
class Connected(Exception):
# helper for non-local exit
def __init__(self, sock):
self.sock = sock
# When trying to do a connect on a non-blocking socket, some outcomes
# are expected. Set _CONNECT_IN_PROGRESS to the errno value(s) expected
# when an initial connect can't complete immediately. Set _CONNECT_OK
......@@ -209,11 +206,11 @@ class ConnectThread(threading.Thread):
# We don't expect clients to call any methods of this Thread other
# than close() and those defined by the Thread API.
def __init__(self, mgr, client, addrs, tmin, tmax):
self.__super_init(name="Connect(%s)" % addrs)
def __init__(self, mgr, client, addrlist, tmin, tmax):
self.__super_init(name="Connect(%s)" % addrlist)
self.mgr = mgr
self.client = client
self.addrs = addrs
self.addrlist = addrlist
self.tmin = tmin
self.tmax = tmax
self.stopped = 0
......@@ -248,7 +245,7 @@ class ConnectThread(threading.Thread):
s.close()
def attempt_connects(self):
"""Try connecting to all self.addrs addresses.
"""Try connecting to all self.addrlist addresses.
If at least one succeeds, pick a success arbitrarily, close all other
successes (if any), and return true. If none succeed, return false.
......@@ -256,49 +253,50 @@ class ConnectThread(threading.Thread):
self.sockets = {} # {open socket: connection address}
log("attempting connection on %d sockets" % len(self.addrs))
try:
for domain, addr in self.addrs:
if __debug__:
log("attempt connection to %s" % repr(addr),
level=zLOG.DEBUG)
try:
s = socket.socket(domain, socket.SOCK_STREAM)
except socket.error, err:
log("Failed to create socket with domain=%s: %s" % (
domain, err), level=zLOG.ERROR)
continue
s.setblocking(0)
self.sockets[s] = addr
# connect() raises Connected iff it succeeds
# XXX can still block for a while if addr requires DNS
self.connect(s)
# next wait until they actually connect
while self.sockets:
if self.stopped:
self.close_sockets()
return 0
try:
sockets = self.sockets.keys()
r, w, x = select.select([], sockets, sockets, 1.0)
except select.error:
continue
for s in x:
del self.sockets[s]
s.close()
for s in w:
# connect() raises Connected iff it succeeds
self.connect(s)
except Connected, container:
s = container.sock
log("attempting connection on %d sockets" % len(self.addrlist))
ok = 0
for domain, addr in self.addrlist:
if __debug__:
log("attempt connection to %s" % repr(addr),
level=zLOG.DEBUG)
try:
s = socket.socket(domain, socket.SOCK_STREAM)
except socket.error, err:
log("Failed to create socket with domain=%s: %s" % (
domain, err), level=zLOG.ERROR)
continue
s.setblocking(0)
self.sockets[s] = addr
# XXX can still block for a while if addr requires DNS
if self.try_connect(s):
ok = 1
break
# next wait until they actually connect
while not ok and self.sockets:
if self.stopped:
self.close_sockets()
return 0
try:
sockets = self.sockets.keys()
r, w, x = select.select([], sockets, sockets, 1.0)
except select.error:
continue
for s in x:
del self.sockets[s]
s.close()
for s in w:
if self.try_connect(s):
ok = 1
break
if ok:
del self.sockets[s] # don't close the newly connected socket
self.close_sockets()
return 1
return 0
return ok
def connect(self, s):
"""Call s.connect_ex(addr); raise Connected iff connection succeeds.
def try_connect(self, s):
"""Call s.connect_ex(addr); return true iff connection succeeds.
We have to handle several possible return values from
connect_ex(). If the socket is connected and the initial ZEO
......@@ -323,13 +321,13 @@ class ConnectThread(threading.Thread):
else:
log("connect_ex(%s) == %s" % (addr, e))
if e in _CONNECT_IN_PROGRESS:
return
return 0
elif e in _CONNECT_OK:
# special cases to deal with winsock oddities
if sys.platform.startswith("win") and e == 0:
# It appears that winsock isn't behaving as
# expected on Win2k. It's possible for connect()
# expected on Win2k. It's possible for connect_ex()
# to return 0, but the connection to have failed.
# In particular, in situations where I expect to
# get a Connection refused (10061), I'm seeing
......@@ -339,7 +337,7 @@ class ConnectThread(threading.Thread):
r, w, x = select.select([s], [s], [s], 0.1)
if not (r or w or x):
return
return 0
if x:
# see comment at the end of the function
s.close()
......@@ -347,7 +345,7 @@ class ConnectThread(threading.Thread):
c = self.test_connection(s, addr)
if c:
log("connected to %s" % repr(addr), level=zLOG.DEBUG)
raise Connected(s)
return 1
else:
log("error connecting to %s: %s" % (addr, errno.errorcode[e]),
level=zLOG.DEBUG)
......@@ -357,6 +355,7 @@ class ConnectThread(threading.Thread):
# sockets.
s.close()
del self.sockets[s]
return 0
def test_connection(self, s, addr):
# Establish a connection at the zrpc level and call the
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment