#
# Copyright (C) 2006-2015  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import time
from random import shuffle

from neo.lib import logging
from neo.lib.locking import Lock
from neo.lib.protocol import NodeTypes, Packets
from neo.lib.connection import MTClientConnection, ConnectionClosed
from neo.lib.exception import NodeNotReady
from .exception import NEOStorageError

# Delay, in seconds, after a failed connection attempt to a storage node
# before that node stops being treated as recently failed (and may thus be
# retried with normal priority).
MAX_FAILURE_AGE = 10 * 60

# Sort keys for cell lists; cells are sorted in ascending order, so the
# lowest key is tried first.
CELL_CONNECTED = -1  # already connected to the node hosting the cell
CELL_GOOD = 0        # normal priority
CELL_FAILED = 1      # node hosting the cell failed recently: low priority

class ConnectionPool(object):
    """This class manages a pool of connections to storage nodes.

    Connections are kept in ``connection_dict``, keyed by node UUID.  When
    the pool holds more than ``max_pool_size`` connections, unused ones are
    dropped before a new connection is created.  Nodes to which a connection
    attempt failed are remembered in ``node_failure_dict`` so that their
    cells get a low priority for ``MAX_FAILURE_AGE`` seconds.
    """

    def __init__(self, app, max_pool_size=25):
        self.app = app
        self.max_pool_size = max_pool_size
        # node uuid -> connection
        self.connection_dict = {}
        # Define a lock in order to create one connection to
        # a storage node at a time to avoid multiple connections
        # to the same node.
        self._lock = Lock()
        # node uuid -> time.time() value until which the node is considered
        # as having failed recently
        self.node_failure_dict = {}

    def _initNodeConnection(self, node):
        """Init a connection to a given storage node.

        Return the identified connection on success.  If identification
        raises ConnectionClosed or NodeNotReady, log it, record the failure
        with notifyFailure and return None.
        """
        app = self.app
        logging.debug('trying to connect to %s - %s', node, node.getState())
        conn = MTClientConnection(app.em, app.storage_event_handler, node,
            connector=app.connector_handler(), dispatcher=app.dispatcher)
        p = Packets.RequestIdentification(NodeTypes.CLIENT,
            app.uuid, None, app.name)
        try:
            app._ask(conn, p, handler=app.storage_bootstrap_handler)
        except ConnectionClosed:
            logging.error('Connection to %r failed', node)
        except NodeNotReady:
            logging.info('%r not ready', node)
        else:
            logging.info('Connected %r', node)
            return conn
        self.notifyFailure(node)

    def _dropConnections(self):
        """Drop unused connections.

        A connection is considered unused when conn.pending() is false and
        the dispatcher has nothing registered for it.  Unused connections
        are closed and removed from the pool, stopping as soon as the pool
        holds at most max_pool_size connections.
        """
        # Iterate over a snapshot: entries are deleted from connection_dict
        # inside the loop, which must not happen while iterating the dict
        # itself (dict.values() is a live view on Python 3).
        for conn in list(self.connection_dict.values()):
            # Drop first connection which looks not used
            conn.lock()
            try:
                if not conn.pending() and \
                        not self.app.dispatcher.registered(conn):
                    del self.connection_dict[conn.getUUID()]
                    conn.close()
                    logging.debug('_dropConnections: connection to '
                        'storage node %s:%d closed', *conn.getAddress())
                    if len(self.connection_dict) <= self.max_pool_size:
                        break
            finally:
                conn.unlock()

    def notifyFailure(self, node):
        """Mark node as failed for the next MAX_FAILURE_AGE seconds."""
        self.node_failure_dict[node.getUUID()] = time.time() + MAX_FAILURE_AGE

    def getCellSortKey(self, cell):
        """Return the sort key of a cell (lower keys are tried first).

        CELL_CONNECTED if the pool already has a connection to the cell's
        node, CELL_FAILED if that node failed less than MAX_FAILURE_AGE
        seconds ago, CELL_GOOD otherwise.
        """
        uuid = cell.getUUID()
        if uuid in self.connection_dict:
            return CELL_CONNECTED
        failure = self.node_failure_dict.get(uuid)
        if failure is None or failure < time.time():
            return CELL_GOOD
        return CELL_FAILED

    def getConnForCell(self, cell):
        """Return a connection to the node hosting cell (see getConnForNode)."""
        return self.getConnForNode(cell.getNode())

    def iterateForObject(self, object_id, readable=False):
        """Iterate over nodes managing an object.

        object_id is either a str, mapped to its partition with
        pt.getPartition, or a value passed as is to pt.getCellList.
        Yield a (node, conn) pair for every cell whose node could be
        connected.  Cells whose node could not be connected but is still
        running are retried after a 1 second pause, for as long as the
        application has a master connection.

        Raise NEOStorageError if the partition table returns no cell.
        """
        pt = self.app.pt
        if type(object_id) is str:
            object_id = pt.getPartition(object_id)
        cell_list = pt.getCellList(object_id, readable)
        if not cell_list:
            raise NEOStorageError('no storage available')
        getConnForNode = self.getConnForNode
        while 1:
            new_cell_list = []
            # Shuffle to randomise node to access...
            shuffle(cell_list)
            # ...and sort with non-unique keys, to prioritise ranges of
            # randomised entries.
            cell_list.sort(key=self.getCellSortKey)
            for cell in cell_list:
                node = cell.getNode()
                conn = getConnForNode(node)
                if conn is not None:
                    yield node, conn
                # Re-check if node is running, as our knowledge of its
                # state can have changed during connection attempt.
                elif node.isRunning():
                    new_cell_list.append(cell)
            if not new_cell_list or self.app.master_conn is None:
                break
            cell_list = new_cell_list
            # wait a bit to avoid a busy loop
            time.sleep(1)

    def getConnForNode(self, node):
        """Return a connection to the given node, creating it if needed.

        Return None if the node is not running or if the connection
        attempt failed.
        """
        if node.isRunning():
            uuid = node.getUUID()
            try:
                # Already connected to node
                return self.connection_dict[uuid]
            except KeyError:
                with self._lock:
                    # Second lookup, if another thread initiated connection
                    # while we were waiting for connection lock.
                    try:
                        return self.connection_dict[uuid]
                    except KeyError:
                        if len(self.connection_dict) > self.max_pool_size:
                            # must drop some unused connections
                            self._dropConnections()
                        # Create new connection to node
                        conn = self._initNodeConnection(node)
                        if conn is not None:
                            self.connection_dict[uuid] = conn
                            return conn

    def removeConnection(self, node):
        """Explicitly remove connection when a node is broken.

        The connection is only forgotten by the pool, not closed here.
        """
        self.connection_dict.pop(node.getUUID(), None)

    def flush(self):
        """Remove all connections (without closing them)."""
        self.connection_dict.clear()