app.py 11.9 KB
Newer Older
Aurel's avatar
Aurel committed
1
#
2
# Copyright (C) 2006-2019  Nexedi SA
Aurel's avatar
Aurel committed
3
#
Aurel's avatar
Aurel committed
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
Aurel's avatar
Aurel committed
8
#
Aurel's avatar
Aurel committed
9 10 11 12 13 14
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
Aurel's avatar
Aurel committed
16

17
import sys
18
from collections import deque
19

20
from neo.lib import logging
21
from neo.lib.app import BaseApplication, buildOptionParser
22
from neo.lib.protocol import CellStates, ClusterStates, NodeTypes, Packets
23
from neo.lib.connection import ListeningConnection
24
from neo.lib.exception import StoppedOperation, PrimaryFailure
25 26 27
from neo.lib.pt import PartitionTable
from neo.lib.util import dump
from neo.lib.bootstrap import BootstrapManager
28
from .checker import Checker
29
from .database import buildDatabaseManager, DATABASE_MANAGER_DICT
30
from .handlers import identification, initialization, master
31 32
from .replicator import Replicator
from .transactions import TransactionManager
33

34
from neo.lib.debug import register as registerLiveDebugger
35

36 37 38 39 40 41 42
# Defaults for options declared in Application._buildOptionParser; they are
# installed with parser.set_defaults().
option_defaults = dict(
    adapter='MySQL',
    wait=0,
)
# Sanity check: the default adapter must be an available database manager.
assert option_defaults['adapter'] in DATABASE_MANAGER_DICT

@buildOptionParser
class Application(BaseApplication):
    """The storage node application."""

    # Per-operation helpers. _run() creates fresh instances for every cycle
    # with the primary master and deletes them at the end of the cycle, so
    # these class-level None values show through between cycles.
    checker = replicator = tm = None

    @classmethod
    def _buildOptionParser(cls):
        """Declare the command-line/configuration options of a storage node.

        The values parsed from these options form the ``config`` mapping
        consumed by __init__; defaults come from the module-level
        ``option_defaults``.
        """
        parser = cls.option_parser
        parser.description = "NEO Storage node"
        cls.addCommonServerOptions('storage', '127.0.0.1')

        _ = parser.group('storage')
        _('a', 'adapter', choices=sorted(DATABASE_MANAGER_DICT),
            help="database adapter to use")
        _('d', 'database', required=True,
            help="database connections string")
        _.float('w', 'wait',
            help="seconds to wait for backend to be available,"
                 " before erroring-out (-1 = infinite)")
        _.bool('disable-drop-partitions',
            help="do not delete data of discarded cells, which is useful for"
                 " big databases because the current implementation is"
                 " inefficient (this option should disappear in the future)")
        _.bool('new-nid',
            help="request a new NID from a cluster that is already"
                 " operational, update the database with the new NID and exit,"
                 " which makes easier to quickly set up a replica by copying"
                 " the database of another node while it was stopped")

        _ = parser.group('database creation')
        _.int('i', 'nid',
            help="specify an NID to use for this process. Previously"
                 " assigned NID takes precedence (i.e. you should"
                 " always use reset with this switch)")
        _('e', 'engine', help="database engine (MySQL only)")
        _.bool('dedup',
            help="enable deduplication of data"
                 " when setting up a new storage node")
        # TODO: Forbid using "reset" along with any unneeded argument.
        #       "reset" is too dangerous to give the user any chance of
        #       accidentally letting it slip through in a long option list.
        #       It should even be forbidden in configuration files.
        _.bool('reset',
            help="remove an existing database if any, and exit")

        parser.set_defaults(**option_defaults)

    def __init__(self, config):
        """Build the storage application from the parsed ``config`` mapping.

        Sets up the database manager (optionally resetting it and enabling
        deduplication), validates/loads the persistent configuration and
        registers the configured master nodes.

        With the ``new_nid`` option, the process exits via sys.exit() when the
        database has no assigned cell.

        Raises RuntimeError if the cluster name is empty or differs from the
        one stored in the database.
        """
        super(Application, self).__init__(
            config.get('ssl'), config.get('dynamic_master_list'))
        # set the cluster name
        self.name = config['cluster']

        self.dm = buildDatabaseManager(config['adapter'],
            (config['database'], config.get('engine'), config['wait']),
        )
        self.disable_drop_partitions = config.get('disable_drop_partitions',
                                                  False)

        # load master nodes
        for master_address in config['masters']:
            self.nm.createMaster(address=master_address)

        # set the bind address
        self.server = config['bind']
        logging.debug('IP address is %s, port is %d', *self.server)

        # The partition table is initialized after getting the number of
        # partitions.
        self.pt = None

        self.listening_conn = None
        self.master_conn = None
        self.master_node = None

        # operation related data
        self.operational = False

        self.dm.setup(reset=config.get('reset', False),
                      dedup=config.get('dedup', False))
        self.loadConfiguration()
        self.devpath = self.dm.getTopologyPath()

        if config.get('new_nid'):
            # Keep the first item of every assigned cell (presumably the
            # partition offset -- confirm in iterAssignedCells); this list is
            # handed to the BootstrapManager in connectToPrimary().
            self.new_nid = [x[0] for x in self.dm.iterAssignedCells()]
            if not self.new_nid:
                sys.exit('database is empty')
            # Drop the stored NID so that a new one is obtained at bootstrap.
            self.uuid = None
        else:
            self.new_nid = ()
            if 'nid' in config: # for testing purpose only
                self.uuid = config['nid']
                logging.node(self.name, self.uuid)

        registerLiveDebugger(on_log=self.log)

    def close(self):
        """Close the database manager, then the base application.

        The base application is closed even if closing the database fails;
        the database error is then propagated.
        """
        self.listening_conn = None
        try:
            self.dm.close()
        finally:
            super(Application, self).close()

    def _poll(self):
        # Single event-manager iteration; the argument 1 is presumably a
        # timeout in seconds (NOTE(review): confirm in EventManager.poll).
        self.em.poll(1)

    def log(self):
        """Dump the state of the event manager, the node manager and, when
        they exist, the transaction manager and partition table."""
        self.em.log()
        self.nm.log()
        # Compare with None (as done for self.pt) so that an existing
        # transaction manager is always logged, whatever its truth value.
        if self.tm is not None:
            self.tm.log()
        if self.pt is not None:
            self.pt.log()

    def loadConfiguration(self):
        """Load persistent configuration data from the database.
        If data is not present, generate it.

        Raises RuntimeError if the cluster name is empty or does not match
        the name already stored in the database.
        """
        dm = self.dm

        # Check the cluster name *before* storing it: otherwise an empty name
        # would be persisted into a fresh database and only rejected later.
        if not self.name:
            raise RuntimeError('cluster name must be non-empty')
        name = dm.getName()
        if name is None:
            dm.setName(self.name)
        elif name != self.name:
            raise RuntimeError('name %r does not match with the database: %r'
                               % (self.name, name))

        # load configuration
        self.uuid = dm.getUUID()
        logging.node(self.name, self.uuid)

        logging.info('Configuration loaded:')
        logging.info('PTID      : %s', dump(dm.getPTID()))
        logging.info('Name      : %s', self.name)

    def loadPartitionTable(self):
        """Load a partition table from the database.

        Without a stored PTID, an empty PartitionTable(0, 0) is used.
        Storage nodes referenced by stored cells but unknown to the node
        manager are registered on the fly.
        """
        ptid = self.dm.getPTID()
        if ptid is None:
            self.pt = PartitionTable(0, 0)
            return
        row_list = []
        for offset, uuid, state in self.dm.getPartitionTable():
            # grow the row list so that row_list[offset] exists
            while len(row_list) <= offset:
                row_list.append([])
            # register unknown nodes
            if self.nm.getByUUID(uuid) is None:
                self.nm.createStorage(uuid=uuid)
            row_list[offset].append((uuid, CellStates[state]))
        # __init__ is bypassed on purpose: load() fills the table from the
        # stored PTID, replica count and rows.
        self.pt = object.__new__(PartitionTable)
        self.pt.load(ptid, self.dm.getNumReplicas(), row_list, self.nm)

    def run(self):
        """Run the node; on any exception, log pre-mortem data and re-raise."""
        try:
            self._run()
        except Exception:
            logging.exception('Pre-mortem data:')
            self.log()
            logging.flush()
            raise

    def _run(self):
        """Make sure that the status is sane and start a loop."""
        if not self.name:
            raise RuntimeError('cluster name must be non-empty')

        # Make a listening port
        handler = identification.IdentificationHandler(self)
        self.listening_conn = ListeningConnection(self, handler, self.server)
        self.server = self.listening_conn.getAddress()

        # Connect to a primary master node, verify data, and
        # start the operation. This cycle will be executed permanently,
        # until the user explicitly requests a shutdown.
        self.operational = False
        while True:
            self.cluster_state = None
            # NOTE(review): master_node is presumably reset to None elsewhere
            # (e.g. by handlers on primary failure) to force a reconnection.
            if self.master_node is None:
                # look for the primary master
                self.connectToPrimary()
            self.checker = Checker(self)
            self.replicator = Replicator(self)
            self.tm = TransactionManager(self)
            try:
                self.initialize()
                self.doOperation()
                raise RuntimeError('should not reach here')
            except StoppedOperation as msg:
                logging.error('operation stopped: %s', msg)
            except PrimaryFailure as msg:
                logging.error('primary master is down: %s', msg)
            finally:
                self.operational = False
            # When not ready, we reject any incoming connection so for
            # consistency, we also close any connection except that to the
            # master. This includes connections to other storage nodes and any
            # replication is aborted, whether we are feeding or out-of-date.
            for conn in self.em.getConnectionList():
                if conn not in (self.listening_conn, self.master_conn):
                    conn.close()
            del self.checker, self.replicator, self.tm

    def connectToPrimary(self):
        """Find a primary master node, and connect to it.

        If a primary master node is not elected or ready, repeat
        the attempt of a connection periodically.

        Note that connections from non-master nodes are not accepted
        at this stage.

        On success, the NID is persisted and the partition table is reloaded.
        """
        # search, find, connect and identify to the primary master
        # (no listening address is advertised when requesting a new NID)
        bootstrap = BootstrapManager(self, NodeTypes.STORAGE,
                                     None if self.new_nid else self.server,
                                     self.devpath, self.new_nid)
        self.master_node, self.master_conn = bootstrap.getPrimaryConnection()
        self.dm.setUUID(self.uuid)

        # Reload a partition table from the database,
        # in case that we're in RECOVERING phase.
        self.loadPartitionTable()

    def initialize(self):
        """Poll until the node becomes operational, then tell the master
        that this node is ready."""
        logging.debug('initializing...')
        _poll = self._poll
        self.master_conn.setHandler(initialization.InitializationHandler(self))
        # self.operational is presumably switched on by the initialization
        # handler once the master has sent everything needed.
        while not self.operational:
            _poll()
        self.master_conn.send(Packets.NotifyReady())

    def doOperation(self):
        """Handle everything, including replications and transactions.

        This loop only ends by an exception (e.g. StoppedOperation or
        PrimaryFailure, handled by _run).
        """
        logging.info('doing operation')

        poll = self._poll        # used when there is no background work
        _poll = self.em._poll    # called with 0 between background steps
        isIdle = self.em.isIdle

        self.master_conn.setHandler(master.MasterOperationHandler(self))
        self.replicator.populate()

        # Forget all unfinished data.
        self.dm.dropUnfinishedData()

        # Background tasks (iterators registered with newTask) only progress
        # while the event manager is idle. Each next() advances the task at
        # the right end of the queue by one step: a falsy result rotates the
        # queue so that another task gets a turn, a truthy one keeps working
        # on the same task. Exhausted tasks are dropped.
        self.task_queue = task_queue = deque()
        try:
            self.dm.doOperation(self)
            while True:
                while task_queue:
                    try:
                        while isIdle():
                            next(task_queue[-1]) or task_queue.rotate()
                            _poll(0)
                        break
                    except StopIteration:
                        task_queue.pop()
                poll()
        finally:
            del self.task_queue

    def changeClusterState(self, state):
        """Record the new cluster state; stop replication when the cluster
        switches to STOPPING_BACKUP."""
        self.cluster_state = state
        if state == ClusterStates.STOPPING_BACKUP:
            self.replicator.stop()

    def newTask(self, iterator):
        """Queue an iterator to be advanced step by step by doOperation.

        Only valid while doOperation is running: task_queue exists only then.
        """
        self.task_queue.appendleft(iterator)

    def closeClient(self, connection):
        """Close the client side of ``connection`` unless it is still used by
        the replicator (its current connection) or by the checker."""
        if connection is not self.replicator.getCurrentConnection() and \
           connection not in self.checker.conn_dict:
            connection.closeClient()

    def shutdown(self, erase=False):
        """Close all connections and exit.

        :param erase: if true, erase the database before exiting.
        :raises SystemExit: always (via sys.exit()).
        """
        for c in self.em.getConnectionList():
            try:
                c.close()
            except PrimaryFailure:
                pass
        # clear database to avoid polluting the cluster at restart
        if erase:
            self.dm.erase()
        logging.info("Application has been asked to shut down")
        sys.exit()