#
# Copyright (C) 2006-2010  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

18
from neo import logging
19

20
from neo.client.handlers import BaseHandler, AnswerBaseHandler
21
from neo.pt import MTPartitionTable as PartitionTable
22
from neo.protocol import NodeTypes, NodeStates, ProtocolError
23 24
from neo.util import dump

25
class PrimaryBootstrapHandler(AnswerBaseHandler):
    """Bootstrap handler used while searching for the primary master."""

    def notReady(self, conn, message):
        """Drop the master currently being tried and flag the node not ready."""
        client = self.app
        client.trying_master_node = None
        client.setNodeNotReady()

    def acceptIdentification(self, conn, node_type,
                   uuid, num_partitions, num_replicas, your_uuid):
        """Record the identification data sent back by a master node.

        The connection is closed when the peer is not a master.  A
        ProtocolError is raised when the peer did not assign us a UUID.
        Otherwise our UUID and the peer's UUID are stored and a new
        partition table is created.
        """
        client = self.app

        # Only a master node may answer here.
        if node_type != NodeTypes.MASTER:
            conn.close()
            return

        # The master is expected to hand out our UUID.
        if your_uuid is None:
            raise ProtocolError('No UUID supplied')
        client.uuid = your_uuid

        # Attach the peer UUID to both the connection and the known node.
        # NOTE(review): assumes the node manager already knows this address.
        master_node = client.nm.getByAddress(conn.getAddress())
        conn.setUUID(uuid)
        master_node.setUUID(uuid)

        # A partition table is always (re)created at this point.
        client.pt = PartitionTable(num_partitions, num_replicas)

    def answerPrimary(self, conn, primary_uuid,
                                  known_master_list):
        """Update the known masters and the primary master reference.

        The connection is closed unless the master being tried turns out
        to be the announced primary, or the announced primary is unknown.
        """
        client = self.app
        node_manager = client.nm

        # Refresh the UUID of every master listed by the peer.
        for address, uuid in known_master_list:
            known_node = node_manager.getByAddress(address)
            if uuid is not None and known_node.getUUID() != uuid:
                known_node.setUUID(uuid)

        if primary_uuid is None:
            # No primary announced: whatever we considered primary is not
            # primary any longer, and this peer is of no further use.
            if client.primary_master_node is not None:
                client.primary_master_node = None
            client.trying_master_node = None
            conn.close()
            return

        primary_node = node_manager.getByUUID(primary_uuid)
        if primary_node is None:
            # Unknown node: the information is probably stale, ignore it.
            logging.warning('Unknown primary master UUID: %s. ' \
                            'Ignoring.' % dump(primary_uuid))
            return

        client.primary_master_node = primary_node
        if client.trying_master_node is not primary_node:
            # We are talking to a secondary master: give up on it.
            client.trying_master_node = None
            conn.close()

    def answerPartitionTable(self, conn, ptid, row_list):
        """Ignore the answer: nothing is done with it during bootstrap."""

    def answerNodeInformation(self, conn):
        """Ignore the answer: nothing is done with it during bootstrap."""

89
class PrimaryNotificationsHandler(BaseHandler):
    """Handler that processes the notifications sent by the primary master."""

    def connectionClosed(self, conn):
        """Forget the primary master once its connection is closed.

        The application's master references are reset only when ``conn`` is
        the connection registered as ``app.master_conn``; otherwise the
        mismatch is only logged.  The base class handler is always called.
        """
        app = self.app
        logging.critical("connection to primary master node closed")
        conn.close()
        if app.master_conn is conn:
            app.master_conn = None
            app.primary_master_node = None
        else:
            logging.warn('app.master_conn is %s, but we are closing %s',
                    app.master_conn, conn)
        super(PrimaryNotificationsHandler, self).connectionClosed(conn)

    def timeoutExpired(self, conn):
        """Log the timeout of the master connection, then delegate."""
        app = self.app
        if app.master_conn is not None:
            assert conn is app.master_conn
            # Single literal: the previous text mixed quote styles across a
            # line continuation, which leaked quotes and indentation into
            # the logged message.
            logging.critical(
                "connection timeout to primary master node expired")
        BaseHandler.timeoutExpired(self, conn)

    def peerBroken(self, conn):
        """Log that the primary master is broken, then delegate."""
        app = self.app
        if app.master_conn is not None:
            assert conn is app.master_conn
            logging.critical("primary master node is broken")
        BaseHandler.peerBroken(self, conn)

    def stopOperation(self, conn):
        """Log the master's request to stop operation."""
        logging.critical("master node ask to stop operation")

    def invalidateObjects(self, conn, oid_list, tid):
        """Evict the given oids from the cache and invalidate them in the DB.

        The whole operation runs while holding the application's cache
        lock, which is released even if an error occurs.
        """
        app = self.app
        app._cache_lock_acquire()
        try:
            # ZODB requires a dict with oids as keys, so build it while
            # evicting each oid from the local cache.
            oids = {}
            for oid in oid_list:
                oids[oid] = tid
                try:
                    del app.mq_cache[oid]
                except KeyError:
                    # The oid was not cached: nothing to evict.
                    pass
            db = app.getDB()
            if db is not None:
                db.invalidate(tid, oids)
        finally:
            app._cache_lock_release()

    # For the two methods below, we must not use app._getPartitionTable()
    # to avoid a deadlock. It is safe to not check the master connection
    # because it's in the master handler, so the connection is already
    # established.
    def notifyPartitionChanges(self, conn, ptid, cell_list):
        """Apply partition changes, but only once the table is filled."""
        pt = self.app.pt
        if pt.filled():
            pt.update(ptid, cell_list, self.app.nm)

    def sendPartitionTable(self, conn, ptid, row_list):
        """Load the rows sent by the master into the partition table."""
        self.app.pt.load(ptid, row_list, self.app.nm)

    def notifyNodeInformation(self, conn, node_list):
        """Update known nodes and drop connections to stopped storages.

        After the node manager is updated, every storage node reported in a
        state other than RUNNING has its connection (if any) closed, removed
        from the connection pool and unregistered from the dispatcher.
        """
        app = self.app
        app.nm.update(node_list)
        for node_type, addr, uuid, state in node_list:
            # Nothing to clean up for non-storage nodes, nor for storages
            # that are still running.
            if node_type != NodeTypes.STORAGE \
                    or state == NodeStates.RUNNING:
                continue
            # Close the connection to this storage as it is no longer
            # running.  A distinct name keeps the master connection
            # received as ``conn`` untouched.
            storage_conn = app.em.getConnectionByUUID(uuid)
            if storage_conn is None:
                continue
            storage_conn.close()
            # Remove it from the connection pool and the dispatcher.
            app.cp.removeConnection(storage_conn)
            self.dispatcher.unregister(storage_conn)
167

168
class PrimaryAnswersHandler(AnswerBaseHandler):
    """Handler that processes the answers expected from the primary master."""

    def answerBeginTransaction(self, conn, tid):
        """Store the received transaction id on the application."""
        self.app.setTID(tid)

    def answerNewOIDs(self, conn, oid_list):
        """Store the received oid list on the application, reversed in place."""
        client = self.app
        client.new_oid_list = oid_list
        # In-place reversal: the list object passed in is the one stored.
        client.new_oid_list.reverse()

    def answerTransactionFinished(self, conn, tid):
        """Mark the transaction finished if ``tid`` is the current one."""
        client = self.app
        is_current = tid == client.getTID()
        if is_current:
            client.setTransactionFinished()