Commit 7d5b1559 authored by Julien Muchembled's avatar Julien Muchembled

Fix remaining memory leaks and make handler instances become singletons

parent 9fdd750f
...@@ -50,12 +50,6 @@ ...@@ -50,12 +50,6 @@
- Review handler split (CODE) - Review handler split (CODE)
The current handler split is the result of small incremental changes. A The current handler split is the result of small incremental changes. A
global review is required to make them square. global review is required to make them square.
- Make handler instances become singletons (SPEED, MEMORY)
In some places handlers are instanciated outside of App.__init__ . As a
handler is completely re-entrant (no modifiable properties) it can and
should be made a singleton (saves the CPU time needed to instanciates all
the copies - often when a connection is established, saves the memory
used by each copy).
- Review node notifications. Eg. A storage don't have to be notified of new - Review node notifications. Eg. A storage don't have to be notified of new
clients but only when one is lost. clients but only when one is lost.
- Review transactional isolation of various methods - Review transactional isolation of various methods
......
...@@ -15,8 +15,7 @@ ...@@ -15,8 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging from neo.lib import logging
from neo.lib.node import NodeManager from neo.lib.app import BaseApplication
from neo.lib.event import EventManager
from neo.lib.connection import ListeningConnection from neo.lib.connection import ListeningConnection
from neo.lib.exception import PrimaryFailure from neo.lib.exception import PrimaryFailure
from .handler import AdminEventHandler, MasterEventHandler, \ from .handler import AdminEventHandler, MasterEventHandler, \
...@@ -27,13 +26,11 @@ from neo.lib.protocol import ClusterStates, Errors, \ ...@@ -27,13 +26,11 @@ from neo.lib.protocol import ClusterStates, Errors, \
NodeTypes, NodeStates, Packets NodeTypes, NodeStates, Packets
from neo.lib.debug import register as registerLiveDebugger from neo.lib.debug import register as registerLiveDebugger
class Application(object): class Application(BaseApplication):
"""The storage node application.""" """The storage node application."""
def __init__(self, config): def __init__(self, config):
# Internal attributes. super(Application, self).__init__(config.getDynamicMasterList())
self.em = EventManager()
self.nm = NodeManager(config.getDynamicMasterList())
for address in config.getMasters(): for address in config.getMasters():
self.nm.createMaster(address=address) self.nm.createMaster(address=address)
...@@ -54,9 +51,7 @@ class Application(object): ...@@ -54,9 +51,7 @@ class Application(object):
def close(self): def close(self):
self.listening_conn = None self.listening_conn = None
self.nm.close() super(Application, self).close()
self.em.close()
del self.__dict__
def reset(self): def reset(self):
self.bootstrapped = False self.bootstrapped = False
......
...@@ -93,10 +93,9 @@ class TransactionContainer(dict): ...@@ -93,10 +93,9 @@ class TransactionContainer(dict):
class Application(ThreadedApplication): class Application(ThreadedApplication):
"""The client node application.""" """The client node application."""
def __init__(self, master_nodes, name, compress=True, def __init__(self, master_nodes, name, compress=True, **kw):
dynamic_master_list=None):
super(Application, self).__init__(parseMasterList(master_nodes), super(Application, self).__init__(parseMasterList(master_nodes),
name, dynamic_master_list) name, **kw)
# Internal Attributes common to all thread # Internal Attributes common to all thread
self._db = None self._db = None
self.cp = ConnectionPool(self) self.cp = ConnectionPool(self)
...@@ -135,11 +134,6 @@ class Application(ThreadedApplication): ...@@ -135,11 +134,6 @@ class Application(ThreadedApplication):
self._connecting_to_master_node = Lock() self._connecting_to_master_node = Lock()
self.compress = compress self.compress = compress
def close(self):
self.cp.flush()
self._txn_container.clear()
super(Application, self).close()
def __getattr__(self, attr): def __getattr__(self, attr):
if attr == 'pt': if attr == 'pt':
self._getMasterConnection() self._getMasterConnection()
......
#
# Copyright (C) 2015 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .event import EventManager
from .node import NodeManager
class BaseApplication(object):
def __init__(self, dynamic_master_list=None):
self._handlers = {}
self.em = EventManager()
self.nm = NodeManager(dynamic_master_list)
# XXX: Do not implement __del__ unless all references to the Application
# become weak.
# Due to cyclic references, Python < 3.4 would never call it unless
# it's closed explicitly, and in this case, there's nothing to do.
def close(self):
self.nm.close()
self.em.close()
self.__dict__.clear()
...@@ -32,7 +32,6 @@ class BootstrapManager(EventHandler): ...@@ -32,7 +32,6 @@ class BootstrapManager(EventHandler):
primary master node, connect to it then returns when the master node primary master node, connect to it then returns when the master node
is ready. is ready.
""" """
EventHandler.__init__(self, app)
self.primary = None self.primary = None
self.server = server self.server = server
self.node_type = node_type self.node_type = node_type
......
...@@ -265,8 +265,6 @@ class BaseConnection(object): ...@@ -265,8 +265,6 @@ class BaseConnection(object):
id(self), id(self),
) )
__del__ = close
def setHandler(self, handler): def setHandler(self, handler):
if self._handlers.setHandler(handler): if self._handlers.setHandler(handler):
logging.debug('Set handler %r on %r', handler, self) logging.debug('Set handler %r on %r', handler, self)
......
...@@ -19,13 +19,23 @@ from .protocol import ( ...@@ -19,13 +19,23 @@ from .protocol import (
NodeStates, Packets, Errors, BackendNotImplemented, NodeStates, Packets, Errors, BackendNotImplemented,
BrokenNodeDisallowedError, NotReadyError, PacketMalformedError, BrokenNodeDisallowedError, NotReadyError, PacketMalformedError,
ProtocolError, UnexpectedPacketError) ProtocolError, UnexpectedPacketError)
from .util import cached_property
class EventHandler(object): class EventHandler(object):
"""This class handles events.""" """This class handles events."""
def __init__(self, app): def __new__(cls, app, *args, **kw):
try:
return app._handlers[cls]
except AttributeError: # for BackupApplication
self = object.__new__(cls)
except KeyError:
self = object.__new__(cls)
if cls.__init__ is object.__init__:
app._handlers[cls] = self
self.app = app self.app = app
return self
def __repr__(self): def __repr__(self):
return self.__class__.__name__ return self.__class__.__name__
...@@ -201,9 +211,9 @@ class EventHandler(object): ...@@ -201,9 +211,9 @@ class EventHandler(object):
class MTEventHandler(EventHandler): class MTEventHandler(EventHandler):
"""Base class of handler implementations for MTClientConnection""" """Base class of handler implementations for MTClientConnection"""
def __init__(self, app): @cached_property
super(MTEventHandler, self).__init__(app) def dispatcher(self):
self.dispatcher = app.dispatcher return self.app.dispatcher
def dispatch(self, conn, packet, kw={}): def dispatch(self, conn, packet, kw={}):
assert conn.lock._is_owned() # XXX: see also lockCheckWrapper assert conn.lock._is_owned() # XXX: see also lockCheckWrapper
......
...@@ -16,12 +16,11 @@ ...@@ -16,12 +16,11 @@
import threading, weakref import threading, weakref
from . import logging from . import logging
from .app import BaseApplication
from .connection import ConnectionClosed from .connection import ConnectionClosed
from .debug import register as registerLiveDebugger from .debug import register as registerLiveDebugger
from .dispatcher import Dispatcher, ForgottenPacket from .dispatcher import Dispatcher, ForgottenPacket
from .event import EventManager
from .locking import SimpleQueue from .locking import SimpleQueue
from .node import NodeManager
from .protocol import Packets from .protocol import Packets
class app_set(weakref.WeakSet): class app_set(weakref.WeakSet):
...@@ -41,18 +40,16 @@ class ThreadContainer(threading.local): ...@@ -41,18 +40,16 @@ class ThreadContainer(threading.local):
self.answer = None self.answer = None
class ThreadedApplication(object): class ThreadedApplication(BaseApplication):
"""The client node application.""" """The client node application."""
def __init__(self, master_nodes, name, dynamic_master_list=None): def __init__(self, master_nodes, name, **kw):
# Start polling thread super(ThreadedApplication, self).__init__(**kw)
self.em = EventManager()
self.poll_thread = threading.Thread(target=self.run, name=name) self.poll_thread = threading.Thread(target=self.run, name=name)
self.poll_thread.daemon = True self.poll_thread.daemon = True
# Internal Attributes common to all thread # Internal Attributes common to all thread
self.name = name self.name = name
self.dispatcher = Dispatcher() self.dispatcher = Dispatcher()
self.nm = NodeManager(dynamic_master_list)
self.master_conn = None self.master_conn = None
# load master node list # load master node list
...@@ -65,11 +62,6 @@ class ThreadedApplication(object): ...@@ -65,11 +62,6 @@ class ThreadedApplication(object):
self._thread_container = ThreadContainer() self._thread_container = ThreadContainer()
app_set.add(self) # to register self.on_log app_set.add(self) # to register self.on_log
def __del__(self):
# Due to bug in ZODB, close is not always called when shutting
# down zope, so use __del__ to close connections
self.close()
def close(self): def close(self):
# Clear all connection # Clear all connection
self.master_conn = None self.master_conn = None
...@@ -80,7 +72,7 @@ class ThreadedApplication(object): ...@@ -80,7 +72,7 @@ class ThreadedApplication(object):
logging.debug('Stopping %s', self.poll_thread) logging.debug('Stopping %s', self.poll_thread)
self.em.wakeup(True) self.em.wakeup(True)
else: else:
self.em.close() super(ThreadedApplication, self).close()
def start(self): def start(self):
self.poll_thread.is_alive() or self.poll_thread.start() self.poll_thread.is_alive() or self.poll_thread.start()
...@@ -90,7 +82,7 @@ class ThreadedApplication(object): ...@@ -90,7 +82,7 @@ class ThreadedApplication(object):
try: try:
self._run() self._run()
finally: finally:
self.em.close() super(ThreadedApplication, self).close()
logging.debug("Poll thread stopped") logging.debug("Poll thread stopped")
def _run(self): def _run(self):
......
...@@ -18,11 +18,10 @@ import sys, weakref ...@@ -18,11 +18,10 @@ import sys, weakref
from time import time from time import time
from neo.lib import logging from neo.lib import logging
from neo.lib.app import BaseApplication
from neo.lib.debug import register as registerLiveDebugger from neo.lib.debug import register as registerLiveDebugger
from neo.lib.protocol import uuid_str, UUID_NAMESPACES, ZERO_TID from neo.lib.protocol import uuid_str, UUID_NAMESPACES, ZERO_TID
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets
from neo.lib.node import NodeManager
from neo.lib.event import EventManager
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.connection import ListeningConnection, ClientConnection from neo.lib.connection import ListeningConnection, ClientConnection
from neo.lib.exception import ElectionFailure, PrimaryFailure, OperationFailure from neo.lib.exception import ElectionFailure, PrimaryFailure, OperationFailure
...@@ -38,7 +37,7 @@ from .transactions import TransactionManager ...@@ -38,7 +37,7 @@ from .transactions import TransactionManager
from .verification import VerificationManager from .verification import VerificationManager
class Application(object): class Application(BaseApplication):
"""The master node application.""" """The master node application."""
packing = None packing = None
# Latest completely commited TID # Latest completely commited TID
...@@ -48,9 +47,7 @@ class Application(object): ...@@ -48,9 +47,7 @@ class Application(object):
uuid = None uuid = None
def __init__(self, config): def __init__(self, config):
# Internal attributes. super(Application, self).__init__(config.getDynamicMasterList())
self.em = EventManager()
self.nm = NodeManager(config.getDynamicMasterList())
self.tm = TransactionManager(self.onTransactionCommitted) self.tm = TransactionManager(self.onTransactionCommitted)
self.name = config.getCluster() self.name = config.getCluster()
...@@ -113,9 +110,7 @@ class Application(object): ...@@ -113,9 +110,7 @@ class Application(object):
self.listening_conn = None self.listening_conn = None
if self.backup_app is not None: if self.backup_app is not None:
self.backup_app.close() self.backup_app.close()
self.nm.close() super(Application, self).close()
self.em.close()
del self.__dict__
def log(self): def log(self):
self.em.log() self.em.log()
...@@ -387,8 +382,10 @@ class Application(object): ...@@ -387,8 +382,10 @@ class Application(object):
def runManager(self, manager_klass): def runManager(self, manager_klass):
self._current_manager = manager_klass(self) self._current_manager = manager_klass(self)
self._current_manager.run() try:
self._current_manager = None self._current_manager.run()
finally:
self._current_manager = None
def changeClusterState(self, state): def changeClusterState(self, state):
""" """
......
...@@ -27,7 +27,6 @@ class RecoveryManager(MasterHandler): ...@@ -27,7 +27,6 @@ class RecoveryManager(MasterHandler):
""" """
def __init__(self, app): def __init__(self, app):
super(RecoveryManager, self).__init__(app)
# The target node's uuid to request next. # The target node's uuid to request next.
self.target_ptid = None self.target_ptid = None
self.backup_tid_dict = {} self.backup_tid_dict = {}
......
...@@ -37,7 +37,6 @@ class VerificationManager(BaseServiceHandler): ...@@ -37,7 +37,6 @@ class VerificationManager(BaseServiceHandler):
""" """
def __init__(self, app): def __init__(self, app):
BaseServiceHandler.__init__(self, app)
self._oid_set = set() self._oid_set = set()
self._tid_set = set() self._tid_set = set()
self._uuid_set = set() self._uuid_set = set()
......
...@@ -14,32 +14,25 @@ ...@@ -14,32 +14,25 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib.app import BaseApplication
from neo.lib.connection import ClientConnection from neo.lib.connection import ClientConnection
from neo.lib.event import EventManager
from neo.lib.protocol import ClusterStates, NodeStates, ErrorCodes, Packets from neo.lib.protocol import ClusterStates, NodeStates, ErrorCodes, Packets
from neo.lib.node import NodeManager
from .handler import CommandEventHandler from .handler import CommandEventHandler
class NotReadyException(Exception): class NotReadyException(Exception):
pass pass
class NeoCTL(object): class NeoCTL(BaseApplication):
connection = None connection = None
connected = False connected = False
def __init__(self, address): def __init__(self, address):
self.nm = nm = NodeManager() super(NeoCTL, self).__init__()
self.server = nm.createAdmin(address=address) self.server = self.nm.createAdmin(address=address)
self.em = EventManager()
self.handler = CommandEventHandler(self) self.handler = CommandEventHandler(self)
self.response_queue = [] self.response_queue = []
def close(self):
self.em.close()
self.nm.close()
del self.__dict__
def __getConnection(self): def __getConnection(self):
if not self.connected: if not self.connected:
self.connection = ClientConnection(self.em, self.handler, self.connection = ClientConnection(self.em, self.handler,
......
...@@ -18,10 +18,10 @@ import sys ...@@ -18,10 +18,10 @@ import sys
from collections import deque from collections import deque
from neo.lib import logging from neo.lib import logging
from neo.lib.app import BaseApplication
from neo.lib.protocol import uuid_str, \ from neo.lib.protocol import uuid_str, \
CellStates, ClusterStates, NodeTypes, Packets CellStates, ClusterStates, NodeTypes, Packets
from neo.lib.node import NodeManager from neo.lib.node import NodeManager
from neo.lib.event import EventManager
from neo.lib.connection import ListeningConnection from neo.lib.connection import ListeningConnection
from neo.lib.exception import OperationFailure, PrimaryFailure from neo.lib.exception import OperationFailure, PrimaryFailure
from neo.lib.pt import PartitionTable from neo.lib.pt import PartitionTable
...@@ -37,16 +37,14 @@ from .transactions import TransactionManager ...@@ -37,16 +37,14 @@ from .transactions import TransactionManager
from neo.lib.debug import register as registerLiveDebugger from neo.lib.debug import register as registerLiveDebugger
class Application(object): class Application(BaseApplication):
"""The storage node application.""" """The storage node application."""
def __init__(self, config): def __init__(self, config):
super(Application, self).__init__(config.getDynamicMasterList())
# set the cluster name # set the cluster name
self.name = config.getCluster() self.name = config.getCluster()
# Internal attributes.
self.em = EventManager()
self.nm = NodeManager(config.getDynamicMasterList())
self.tm = TransactionManager(self) self.tm = TransactionManager(self)
self.dm = buildDatabaseManager(config.getAdapter(), self.dm = buildDatabaseManager(config.getAdapter(),
(config.getDatabase(), config.getEngine(), config.getWait()), (config.getDatabase(), config.getEngine(), config.getWait()),
...@@ -89,13 +87,8 @@ class Application(object): ...@@ -89,13 +87,8 @@ class Application(object):
def close(self): def close(self):
self.listening_conn = None self.listening_conn = None
self.nm.close() self.dm.close()
self.em.close() super(Application, self).close()
try:
self.dm.close()
except AttributeError:
pass
del self.__dict__
def _poll(self): def _poll(self):
self.em.poll(1) self.em.poll(1)
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
import __builtin__ import __builtin__
import errno import errno
import functools import functools
import gc
import os import os
import random import random
import socket import socket
...@@ -146,6 +147,7 @@ class NeoTestBase(unittest.TestCase): ...@@ -146,6 +147,7 @@ class NeoTestBase(unittest.TestCase):
def tearDown(self): def tearDown(self):
assert self.tearDown.im_func is NeoTestBase.tearDown.im_func assert self.tearDown.im_func is NeoTestBase.tearDown.im_func
self._tearDown(sys._getframe(1).f_locals['success']) self._tearDown(sys._getframe(1).f_locals['success'])
assert not gc.garbage, gc.garbage
def _tearDown(self, success): def _tearDown(self, success):
# Kill all unfinished transactions for next test. # Kill all unfinished transactions for next test.
......
...@@ -332,18 +332,17 @@ class MasterApplication(ServerNode, neo.master.app.Application): ...@@ -332,18 +332,17 @@ class MasterApplication(ServerNode, neo.master.app.Application):
class StorageApplication(ServerNode, neo.storage.app.Application): class StorageApplication(ServerNode, neo.storage.app.Application):
dm = type('', (), {'close': lambda self: None})()
def resetNode(self, clear_database=False): def resetNode(self, clear_database=False):
self._init_args['getReset'] = clear_database self._init_args['getReset'] = clear_database
dm = self.dm
super(StorageApplication, self).resetNode() super(StorageApplication, self).resetNode()
if dm and not clear_database:
self.dm = dm
def _afterRun(self): def _afterRun(self):
super(StorageApplication, self)._afterRun() super(StorageApplication, self)._afterRun()
try: try:
self.dm.close() self.dm.close()
self.dm = None del self.dm
except StandardError: # AttributeError & ProgrammingError except StandardError: # AttributeError & ProgrammingError
pass pass
...@@ -706,7 +705,10 @@ class NEOCluster(object): ...@@ -706,7 +705,10 @@ class NEOCluster(object):
node_list = self.admin_list + self.storage_list + self.master_list node_list = self.admin_list + self.storage_list + self.master_list
for node in node_list: for node in node_list:
node.em.wakeup(True) node.em.wakeup(True)
client is None or node_list.append(client.poll_thread) try:
node_list.append(client.poll_thread)
except AttributeError: # client is None or thread is already stopped
pass
self.join(node_list) self.join(node_list)
logging.debug("stopped %s", self) logging.debug("stopped %s", self)
self._unpatch() self._unpatch()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment