#
# Copyright (C) 2006-2017  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

17
import sys
18
from collections import deque
19

20
from neo.lib import logging
21
from neo.lib.app import BaseApplication, buildOptionParser
22 23
from neo.lib.protocol import uuid_str, \
    CellStates, ClusterStates, NodeTypes, Packets
24
from neo.lib.connection import ListeningConnection
25
from neo.lib.exception import StoppedOperation, PrimaryFailure
26 27 28
from neo.lib.pt import PartitionTable
from neo.lib.util import dump
from neo.lib.bootstrap import BootstrapManager
29
from .checker import Checker
30
from .database import buildDatabaseManager, DATABASE_MANAGER_DICT
31
from .handlers import identification, initialization, master
32 33
from .replicator import Replicator
from .transactions import TransactionManager
34

35
from neo.lib.debug import register as registerLiveDebugger
36

37 38 39 40 41 42 43
# Default values of the command-line options; they are installed on the
# option parser by Application._buildOptionParser (parser.set_defaults).
option_defaults = dict(
    adapter='MySQL',
    wait=0,
)
# Sanity check: the default adapter must be a registered database manager.
assert option_defaults['adapter'] in DATABASE_MANAGER_DICT

@buildOptionParser
44
class Application(BaseApplication):
45 46
    """The storage node application."""

47
    checker = replicator = tm = None
48

49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
    @classmethod
    def _buildOptionParser(cls):
        """Declare the command-line options of the storage node.

        Options are registered on ``cls.option_parser`` in two groups,
        'storage' and 'database creation', and the module-level
        ``option_defaults`` are then installed as parser defaults.
        """
        option_parser = cls.option_parser
        option_parser.description = "NEO Storage node"
        cls.addCommonServerOptions('storage', '127.0.0.1')

        # Options about the storage backend.
        storage_group = option_parser.group('storage')
        storage_group('a', 'adapter', choices=sorted(DATABASE_MANAGER_DICT),
            help="database adapter to use")
        storage_group('d', 'database', required=True,
            help="database connections string")
        storage_group.float('w', 'wait',
            help="seconds to wait for backend to be available,"
                 " before erroring-out (-1 = infinite)")
        storage_group.bool('disable-drop-partitions',
            help="do not delete data of discarded cells, which is useful for"
                 " big databases because the current implementation is"
                 " inefficient (this option should disappear in the future)")

        # Options about the creation of the database.
        creation_group = option_parser.group('database creation')
        creation_group.int('u', 'uuid',
            help="specify an UUID to use for this process. Previously"
                 " assigned UUID takes precedence (i.e. you should"
                 " always use reset with this switch)")
        creation_group('e', 'engine', help="database engine (MySQL only)")
        creation_group.bool('dedup',
            help="enable deduplication of data"
                 " when setting up a new storage node")
        # TODO: Forbid using "reset" along with any unneeded argument.
        #       "reset" is too dangerous to let user a chance of accidentally
        #       letting it slip through in a long option list.
        #       It should even be forbidden in configuration files.
        creation_group.bool('reset',
            help="remove an existing database if any, and exit")

        option_parser.set_defaults(**option_defaults)

86
    def __init__(self, config):
        """Create the storage application from its parsed configuration.

        `config` is read like a mapping: 'cluster', 'adapter', 'database',
        'wait', 'masters' and 'bind' are read with ``config[...]``, the other
        keys are optional. The database manager is built and set up here,
        and the persistent configuration is loaded from it.
        """
        super(Application, self).__init__(
            config.get('ssl'), config.get('dynamic_master_list'))
        # set the cluster name
        self.name = config['cluster']

        self.dm = buildDatabaseManager(
            config['adapter'],
            (config['database'], config.get('engine'), config['wait']))
        self.disable_drop_partitions = config.get(
            'disable_drop_partitions', False)

        # load master nodes
        for address in config['masters']:
            self.nm.createMaster(address=address)

        # set the bind address
        self.server = config['bind']
        logging.debug('IP address is %s, port is %d', *self.server)

        # The partition table is initialized after getting the number of
        # partitions.
        self.pt = None

        # No connection exists yet at construction time.
        self.listening_conn = self.master_conn = self.master_node = None

        # operation related data
        self.operational = False

        self.dm.setup(reset=config.get('reset', False),
                      dedup=config.get('dedup', False))
        self.loadConfiguration()
        self.devpath = self.dm.getTopologyPath()

        # force node uuid from command line argument, for testing purpose only
        if 'uuid' in config:
            self.uuid = config['uuid']

        registerLiveDebugger(on_log=self.log)
127

128 129
    def close(self):
        """Forget the listening connection, close the database manager and
        then run the cleanup of the base class."""
        self.listening_conn = None
        self.dm.close()
        super(Application, self).close()
132

133 134 135
    def _poll(self):
        """Poll the event manager once, passing 1 as its argument."""
        event_manager = self.em
        event_manager.poll(1)

136 137 138
    def log(self):
        """Log the state of the event manager and of the node manager, then
        of the transaction manager and of the partition table when set."""
        self.em.log()
        self.nm.log()
        transaction_manager = self.tm
        if transaction_manager:
            transaction_manager.log()
        partition_table = self.pt
        if partition_table is not None:
            partition_table.log()

144 145 146
    def loadConfiguration(self):
        """Load persistent configuration data from the database.

        The cluster name is stored in the database if it has none yet;
        otherwise it must match ``self.name``. ``self.uuid`` is set from the
        database, and ``self.pt`` is created when both the number of
        partitions and the number of replicas are stored.

        Raises RuntimeError if the stored cluster name differs from
        ``self.name`` or if the stored number of partitions is not positive.
        """
        dm = self.dm

        # check cluster name
        name = dm.getName()
        if name is None:
            dm.setName(self.name)
        elif name != self.name:
            raise RuntimeError('name %r does not match with the database: %r'
                               % (self.name, name))

        # load configuration
        self.uuid = dm.getUUID()
        num_partitions = dm.getNumPartitions()
        num_replicas = dm.getNumReplicas()
        ptid = dm.getPTID()

        # check partition table configuration
        if num_partitions is not None and num_replicas is not None:
            if num_partitions <= 0:
                # Call form of raise: valid on both Python 2 and Python 3,
                # and consistent with the name check above.
                raise RuntimeError('partitions must be more than zero')
            # create a partition table
            self.pt = PartitionTable(num_partitions, num_replicas)

        logging.info('Configuration loaded:')
        logging.info('UUID      : %s', uuid_str(self.uuid))
        logging.info('PTID      : %s', dump(ptid))
        logging.info('Name      : %s', self.name)
        logging.info('Partitions: %s', num_partitions)
        logging.info('Replicas  : %s', num_replicas)
177 178 179

    def loadPartitionTable(self):
        """Reset the partition table and refill it from the database.

        The table is left cleared when the database has no PTID. Nodes that
        the node manager does not know yet are registered as storage nodes.
        """
        self.pt.clear()
        ptid = self.dm.getPTID()
        if ptid is not None:
            rows = []
            for offset, uuid, state in self.dm.getPartitionTable():
                # register unknown nodes
                if self.nm.getByUUID(uuid) is None:
                    self.nm.createStorage(uuid=uuid)
                rows.append((offset, uuid, CellStates[state]))
            self.pt.update(ptid, rows, self.nm)
191 192

    def run(self):
        """Run the node through _run().

        If an Exception escapes, the traceback and the application state
        are logged and the logs are flushed before the exception is
        re-raised.
        """
        try:
            self._run()
        except Exception:
            # Dump as much state as possible before dying.
            logging.exception('Pre-mortem data:')
            self.log()
            logging.flush()
            raise

    def _run(self):
        """Make sure that the status is sane and start a loop.

        Raises RuntimeError if the cluster name is empty. Otherwise a
        listening connection is opened and the connect / initialize /
        operate cycle is repeated; StoppedOperation and PrimaryFailure are
        logged and start a new cycle, any other exception propagates.
        """
        if not self.name:
            raise RuntimeError('cluster name must be non-empty')

        # Make a listening port
        handler = identification.IdentificationHandler(self)
        self.listening_conn = ListeningConnection(self, handler, self.server)
        self.server = self.listening_conn.getAddress()

        # Connect to a primary master node, verify data, and
        # start the operation. This cycle will be executed permanently,
        # until the user explicitly requests a shutdown.
        self.operational = False
        while True:
            self.cluster_state = None
            if self.master_node is None:
                # look for the primary master
                self.connectToPrimary()
            self.checker = Checker(self)
            self.replicator = Replicator(self)
            self.tm = TransactionManager(self)
            try:
                self.initialize()
                self.doOperation()
                raise RuntimeError('should not reach here')
            except StoppedOperation as msg:
                logging.error('operation stopped: %s', msg)
            except PrimaryFailure as msg:
                logging.error('primary master is down: %s', msg)
            finally:
                self.operational = False
            # When not ready, we reject any incoming connection so for
            # consistency, we also close any connection except that to the
            # master. This includes connections to other storage nodes and any
            # replication is aborted, whether we are feeding or out-of-date.
            for conn in self.em.getConnectionList():
                if conn not in (self.listening_conn, self.master_conn):
                    conn.close()
            # Drop the per-cycle helpers; the class-level attributes (None)
            # become visible again until the next cycle recreates them.
            del self.checker, self.replicator, self.tm
241

242
    def connectToPrimary(self):
        """Find a primary master node, and connect to it.

        If a primary master node is not elected or ready, repeat
        the attempt of a connection periodically.

        Note that I do not accept any connection from non-master nodes
        at this stage.

        Raises RuntimeError if a partition table already exists and its
        number of partitions differs from the one given by the master.
        """
        # Partition table known before contacting the master, if any.
        previous_pt = self.pt

        # search, find, connect and identify to the primary master
        bootstrap = BootstrapManager(self, NodeTypes.STORAGE, self.server,
                                     self.devpath)
        (self.master_node, self.master_conn,
         num_partitions, num_replicas) = bootstrap.getPrimaryConnection()
        uuid = self.uuid
        logging.info('I am %s', uuid_str(uuid))
        self.dm.setUUID(uuid)

        # Reload a partition table from the database. This is necessary
        # when a previous primary master died while sending a partition
        # table, because the table might be incomplete.
        if previous_pt is not None:
            self.loadPartitionTable()
            if num_partitions != previous_pt.getPartitions():
                raise RuntimeError('the number of partitions is inconsistent')

        if previous_pt is None or previous_pt.getReplicas() != num_replicas:
            # changing number of replicas is not an issue
            self.dm.setNumPartitions(num_partitions)
            self.dm.setNumReplicas(num_replicas)
            self.pt = PartitionTable(num_partitions, num_replicas)
            self.loadPartitionTable()
275

276
    def initialize(self):
        """Install the initialization handler on the master connection, poll
        until ``self.operational`` becomes true, then send NotifyReady to
        the master."""
        logging.debug('initializing...')
        poll_once = self._poll
        self.master_conn.setHandler(initialization.InitializationHandler(self))
        while not self.operational:
            poll_once()
        self.master_conn.send(Packets.NotifyReady())
283

284 285
    def doOperation(self):
        """Handle everything, including replications and transactions.

        Background tasks are iterators kept in ``self.task_queue`` (see
        newTask). They are advanced only while the event manager reports
        being idle. The main loop never ends normally; ``self.task_queue``
        is removed when an exception leaves it.
        """
        logging.info('doing operation')

        blocking_poll = self._poll
        event_manager = self.em
        quick_poll = event_manager._poll
        is_idle = event_manager.isIdle

        self.master_conn.setHandler(master.MasterOperationHandler(self))
        self.replicator.populate()

        # Forget all unfinished data.
        self.dm.dropUnfinishedData()

        self.task_queue = tasks = deque()
        try:
            self.dm.doOperation(self)
            while True:
                while tasks:
                    try:
                        while is_idle():
                            # Advance the rightmost task; a false value
                            # rotates the queue so that another task is
                            # at the right end for the next step.
                            if not next(tasks[-1]):
                                tasks.rotate()
                            quick_poll(0)
                        break
                    except StopIteration:
                        # The rightmost task is exhausted: discard it.
                        tasks.pop()
                blocking_poll()
        finally:
            del self.task_queue
313

314
    def changeClusterState(self, state):
        """Record the new cluster state and stop the replicator when that
        state is STOPPING_BACKUP."""
        self.cluster_state = state
        stopping_backup = state == ClusterStates.STOPPING_BACKUP
        if stopping_backup:
            self.replicator.stop()
318

319 320 321
    def newTask(self, iterator):
        """Add `iterator` at the left end of the task queue."""
        pending = self.task_queue
        pending.appendleft(iterator)

322 323 324 325 326
    def closeClient(self, connection):
        """Call ``connection.closeClient()`` unless the connection is the
        replicator's current one or is listed in the checker's conn_dict."""
        if connection is self.replicator.getCurrentConnection():
            return
        if connection in self.checker.conn_dict:
            return
        connection.closeClient()

327
    def shutdown(self, erase=False):
        """Close all connections and exit

        If `erase` is true, the database is erased before exiting.
        The process is terminated with sys.exit().
        """
        for connection in self.em.getConnectionList():
            try:
                connection.close()
            except PrimaryFailure:
                # Deliberately ignored: we are shutting down anyway.
                pass
        # clear database to avoid polluting the cluster at restart
        if erase:
            self.dm.erase()
        logging.info("Application has been asked to shut down")
        sys.exit()