identification.py 6.54 KB
Newer Older
1
#
2
# Copyright (C) 2006-2019  Nexedi SA
3
#
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
9 10 11 12 13 14
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

17
from neo.lib import logging
18 19
from neo.lib.exception import PrimaryElected
from neo.lib.handler import EventHandler
20 21
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, \
    NodeTypes, NotReadyError, Packets, ProtocolError, uuid_str
22
from ..app import monotonic_time
23

24
class IdentificationHandler(EventHandler):
25

26
    def requestIdentification(self, conn, node_type, uuid,
27
                              address, name, id_timestamp, extra):
Julien Muchembled's avatar
Julien Muchembled committed
28
        app = self.app
29 30 31 32
        self.checkClusterName(name)
        if address == app.server:
            raise ProtocolError('address conflict')
        node = app.nm.getByUUID(uuid)
33 34 35
        by_addr = address and app.nm.getByAddress(address)
        while 1:
            if by_addr:
36
                if not by_addr.isIdentified():
37 38 39 40 41 42 43 44
                    if node is by_addr:
                        break
                    if not node or uuid < 0:
                        # In case of address conflict for a peer with temporary
                        # ids, we'll generate a new id.
                        node = by_addr
                        break
            elif node:
45
                if node.isIdentified():
46 47 48 49 50 51
                    if uuid < 0:
                        # The peer wants a temporary id that's already assigned.
                        # Let's give it another one.
                        node = uuid = None
                        break
                else:
52 53 54 55
                    if node is app._node:
                        node = None
                    else:
                        node.setAddress(address)
56 57
                    break
                # Id conflict for a storage node.
58
            else:
59 60 61
                break
            # cloned/evil/buggy node connecting to us
            raise ProtocolError('already connected')
62

63
        new_nid = extra.pop('new_nid', None)
64 65
        state = NodeStates.RUNNING
        if node_type == NodeTypes.CLIENT:
66 67 68 69 70
            if app.cluster_state == ClusterStates.RUNNING:
                handler = app.client_service_handler
            elif app.cluster_state == ClusterStates.BACKINGUP:
                handler = app.client_ro_service_handler
            else:
71 72 73
                raise NotReadyError
            human_readable_node_type = ' client '
        elif node_type == NodeTypes.STORAGE:
74 75
            if app.cluster_state == ClusterStates.STOPPING_BACKUP:
                raise NotReadyError
76 77 78
            manager = app._current_manager
            if manager is None:
                manager = app
79 80
            state, handler = manager.identifyStorageNode(
                uuid is not None and node is not None)
81 82 83 84 85 86 87 88 89 90
            if not address:
                if app.cluster_state == ClusterStates.RECOVERING:
                    raise NotReadyError
                if uuid or not new_nid:
                    raise ProtocolError
                state = NodeStates.DOWN
                # We'll let the storage node close the connection. If we
                # aborted it at the end of the method, BootstrapManager
                # (which is used by storage nodes) could see the closure
                # and try to reconnect to a master.
91 92
            human_readable_node_type = ' storage (%s) ' % (state, )
        elif node_type == NodeTypes.MASTER:
93 94 95 96 97 98 99 100
            if app.election:
                if id_timestamp and \
                  (id_timestamp, address) < (app.election, app.server):
                    raise PrimaryElected(by_addr or
                        app.nm.createMaster(address=address))
                handler = app.election_handler
            else:
                handler = app.secondary_handler
101 102 103 104 105
            human_readable_node_type = ' master '
        elif node_type == NodeTypes.ADMIN:
            handler = app.administration_handler
            human_readable_node_type = 'n admin '
        else:
106
            raise ProtocolError
107

108
        uuid = app.getNewUUID(uuid, address, node_type)
109
        logging.info('Accept a' + human_readable_node_type + uuid_str(uuid))
110
        if node is None:
111 112
            node = app.nm.createFromNodeType(node_type,
                uuid=uuid, address=address)
113 114
        else:
            node.setUUID(uuid)
115
        node.extra = extra
116
        node.id_timestamp = monotonic_time()
117
        node.setState(state)
118 119 120 121 122 123 124
        app.broadcastNodesInformation([node])
        if new_nid:
            changed_list = []
            for offset in new_nid:
                changed_list.append((offset, uuid, CellStates.OUT_OF_DATE))
                app.pt._setCell(offset, node, CellStates.OUT_OF_DATE)
            app.broadcastPartitionChanges(changed_list)
125
        conn.setHandler(handler)
126
        node.setConnection(conn, not node.isIdentified())
127

128 129 130 131 132
        conn.answer(Packets.AcceptIdentification(
            NodeTypes.MASTER,
            app.uuid,
            uuid))
        handler._notifyNodeInformation(conn)
133
        handler.handlerSwitched(conn, True)
134

135
class SecondaryIdentificationHandler(EventHandler):
136

137
    def requestIdentification(self, conn, node_type, uuid,
138
                              address, name, id_timestamp, extra):
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
        app = self.app
        self.checkClusterName(name)
        if address == app.server:
            raise ProtocolError('address conflict')
        primary = app.primary_master.getAddress()
        if primary == address:
            primary = None
        elif not app.primary_master.isIdentified():
            if node_type == NodeTypes.MASTER:
                node = app.nm.createMaster(address=address)
                if id_timestamp:
                    conn.close()
                    raise PrimaryElected(node)
            primary = None
        # For some cases, we rely on the fact that the remote will not retry
        # immediately (see SocketConnector.CONNECT_LIMIT).
        known_master_list = [node.getAddress()
            for node in app.nm.getMasterList()]
        conn.send(Packets.NotPrimaryMaster(
            primary and known_master_list.index(primary),
            known_master_list))
        conn.abort()