#
# Copyright (C) 2006-2012  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import os, sys
from time import time

from neo.lib import logging
from neo.lib.connector import getConnectorHandler
from neo.lib.debug import register as registerLiveDebugger
from neo.lib.protocol import UUID_NAMESPACES, ZERO_TID, NotReadyError
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets
from neo.lib.node import NodeManager
from neo.lib.event import EventManager
from neo.lib.connection import ListeningConnection, ClientConnection
from neo.lib.exception import ElectionFailure, PrimaryFailure, OperationFailure
from neo.lib.util import dump

class StateChangedException(Exception): pass

from .backup_app import BackupApplication
from .handlers import election, identification, secondary
from .handlers import administration, client, storage, shutdown
from .pt import PartitionTable
from .recovery import RecoveryManager
from .transactions import TransactionManager
from .verification import VerificationManager


class Application(object):
    """The master node application."""
    packing = None
    # Latest completely committed TID
    last_transaction = ZERO_TID
    backup_tid = None
    backup_app = None

    def __init__(self, config):
        # Internal attributes.
        self.em = EventManager()
        self.nm = NodeManager(config.getDynamicMasterList())
        self.tm = TransactionManager(self.onTransactionCommitted)

        self.name = config.getCluster()
        self.server = config.getBind()

        self.storage_readiness = set()
        master_addresses, connector_name = config.getMasters()
        self.connector_handler = getConnectorHandler(connector_name)
        for master_address in master_addresses:
            self.nm.createMaster(address=master_address)

        logging.debug('IP address is %s, port is %d', *self.server)

        # Partition table
        replicas, partitions = config.getReplicas(), config.getPartitions()
        if replicas < 0:
            raise RuntimeError('replicas must be a non-negative integer')
        if partitions <= 0:
            raise RuntimeError('partitions must be a strictly positive integer')
        self.pt = PartitionTable(partitions, replicas)
        logging.info('Configuration:')
        logging.info('Partitions: %d', partitions)
        logging.info('Replicas  : %d', replicas)
        logging.info('Name      : %s', self.name)

        self.listening_conn = None
        self.primary = None
        self.primary_master_node = None
        self.cluster_state = None
        self._startup_allowed = False

        # Generate a UUID for self
        uuid = config.getUUID()
        if uuid is None or uuid == '':
            uuid = self.getNewUUID(NodeTypes.MASTER)
        self.uuid = uuid
        logging.info('UUID      : %s', dump(uuid))

        # election-related data
        self.unconnected_master_node_set = set()
        self.negotiating_master_node_set = set()

        self._current_manager = None

        # backup
        upstream_cluster = config.getUpstreamCluster()
        if upstream_cluster:
            if upstream_cluster == self.name:
                raise ValueError("upstream cluster name must be"
                                 " different from cluster name")
            self.backup_app = BackupApplication(self, upstream_cluster,
                                                *config.getUpstreamMasters())

        registerLiveDebugger(on_log=self.log)

    def close(self):
        self.listening_conn = None
        if self.backup_app is not None:
            self.backup_app.close()
        self.nm.close()
        self.em.close()
        del self.__dict__

    def log(self):
        self.em.log()
        if self.backup_app is not None:
            self.backup_app.log()
        self.nm.log()
        self.tm.log()
        if self.pt is not None:
            self.pt.log()

    def run(self):
        try:
            self._run()
        except:
            logging.exception('Pre-mortem data:')
            self.log()
            raise

    def _run(self):
        """Make sure that the status is sane and start a loop."""
        # Make a listening port.
        self.listening_conn = ListeningConnection(self.em, None,
            addr=self.server, connector=self.connector_handler())

        # Start a normal operation.
        while True:
            # (Re)elect a new primary master.
            self.primary = not self.nm.getMasterList()
            if not self.primary:
                self.electPrimary()
            try:
                if self.primary:
                    self.playPrimaryRole()
                else:
                    self.playSecondaryRole()
                raise RuntimeError('should not reach here')
            except (ElectionFailure, PrimaryFailure):
                # Forget all connections.
                for conn in self.em.getClientList():
                    conn.close()


    def electPrimary(self):
        """Elect a primary master node.

        The difficulty is that a master node must accept connections from
        others while attempting to connect to other master nodes at the
        same time. Note that storage nodes and client nodes may connect
        to self as well as master nodes."""
        logging.info('begin the election of a primary master')

        client_handler = election.ClientElectionHandler(self)
        self.unconnected_master_node_set.clear()
        self.negotiating_master_node_set.clear()
        self.listening_conn.setHandler(election.ServerElectionHandler(self))
        getByAddress = self.nm.getByAddress

        while True:

            # handle newly connected masters
            for node in self.nm.getMasterList():
                node.setUnknown()
                self.unconnected_master_node_set.add(node.getAddress())

            # start the election process
            self.primary = None
            self.primary_master_node = None
            try:
                while (self.unconnected_master_node_set or
                        self.negotiating_master_node_set):
                    for addr in self.unconnected_master_node_set:
                        ClientConnection(self.em, client_handler,
                            # XXX: Ugly, but the whole election code will be
                            # replaced soon
                            node=getByAddress(addr),
                            connector=self.connector_handler())
                        self.negotiating_master_node_set.add(addr)
                    self.unconnected_master_node_set.clear()
                    self.em.poll(1)
            except ElectionFailure as m:
                # something went wrong: clean up, then restart
                logging.error('election failed: %s', m)

                # Ask all connected nodes to reelect a single primary master.
                for conn in self.em.getClientList():
                    conn.notify(Packets.ReelectPrimary())
                    conn.abort()

                # Wait until the connections are closed.
                self.primary = None
                self.primary_master_node = None
                t = time() + 10
                while self.em.getClientList() and time() < t:
                    try:
                        self.em.poll(1)
                    except ElectionFailure:
                        pass

                # Close all connections.
                for conn in self.em.getClientList() + self.em.getServerList():
                    conn.close()
            else:
                # election succeeded, stop the process
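                # self.primary is still None only if no peer claimed primacy
                # during the negotiation above (the election handlers are
                # expected to set it to False in that case), so this node
                # becomes the primary.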
                self.primary = self.primary is None
                break

    def broadcastNodesInformation(self, node_list):
        """
          Broadcast changes for a set of nodes.
          Send only one packet per connection to reduce bandwidth.
        """
        node_dict = {}
        # group modified nodes by destination node type
        for node in node_list:
            node_info = node.asTuple()
            def assign_for_notification(node_type):
                # helper function
                node_dict.setdefault(node_type, []).append(node_info)
            if node.isMaster() or node.isStorage():
                # clients get notifications for masters and storages only
                assign_for_notification(NodeTypes.CLIENT)
            if node.isMaster() or node.isStorage() or node.isClient():
                assign_for_notification(NodeTypes.STORAGE)
                assign_for_notification(NodeTypes.ADMIN)

        # send at most one non-empty notification packet per node
        for node in self.nm.getIdentifiedList():
            node_list = node_dict.get(node.getType(), [])
            if node_list and node.isRunning():
                node.notify(Packets.NotifyNodeInformation(node_list))

    def broadcastPartitionChanges(self, cell_list, selector=None):
        """Broadcast a Notify Partition Changes packet."""
        logging.debug('broadcastPartitionChanges')
        if not cell_list:
            return
        if not selector:
            selector = lambda n: n.isClient() or n.isStorage() or n.isAdmin()
        self.pt.log()
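        # allocate a new partition table ID for this change, so that
        # receivers can order partition table updates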
        ptid = self.pt.setNextID()
        packet = Packets.NotifyPartitionChanges(ptid, cell_list)
        for node in self.nm.getIdentifiedList():
            if not node.isRunning():
                continue
            if selector(node):
                node.notify(packet)

    def broadcastLastOID(self):
        oid = self.tm.getLastOID()
        logging.debug('Broadcast last OID to storages: %s', dump(oid))
        packet = Packets.NotifyLastOID(oid)
        for node in self.nm.getStorageList(only_identified=True):
            node.notify(packet)

    def provideService(self):
        """
        This is the normal mode for a primary master node. Handle transactions
        and stop the service only if a catastrophe happens or the user requests
        a shutdown.
        """
        logging.info('provide service')
        poll = self.em.poll
        self.tm.reset()

        self.changeClusterState(ClusterStates.RUNNING)

        # Now everything is passive.
        try:
            while True:
                poll(1)
        except OperationFailure:
            # If not operational, send Stop Operation packets to storage
            # nodes and client nodes. Abort connections to client nodes.
            logging.critical('No longer operational')
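        # Entering backup mode: the last committed TID is recorded as the
        # starting point of the backup for every identified storage node.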
        except StateChangedException as e:
            assert e.args[0] == ClusterStates.STARTING_BACKUP
            self.backup_tid = tid = self.getLastTransaction()
            self.pt.setBackupTidDict(dict((node.getUUID(), tid)
                for node in self.nm.getStorageList(only_identified=True)))

    def playPrimaryRole(self):
        logging.info('play the primary role with %r', self.listening_conn)
        em = self.em
        packet = Packets.AnnouncePrimary()
        for conn in em.getConnectionList():
            if conn.isListening():
                conn.setHandler(identification.IdentificationHandler(self))
            else:
                conn.notify(packet)
                # Primary master should establish connections to all
                # secondaries, rather than the other way around. This requires
                # a bit more work when a new master joins a cluster but makes
                # it easier to resolve UUID conflicts with minimal cluster
                # impact, and ensure primary master unicity (primary masters
                # become noisy, in that they actively try to maintain
                # connections to all other master nodes, so duplicate
                # primaries will eventually get in touch with each other and
                # resolve the situation with a duel).
                # TODO: only abort client connections, don't close server
                # connections as we want to have them in the end. Secondary
                # masters will reconnect nevertheless, but it's dirty.
                # Currently, it's not trivial to preserve connected nodes,
                # because of poor node status tracking during election.
                conn.abort()

        # If I know any storage nodes, make sure that they are not in the
        # running state, because they are not connected at this stage.
        for node in self.nm.getStorageList():
            if node.isRunning():
                node.setTemporarilyDown()

        # recover the cluster status at startup
        self.runManager(RecoveryManager)
        while True:
            self.runManager(VerificationManager)
            if self.backup_tid:
                if self.backup_app is None:
                    raise RuntimeError("No upstream cluster to backup"
                                       " defined in configuration")
                self.backup_app.provideService()
            else:
                self.provideService()
            for node in self.nm.getIdentifiedList():
                if node.isStorage() or node.isClient():
                    node.notify(Packets.StopOperation())
                    if node.isClient():
                        node.getConnection().abort()

    def playSecondaryRole(self):
        """
        I play a secondary role, thus only wait for the primary master to fail.
        """
        logging.info('play the secondary role with %r', self.listening_conn)

        # Wait for an announcement. If this takes too long, the primary
        # master is probably down.
        t = time() + 10
        while self.primary_master_node is None:
            self.em.poll(1)
            if t < time():
                # election timeout
                raise ElectionFailure("Election timeout")

        # Restart completely. Not optimized,
        # but lower-level code needs to be stabilized first.
        for conn in self.em.getConnectionList():
            if not conn.isListening():
                conn.close()

        # Reconnect to the primary master node.
        primary_handler = secondary.PrimaryHandler(self)
        ClientConnection(self.em, primary_handler,
            node=self.primary_master_node,
            connector=self.connector_handler())

        # and another handler for future incoming connections
        self.listening_conn.setHandler(
            identification.SecondaryIdentificationHandler(self))

        while True:
            self.em.poll(1)

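    # Run a one-shot manager (e.g. RecoveryManager or VerificationManager),
    # keeping it in _current_manager while it is active so that
    # changeClusterState() and identifyNode() can delegate to it.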
    def runManager(self, manager_klass):
        self._current_manager = manager_klass(self)
        self._current_manager.run()
        self._current_manager = None

    def changeClusterState(self, state):
        """
        Change the cluster state and apply the right handler on each connection
        """
        if self.cluster_state == state:
            return

        # select the storage handler
        client_handler = client.ClientServiceHandler(self)
        if state in (ClusterStates.RUNNING, ClusterStates.STARTING_BACKUP,
                     ClusterStates.BACKINGUP, ClusterStates.STOPPING_BACKUP):
            storage_handler = storage.StorageServiceHandler(self)
        elif self._current_manager is not None:
            storage_handler = self._current_manager.getHandler()
        else:
            raise RuntimeError('Unexpected cluster state')

        # change handlers
        notification_packet = Packets.NotifyClusterInformation(state)
        for node in self.nm.getIdentifiedList():
            if node.isMaster():
                continue
            conn = node.getConnection()
            node.notify(notification_packet)
            if node.isClient():
                if state != ClusterStates.RUNNING:
                    conn.abort()
                    continue
                handler = client_handler
            elif node.isStorage():
                handler = storage_handler
            else:
                continue # keep handler
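            # Only switch when the handler type actually changes, so an
            # already appropriate handler is not reinstalled.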
            if type(handler) is not type(conn.getLastHandler()):
                conn.setHandler(handler)
                handler.connectionCompleted(conn)
        self.cluster_state = state

    def getNewUUID(self, node_type):
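        # Concatenate the namespace prefix for this node type with 15 random
        # bytes to build the UUID.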
        try:
            return UUID_NAMESPACES[node_type] + os.urandom(15)
        except KeyError:
            raise RuntimeError('No UUID namespace found for this node type')

    def isValidUUID(self, uuid, addr):
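        # A peer UUID is acceptable if it is not our own and is either
        # unknown or not yet bound to a different address.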
        if uuid == self.uuid:
            return False
        node = self.nm.getByUUID(uuid)
        return node is None or node.getAddress() in (None, addr)

    def getClusterState(self):
        return self.cluster_state

    def shutdown(self):
        """Close all connections and exit"""
        # XXX: This behaviour is probably broken, as it applies the same
        #   handler to all connection types. It must be carefully reviewed and
        #   corrected.
        # change handler
        handler = shutdown.ShutdownHandler(self)
        for node in self.nm.getIdentifiedList():
            node.getConnection().setHandler(handler)

        # wait for all transactions to be finished
        while self.tm.hasPending():
            self.em.poll(1)

        if self.cluster_state != ClusterStates.RUNNING:
            logging.info("asking all nodes to shutdown")
            # This code sends packets but never polls, so they never reach
            # the network.
            for node in self.nm.getIdentifiedList():
                notification = Packets.NotifyNodeInformation([node.asTuple()])
                if node.isClient():
                    node.notify(notification)
                elif node.isStorage() or node.isMaster():
                    node.notify(notification)

        # then shutdown
        sys.exit()

    def identifyStorageNode(self, uuid, node):
        if self.cluster_state == ClusterStates.STOPPING:
            raise NotReadyError
        state = NodeStates.RUNNING
        if uuid is None or node is None:
            # same as for verification
            state = NodeStates.PENDING
        return uuid, state, storage.StorageServiceHandler(self)

    def identifyNode(self, node_type, uuid, node):

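        # Pick the initial state, connection handler and node constructor
        # according to the node type; the caller receives
        # (uuid, node, state, handler, node_ctor).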
        state = NodeStates.RUNNING

        if node_type == NodeTypes.ADMIN:
            # always accept admin nodes
            node_ctor = self.nm.createAdmin
            handler = administration.AdministrationHandler(self)
            logging.info('Accept an admin %s', dump(uuid))
        elif node_type == NodeTypes.MASTER:
            # always put other masters in waiting state
            node_ctor = self.nm.createMaster
            handler = secondary.SecondaryMasterHandler(self)
            logging.info('Accept a master %s', dump(uuid))
        elif node_type == NodeTypes.CLIENT:
            # refuse any client while the cluster is not running
            if self.cluster_state != ClusterStates.RUNNING:
                logging.info('Reject a connection from a client')
                raise NotReadyError
            node_ctor = self.nm.createClient
            handler = client.ClientServiceHandler(self)
            logging.info('Accept a client %s', dump(uuid))
        elif node_type == NodeTypes.STORAGE:
            node_ctor = self.nm.createStorage
            manager = self._current_manager
            if manager is None:
                manager = self
            (uuid, state, handler) = manager.identifyStorageNode(uuid, node)
            logging.info('Accept a storage %s (%s)', dump(uuid), state)
        else:
            handler = identification.IdentificationHandler(self)
        return (uuid, node, state, handler, node_ctor)

    def onTransactionCommitted(self, txn):
        # I have received all the lock answers now:
        # - send a Notify Transaction Finished to the initiating client node
        # - send Invalidate Objects to the other client nodes
        ttid = txn.getTTID()
        tid = txn.getTID()
        transaction_node = txn.getNode()
        invalidate_objects = Packets.InvalidateObjects(tid, txn.getOIDList())
        transaction_finished = Packets.AnswerTransactionFinished(ttid, tid)
        for client_node in self.nm.getClientList(only_identified=True):
            c = client_node.getConnection()
            if client_node is transaction_node:
                c.answer(transaction_finished, msg_id=txn.getMessageId())
            else:
                c.notify(invalidate_objects)

        # Unlock Information to relevant storage nodes.
        notify_unlock = Packets.NotifyUnlockInformation(ttid)
        getByUUID = self.nm.getByUUID
        for storage_uuid in txn.getUUIDList():
            getByUUID(storage_uuid).getConnection().notify(notify_unlock)

        # Notify storages that have replication blocked by this transaction
        notify_finished = Packets.NotifyTransactionFinished(ttid, tid)
        for storage_uuid in txn.getNotificationUUIDList():
            node = getByUUID(storage_uuid)
            if node is not None and node.isConnected():
                node.getConnection().notify(notify_finished)

        # remove transaction from manager
        self.tm.remove(transaction_node.getUUID(), ttid)
        self.setLastTransaction(tid)

    def getLastTransaction(self):
        return self.last_transaction

    def setLastTransaction(self, tid):
        ltid = self.last_transaction
        assert tid >= ltid, (tid, ltid)
        self.last_transaction = tid

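    # Storage readiness tracking: storage_readiness holds the UUIDs of the
    # storage nodes currently marked as ready.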
    def setStorageNotReady(self, uuid):
        self.storage_readiness.discard(uuid)

    def setStorageReady(self, uuid):
        self.storage_readiness.add(uuid)

    def isStorageReady(self, uuid):
        return uuid in self.storage_readiness