Commit d75fcc59 authored by Julien Muchembled's avatar Julien Muchembled

Fix leak of file descriptors in unit tests

There remain only one leak in ClientApplicationTests.test_connectToPrimaryNode
because of Mock objects.
parent c88c6ac5
......@@ -158,7 +158,6 @@
Tests
- Use another mock library: Python 3.3+ has unittest.mock, which is
available for earlier versions at https://pypi.python.org/pypi/mock
- Fix epoll descriptor leak.
Later
- Consider auto-generating cluster name upon initial startup (it might
......
......@@ -73,11 +73,14 @@ class ThreadedApplication(object):
def close(self):
# Clear all connection
self.master_conn = None
if self.poll_thread.is_alive():
for conn in self.em.getConnectionList():
conn.close()
# Stop polling thread
logging.debug('Stopping %s', self.poll_thread)
self.em.wakeup(True)
else:
self.em.close()
def start(self):
self.poll_thread.is_alive() or self.poll_thread.start()
......@@ -87,6 +90,7 @@ class ThreadedApplication(object):
try:
self._run()
finally:
self.em.close()
logging.debug("Poll thread stopped")
def _run(self):
......
......@@ -204,3 +204,18 @@ class ReadBuffer(object):
self.size = 0
self.content.clear()
class cached_property(object):
"""
A property that is only computed once per instance and then replaces itself
with an ordinary attribute. Deleting the attribute resets the property.
"""
def __init__(self, func):
self.__doc__ = func.__doc__
self.func = func
def __get__(self, obj, cls):
if obj is None: return self
value = obj.__dict__[self.func.__name__] = self.func(obj)
return value
......@@ -23,6 +23,7 @@ import socket
import sys
import tempfile
import unittest
import weakref
import MySQLdb
import transaction
......@@ -549,7 +550,7 @@ class Patch(object):
def __enter__(self):
self.apply()
return self
return weakref.proxy(self)
def __exit__(self, t, v, tb):
self.__del__()
......
......@@ -795,6 +795,7 @@ class ClientApplicationTests(NeoUnitTestBase):
app.nm.getByAddress(conn.getAddress())._connection = None
app._ask = _ask_base
# faked environnement
app.em.close()
app.em = Mock({'getConnectionList': []})
app.pt = Mock({ 'operational': False})
app.start = lambda: None
......
......@@ -400,9 +400,6 @@ class NEOCluster(object):
except (AlreadyStopped, NodeProcessError):
pass
def getNEOCTL(self):
return self.neoctl
def getZODBStorage(self, **kw):
master_nodes = self.master_nodes.replace('/', ' ')
result = Storage(
......@@ -629,6 +626,7 @@ class NEOCluster(object):
self.expectCondition(expected_storage_not_known, *args, **kw)
def __del__(self):
self.neoctl.close()
if self.cleanup_on_delete:
os.removedirs(self.temp_dir)
......
......@@ -21,7 +21,6 @@ import ZODB
import socket
from struct import pack
from neo.neoctl.neoctl import NeoCTL
from neo.lib.util import makeChecksum, u64
from ZODB.FileStorage import FileStorage
from ZODB.POSException import ConflictError
......@@ -78,8 +77,8 @@ class ClientTests(NEOFunctionalTest):
)
def _tearDown(self, success):
if self.neo is not None:
self.neo.stop()
del self.neo
NEOFunctionalTest._tearDown(self, success)
def __setup(self):
......@@ -261,7 +260,6 @@ class ClientTests(NEOFunctionalTest):
def test():
self.neo = NEOCluster(['test_neo1'], replicas=0,
temp_dir=self.getTempDirectory())
neoctl = self.neo.getNEOCTL()
self.neo.start()
# BUG: The following 2 lines creates 2 app, i.e. 2 TCP connections
# to the storage, so there may be a race condition at network
......@@ -295,7 +293,6 @@ class ClientTests(NEOFunctionalTest):
temp_dir = self.getTempDirectory(),
address_type = socket.AF_INET6
)
neoctl = NeoCTL(('::1', 0))
self.neo.start()
db1, conn1 = self.neo.getZODBConnection()
db2, conn2 = self.neo.getZODBConnection()
......@@ -310,7 +307,6 @@ class ClientTests(NEOFunctionalTest):
def test():
self.neo = NEOCluster(['test_neo1'], replicas=0,
temp_dir=self.getTempDirectory())
neoctl = self.neo.getNEOCTL()
self.neo.start()
db1, conn1 = self.neo.getZODBConnection()
db2, conn2 = self.neo.getZODBConnection()
......@@ -353,9 +349,6 @@ class ClientTests(NEOFunctionalTest):
master. This OID must be intercepted at commit, used for next OID
generations and persistently saved on storage nodes.
"""
self.neo = NEOCluster(['test_neo1'], replicas=0,
temp_dir=self.getTempDirectory())
neoctl = self.neo.getNEOCTL()
self.neo.start()
db1, conn1 = self.neo.getZODBConnection()
st1 = conn1._storage
......
......@@ -22,19 +22,16 @@ from . import NEOCluster, NEOFunctionalTest
class ClusterTests(NEOFunctionalTest):
def setUp(self):
NEOFunctionalTest.setUp(self)
self.neo = None
def _tearDown(self, success):
if self.neo is not None:
if hasattr(self, "neo"):
self.neo.stop()
del self.neo
NEOFunctionalTest._tearDown(self, success)
def testClusterStartup(self):
neo = NEOCluster(['test_neo1', 'test_neo2'], replicas=1,
neo = self.neo = NEOCluster(['test_neo1', 'test_neo2'], replicas=1,
temp_dir=self.getTempDirectory())
neoctl = neo.getNEOCTL()
neoctl = neo.neoctl
neo.run()
# Runing a new cluster doesn't exit Recovery state.
s1, s2 = neo.getStorageProcessList()
......@@ -75,7 +72,6 @@ class ClusterTests(NEOFunctionalTest):
def testClusterBreaks(self):
self.neo = NEOCluster(['test_neo1'],
master_count=1, temp_dir=self.getTempDirectory())
neoctl = self.neo.getNEOCTL()
self.neo.setupDB()
self.neo.start()
self.neo.expectClusterRunning()
......@@ -87,7 +83,6 @@ class ClusterTests(NEOFunctionalTest):
self.neo = NEOCluster(['test_neo1', 'test_neo2'],
partitions=2, master_count=1, replicas=0,
temp_dir=self.getTempDirectory())
neoctl = self.neo.getNEOCTL()
self.neo.setupDB()
self.neo.start()
self.neo.expectClusterRunning()
......@@ -99,7 +94,6 @@ class ClusterTests(NEOFunctionalTest):
self.neo = NEOCluster(['test_neo1', 'test_neo2'],
partitions=2, replicas=1, master_count=1,
temp_dir=self.getTempDirectory())
neoctl = self.neo.getNEOCTL()
self.neo.setupDB()
self.neo.start()
self.neo.expectClusterRunning()
......@@ -112,7 +106,6 @@ class ClusterTests(NEOFunctionalTest):
self.neo = NEOCluster(['test_neo1', 'test_neo2'],
partitions=10, replicas=0, master_count=MASTER_COUNT,
temp_dir=self.getTempDirectory())
neoctl = self.neo.getNEOCTL()
self.neo.start()
self.neo.expectClusterRunning()
self.neo.expectAllMasters(MASTER_COUNT, NodeStates.RUNNING)
......@@ -126,7 +119,6 @@ class ClusterTests(NEOFunctionalTest):
# start a cluster
self.neo = NEOCluster(['test_neo1'], replicas=0,
temp_dir=self.getTempDirectory())
neoctl = self.neo.getNEOCTL()
self.neo.start()
self.neo.expectClusterRunning()
self.neo.expectOudatedCells(0)
......
......@@ -29,8 +29,6 @@ class MasterTests(NEOFunctionalTest):
temp_dir=self.getTempDirectory())
self.neo.stop()
self.neo.run()
self.storage = self.neo.getZODBStorage()
self.neoctl = self.neo.getNEOCTL()
def _tearDown(self, success):
self.neo.stop()
......@@ -41,14 +39,15 @@ class MasterTests(NEOFunctionalTest):
self.neo.expectAllMasters(MASTER_NODE_COUNT, NodeStates.RUNNING)
# Kill
primary_uuid = self.neoctl.getPrimary()
neoctl = self.neo.neoctl
primary_uuid = neoctl.getPrimary()
for master in self.neo.getMasterProcessList():
uuid = master.getUUID()
if uuid != primary_uuid:
break
self.neo.neoctl.killNode(uuid)
neoctl.killNode(uuid)
self.neo.expectDead(master)
self.assertRaises(RuntimeError, self.neo.neoctl.killNode, primary_uuid)
self.assertRaises(RuntimeError, neoctl.killNode, primary_uuid)
def testStoppingPrimaryWithTwoSecondaries(self):
# Wait for masters to stabilize
......
......@@ -33,13 +33,10 @@ OBJECT_NUMBER = 100
class StorageTests(NEOFunctionalTest):
def setUp(self):
NEOFunctionalTest.setUp(self)
self.neo = None
def _tearDown(self, success):
if self.neo is not None:
if hasattr(self, "neo"):
self.neo.stop()
del self.neo
NEOFunctionalTest._tearDown(self, success)
def __setup(self, storage_number=2, pending_number=0, replicas=1,
......
......@@ -28,6 +28,7 @@ class MasterClientHandlerTests(NeoUnitTestBase):
# create an application object
config = self.getMasterConfiguration(master_number=1, replicas=1)
self.app = Application(config)
self.app.em.close()
self.app.pt.clear()
self.app.pt.setID(1)
self.app.em = Mock()
......
......@@ -55,6 +55,7 @@ class MasterClientElectionTests(MasterClientElectionTestBase):
# create an application object
config = self.getMasterConfiguration(master_number=1)
self.app = Application(config)
self.app.em.close()
self.app.pt.clear()
self.app.em = Mock()
self.app.uuid = self.getMasterUUID()
......@@ -206,6 +207,7 @@ class MasterServerElectionTests(MasterClientElectionTestBase):
# create an application object
config = self.getMasterConfiguration(master_number=1)
self.app = Application(config)
self.app.em.close()
self.app.pt.clear()
self.app.name = 'NEOCLUSTER'
self.app.em = Mock()
......
......@@ -27,6 +27,10 @@ class MasterAppTests(NeoUnitTestBase):
self.app = Application(config)
self.app.pt.clear()
def _tearDown(self, success):
self.app.close()
NeoUnitTestBase._tearDown(self, success)
def test_06_broadcastNodeInformation(self):
# defined some nodes to which data will be send
master_uuid = self.getMasterUUID()
......
......@@ -42,6 +42,10 @@ class MasterRecoveryTests(NeoUnitTestBase):
self.master_address = ('127.0.0.1', self.master_port)
self.storage_address = ('127.0.0.1', self.storage_port)
def _tearDown(self, success):
self.app.close()
NeoUnitTestBase._tearDown(self, success)
# Common methods
def identifyToMasterNode(self, node_type=NodeTypes.STORAGE, ip="127.0.0.1",
port=10021):
......
......@@ -30,6 +30,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
# create an application object
config = self.getMasterConfiguration(master_number=1, replicas=1)
self.app = Application(config)
self.app.em.close()
self.app.pt.clear()
self.app.em = Mock()
self.service = StorageServiceHandler(self.app)
......
......@@ -44,6 +44,10 @@ class MasterVerificationTests(NeoUnitTestBase):
self.master_address = ('127.0.0.1', self.master_port)
self.storage_address = ('127.0.0.1', self.storage_port)
def _tearDown(self, success):
self.app.close()
NeoUnitTestBase._tearDown(self, success)
# Common methods
def identifyToMasterNode(self, node_type=NodeTypes.STORAGE, ip="127.0.0.1",
port=10021):
......
......@@ -35,7 +35,7 @@ from neo.lib.connector import SocketConnector, \
ConnectorConnectionRefusedException
from neo.lib.locking import SimpleQueue
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, NodeTypes
from neo.lib.util import parseMasterList, p64
from neo.lib.util import cached_property, parseMasterList, p64
from .. import NeoTestBase, Patch, getTempDirectory, setupMySQLdb, \
ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, DB_PREFIX, DB_USER
......@@ -297,7 +297,7 @@ class ServerNode(Node):
def resetNode(self):
assert not self.is_alive()
kw = self._init_args
self.__dict__.clear()
self.close()
self.__init__(**kw)
def start(self):
......@@ -374,9 +374,9 @@ class ClientApplication(Node, neo.client.app.Application):
super(ClientApplication, self).__init__(master_nodes, name, **kw)
self.poll_thread.node_name = name
def run(self):
def _run(self):
try:
super(ClientApplication, self).run()
super(ClientApplication, self)._run()
finally:
self.em.epoll.exit()
......@@ -559,6 +559,7 @@ class NEOCluster(object):
importer=None, autostart=None):
self.name = 'neo_%s' % self._allocate('name',
lambda: random.randint(0, 100))
self.compress = compress
master_list = [MasterApplication.newAddress()
for _ in xrange(master_count)]
self.master_nodes = ' '.join('%s:%s' % x for x in master_list)
......@@ -601,8 +602,6 @@ class NEOCluster(object):
self.storage_list = [StorageApplication(getDatabase=db % x, **kw)
for x in db_list]
self.admin_list = [AdminApplication(**kw)]
self.client = ClientApplication(name=self.name,
master_nodes=self.master_nodes, compress=compress)
self.neoctl = NeoCTL(self.admin.getVirtualAddress())
def __repr__(self):
......@@ -636,8 +635,7 @@ class NEOCluster(object):
kw['clear_database'] = clear_database
for node in getattr(self, node_type + '_list'):
node.resetNode(**kw)
self.client = ClientApplication(name=self.name,
master_nodes=self.master_nodes)
self.neoctl.close()
self.neoctl = NeoCTL(self.admin.getVirtualAddress())
def start(self, storage_list=None, fast_startup=False):
......@@ -660,6 +658,22 @@ class NEOCluster(object):
assert state in (ClusterStates.RUNNING, ClusterStates.BACKINGUP), state
self.enableStorageList(storage_list)
@cached_property
def client(self):
client = ClientApplication(name=self.name,
master_nodes=self.master_nodes, compress=self.compress)
# Make sure client won't be reused after it was closed.
def close():
client = self.client
del self.client, client.close
client.close()
client.close = close
return client
@cached_property
def db(self):
return ZODB.DB(storage=self.getZODBStorage())
def startCluster(self):
try:
self.neoctl.startCluster()
......@@ -678,14 +692,6 @@ class NEOCluster(object):
for node in storage_list:
assert self.getNodeState(node) == NodeStates.RUNNING
@property
def db(self):
try:
return self._db
except AttributeError:
self._db = db = ZODB.DB(storage=self.getZODBStorage())
return db
def join(self, thread_list, timeout=5):
timeout += time.time()
while thread_list:
......@@ -695,11 +701,12 @@ class NEOCluster(object):
def stop(self):
logging.debug("stopping %s", self)
self.__dict__.pop('_db', self.client).close()
client = self.__dict__.get("client")
client is None or self.__dict__.pop("db", client).close()
node_list = self.admin_list + self.storage_list + self.master_list
for node in node_list:
node.em.wakeup(True)
node_list.append(self.client.poll_thread)
client is None or node_list.append(client.poll_thread)
self.join(node_list)
logging.debug("stopped %s", self)
self._unpatch()
......@@ -751,7 +758,6 @@ class NEOCluster(object):
for node_type in 'admin', 'storage', 'master':
for node in getattr(self, node_type + '_list'):
node.close()
self.client.em.close()
except:
__print_exc()
raise
......
......@@ -828,6 +828,7 @@ class Test(NEOThreadedTest):
cluster.start(cluster.storage_list[:2])
finally:
cluster.stop()
del cluster.startCluster
if __name__ == "__main__":
unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment