Commit 5f497459 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Correct ConnectionPool. Make sure that a connection is not leaked.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@219 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 69e2f2ce
......@@ -24,10 +24,9 @@ from ZODB.utils import p64, u64, oid_repr
class ConnectionPool(object):
"""This class manages a pool of connections to storage nodes."""
def __init__(self, app, pool_size = 25):
def __init__(self, app, max_pool_size = 25):
self.app = app
self.pool_size = 0
self.max_pool_size = pool_size
self.max_pool_size = max_pool_size
self.connection_dict = {}
# Define a lock in order to create one connection to
# a storage node at a time to avoid multiple connections
......@@ -77,24 +76,28 @@ class ConnectionPool(object):
logging.info('connected to storage node %s:%d', *(conn.getAddress()))
return conn
def _dropConnection(self):
"""Drop a connection."""
def _dropConnections(self):
"""Drop connections."""
for node_uuid, conn in self.connection_dict.items():
# Drop first connection which looks not used
conn.lock()
try:
if not conn.prending() and not self.app.dispatcher.registered(conn.getUUID()):
if not conn.pending() and \
not self.app.dispatcher.registered(conn.getUUID()):
del self.connection_dict[conn.getUUID()]
conn.close()
break
logging.info('connection to storage node %s:%d closed',
*(conn.getAddress()))
if len(self.connection_dict) <= self.max_pool_size:
break
finally:
conn.unlock()
logging.info('connection to storage node %s:%d closed', *(conn.getAddress()))
def _createNodeConnection(self, node):
"""Create a connection to a given storage node."""
if self.pool_size > self.max_pool_size:
if len(self.connection_dict) > self.max_pool_size:
# must drop some unused connections
self._dropConnection()
self._dropConnections()
conn = self._initNodeConnection(node)
if conn is None:
return None
......@@ -111,12 +114,13 @@ class ConnectionPool(object):
If no connection exists, create a new one"""
self.connection_lock_acquire()
try:
if self.connection_dict.has_key(node.getUUID()):
uuid = node.getUUID()
try:
conn = self.connection_dict[uuid]
# Already connected to node
conn = self.connection_dict[node.getUUID()]
conn.lock()
return conn
else:
except KeyError:
# Create new connection to node
return self._createNodeConnection(node)
finally:
......@@ -126,8 +130,10 @@ class ConnectionPool(object):
"""Explicitly remove connection when a node is broken."""
self.connection_lock_acquire()
try:
if self.connection_dict.has_key(node.getUUID()):
self.connection_dict.pop(node.getUUID())
try:
del self.connection_dict[node.getUUID()]
except KeyError:
pass
finally:
self.connection_lock_release()
......
......@@ -127,6 +127,9 @@ class Connection(BaseConnection):
em.removeIdleEvent(event)
self.event_dict.clear()
def __del__(self):
self.close()
def abort(self):
"""Abort dealing with this connection."""
logging.debug('aborting a socket for %s:%d', *(self.addr))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment