app.py 6.36 KB
Newer Older
1
#
# Copyright (C) 2006-2010  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17

18
import neo.lib
19

20 21 22 23
from neo.lib.node import NodeManager
from neo.lib.event import EventManager
from neo.lib.connection import ListeningConnection
from neo.lib.exception import PrimaryFailure
24
from .handler import AdminEventHandler, MasterEventHandler, \
25
    MasterRequestEventHandler
26 27 28 29
from neo.lib.connector import getConnectorHandler
from neo.lib.bootstrap import BootstrapManager
from neo.lib.pt import PartitionTable
from neo.lib.protocol import NodeTypes, NodeStates, Packets, Errors
30
from neo.lib.debug import register as registerLiveDebugger
31 32 33 34 35 36 37 38 39

class Dispatcher(object):
    """Dispatcher used to redirect answers to master requests.

    Keeps, for each pending message id, the connection the answer must be
    dispatched to, together with optional extra data supplied at
    registration time.
    """

    def __init__(self):
        # msg_id -> (conn, kw): associate a message id with the connection
        # (and extra data) its answer must be dispatched to.
        self.message_table = {}

    def register(self, msg_id, conn, kw=None):
        """Remember that the answer to msg_id must go to conn.

        kw is stored alongside conn and returned unchanged by pop().
        Registering an already-registered msg_id replaces the old entry.
        """
        self.message_table[msg_id] = conn, kw

    def pop(self, msg_id):
        """Unregister msg_id and return its (conn, kw) pair.

        Raises KeyError if msg_id is not registered.
        """
        return self.message_table.pop(msg_id)

    def registered(self, msg_id):
        """Return True if msg_id is currently registered."""
        # `in` instead of dict.has_key(): same result, and has_key() does
        # not exist in Python 3.
        return msg_id in self.message_table

    def clear(self):
        """Unregister every pending message, for all connections."""
        self.message_table.clear()
54 55 56 57

class Application(object):
    """The admin node application.

    Listens on the configured bind address (requests are handled by
    AdminEventHandler), identifies itself to the primary master node as an
    ADMIN node, and asks it for the node list and the partition table.
    """

    def __init__(self, config):
        # Internal attributes.
        self.em = EventManager()
        self.nm = NodeManager(config.getDynamicMasterList())

        self.name = config.getCluster()
        self.server = config.getBind()

        self.master_addresses, connector_name = config.getMasters()
        self.connector_handler = getConnectorHandler(connector_name)
        neo.lib.logging.debug('IP address is %s, port is %d', *(self.server))

        # The partition table is initialized after getting the number of
        # partitions.
        self.pt = None
        self.uuid = config.getUUID()
        self.primary_master_node = None
        self.request_handler = MasterRequestEventHandler(self)
        self.master_event_handler = MasterEventHandler(self)
        self.dispatcher = Dispatcher()
        self.cluster_state = None
        self.reset()
        registerLiveDebugger(on_log=self.log)

    def close(self):
        """Close the node and event managers, then drop every instance
        attribute; the object is unusable afterwards."""
        self.listening_conn = None
        self.nm.close()
        self.em.close()
        del self.__dict__

    def reset(self):
        """Forget the current master connection and bootstrap status."""
        self.bootstrapped = False
        self.master_conn = None
        self.master_node = None

    def log(self):
        """Log the event manager, the node manager and, if already
        known, the partition table."""
        self.em.log()
        self.nm.log()
        if self.pt is not None:
            self.pt.log()

    def run(self):
        """Make sure that the status is sane and start a loop.

        Opens the listening connection, then forever: connects to the
        primary master and polls the event manager. A PrimaryFailure
        escaping the poll loop is logged and a new primary is looked up.

        Raises RuntimeError if the cluster name is empty or unset.
        """
        # Truthiness also covers an unset (None) name, which len() would
        # have turned into a confusing TypeError.
        if not self.name:
            raise RuntimeError('cluster name must be non-empty')

        # Make a listening port.
        handler = AdminEventHandler(self)
        self.listening_conn = ListeningConnection(self.em, handler,
            addr=self.server, connector=self.connector_handler())

        while True:
            self.connectToPrimary()
            try:
                while True:
                    self.em.poll(1)
            except PrimaryFailure:
                neo.lib.logging.error('primary master is down')

    def connectToPrimary(self):
        """Find a primary master node, and connect to it.

        If a primary master node is not elected or ready, repeat
        the attempt of a connection periodically.

        Note that I do not accept any connection from non-master nodes
        at this stage.

        On success, the master node/connection and the UUID given by the
        master are stored, the partition table is created (first time) or
        checked against the master's partition/replica counts, and the node
        list and partition table are requested.

        Raises RuntimeError if the master reports a number of partitions or
        replicas different from the existing partition table.
        """
        nm = self.nm
        nm.init()
        self.cluster_state = None

        for address in self.master_addresses:
            nm.createMaster(address=address)

        # search, find, connect and identify to the primary master
        bootstrap = BootstrapManager(self, self.name, NodeTypes.ADMIN,
                self.uuid, self.server)
        data = bootstrap.getPrimaryConnection(self.connector_handler)
        (node, conn, uuid, num_partitions, num_replicas) = data
        nm.update([(node.getType(), node.getAddress(), node.getUUID(),
                    NodeStates.RUNNING)])
        self.master_node = node
        self.master_conn = conn
        self.uuid = uuid

        if self.pt is None:
            self.pt = PartitionTable(num_partitions, num_replicas)
        elif self.pt.getPartitions() != num_partitions:
            # XXX: shouldn't we recover instead of raising ?
            raise RuntimeError('the number of partitions is inconsistent')
        elif self.pt.getReplicas() != num_replicas:
            # XXX: shouldn't we recover instead of raising ?
            raise RuntimeError('the number of replicas is inconsistent')

        # passive handler
        self.master_conn.setHandler(self.master_event_handler)
        self.master_conn.ask(Packets.AskNodeInformation())
        self.master_conn.ask(Packets.AskPartitionTable())

    def sendPartitionTable(self, conn, min_offset, max_offset, uuid):
        """Answer conn with rows of the partition table.

        Rows for offsets in [min_offset, max_offset) are sent; a max_offset
        of 0 means "up to the last partition". When uuid is not None, only
        the cells whose UUID equals uuid are kept in each row.

        If an IndexError is raised while reading the rows (offset outside
        the table), conn is notified with a ProtocolError instead of being
        answered.

        Requires self.pt to be set (i.e. connectToPrimary has run).
        """
        # we have a pt
        self.pt.log()
        if max_offset == 0:
            max_offset = self.pt.getPartitions()
        row_list = []
        try:
            # NOTE(review): a negative min_offset would index rows from the
            # end instead of raising IndexError; presumably the packet
            # decoder only yields non-negative offsets -- confirm.
            for offset in xrange(min_offset, max_offset):
                row = []
                try:
                    for cell in self.pt.getCellList(offset):
                        if uuid is None or cell.getUUID() == uuid:
                            row.append((cell.getUUID(), cell.getState()))
                except TypeError:
                    # NOTE(review): kept as in the original; presumably
                    # getCellList can return a non-iterable for some rows.
                    # Confirm, since this silently sends an empty row.
                    pass
                row_list.append((offset, row))
        except IndexError:
            p = Errors.ProtocolError('invalid partition table offset')
            conn.notify(p)
            return
        p = Packets.AnswerPartitionList(self.pt.getID(), row_list)
        conn.answer(p)