Commit 330e087f authored by Aurel's avatar Aurel

reimplement dropConnection


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@187 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 568eebdd
...@@ -81,56 +81,55 @@ class ConnectionPool(object): ...@@ -81,56 +81,55 @@ class ConnectionPool(object):
"""Drop a connection.""" """Drop a connection."""
for node_uuid, conn in self.connection_dict.items(): for node_uuid, conn in self.connection_dict.items():
# Drop first connection which looks not used # Drop first connection which looks not used
if not conn.prending(): conn.lock()
self.connection_dict.pop(node_uuid) try:
# Recheck connection again because it can have been if not conn.prending() and not self.app.dispatcher.registered(conn.getUUID()):
# returned by pool to another thread
if conn.prending():
self.connection_dict[node_uuid] = conn
continue
else:
conn.close() conn.close()
break break
finally:
conn.unlock()
logging.info('connection to storage node %s:%d closed', *(conn.getAddress())) logging.info('connection to storage node %s:%d closed', *(conn.getAddress()))
def _createNodeConnection(self, node): def _createNodeConnection(self, node):
"""Create a connection to a given storage node.""" """Create a connection to a given storage node."""
if self.pool_size > self.max_pool_size:
# must drop some unused connections
self._dropConnection()
conn = self._initNodeConnection(node)
if conn is None:
return None
# add node to node manager
if self.app.nm.getNodeByServer(node.getServer()) is None:
n = StorageNode(node.getServer())
self.app.nm.add(n)
self.connection_dict[node.getUUID()] = conn
conn.lock()
return conn
def getConnForNode(self, node):
"""Return a locked connection object to a given node
If no connection exists, create a new one"""
self.connection_lock_acquire() self.connection_lock_acquire()
try: try:
# check dict again, maybe another thread
# just created the connection
if self.connection_dict.has_key(node.getUUID()): if self.connection_dict.has_key(node.getUUID()):
return self.connection_dict[node.getUUID()] # Already connected to node
if self.pool_size > self.max_pool_size: conn = self.connection_dict[node.getUUID()]
# must drop some unused connections conn.lock()
self._dropConnection() return conn
conn = self._initNodeConnection(node) else:
if conn is None: # Create new connection to node
return None return self._createNodeConnection(node)
# add node to node manager
if self.app.nm.getNodeByServer(node.getServer()) is None:
n = StorageNode(node.getServer())
self.app.nm.add(n)
self.connection_dict[node.getUUID()] = conn
return conn
finally: finally:
self.connection_lock_release() self.connection_lock_release()
def getConnForNode(self, node):
"""Return connection object to a given node
If no connection exists, create a new one"""
if self.connection_dict.has_key(node.getUUID()):
# Already connected to node
return self.connection_dict[node.getUUID()]
else:
# Create new connection to node
return self._createNodeConnection(node)
def removeConnection(self, node): def removeConnection(self, node):
"""Explicitly remove connection when a node is broken.""" """Explicitly remove connection when a node is broken."""
if self.connection_dict.has_key(node.getUUID()): self.connection_lock_acquire()
self.connection_dict.pop(node.getUUID()) try:
if self.connection_dict.has_key(node.getUUID()):
self.connection_dict.pop(node.getUUID())
finally:
self.connection_lock_release()
class Application(object): class Application(object):
...@@ -287,7 +286,6 @@ class Application(object): ...@@ -287,7 +286,6 @@ class Application(object):
if conn is None: if conn is None:
continue continue
conn.lock()
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
...@@ -440,7 +438,6 @@ class Application(object): ...@@ -440,7 +438,6 @@ class Application(object):
if conn is None: if conn is None:
continue continue
conn.lock()
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
...@@ -488,7 +485,6 @@ class Application(object): ...@@ -488,7 +485,6 @@ class Application(object):
if conn is None: if conn is None:
continue continue
conn.lock()
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
...@@ -529,7 +525,6 @@ class Application(object): ...@@ -529,7 +525,6 @@ class Application(object):
if conn is None: if conn is None:
continue continue
conn.lock()
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
...@@ -549,7 +544,6 @@ class Application(object): ...@@ -549,7 +544,6 @@ class Application(object):
if conn is None: if conn is None:
continue continue
conn.lock()
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
...@@ -627,7 +621,6 @@ class Application(object): ...@@ -627,7 +621,6 @@ class Application(object):
if conn is None: if conn is None:
continue continue
conn.lock()
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
...@@ -702,7 +695,6 @@ class Application(object): ...@@ -702,7 +695,6 @@ class Application(object):
if conn is None: if conn is None:
continue continue
conn.lock()
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
...@@ -736,7 +728,6 @@ class Application(object): ...@@ -736,7 +728,6 @@ class Application(object):
if conn is None: if conn is None:
continue continue
conn.lock()
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
...@@ -782,7 +773,6 @@ class Application(object): ...@@ -782,7 +773,6 @@ class Application(object):
if conn is None: if conn is None:
continue continue
conn.lock()
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
...@@ -820,7 +810,6 @@ class Application(object): ...@@ -820,7 +810,6 @@ class Application(object):
if conn is None: if conn is None:
continue continue
conn.lock()
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment