app.py 6.18 KB
Newer Older
1 2
#
# Copyright (C) 2006-2009  Nexedi SA
Aurel's avatar
Aurel committed
3
#
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
Aurel's avatar
Aurel committed
8
#
9 10 11 12 13 14 15 16 17
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

18
from neo import logging
19 20

from neo.config import ConfigurationManager
21
from neo.node import NodeManager, MasterNode
Aurel's avatar
Aurel committed
22
from neo.event import EventManager
23
from neo.connection import ListeningConnection
24
from neo.exception import PrimaryFailure
25 26
from neo.admin.handler import AdminEventHandler, MasterEventHandler, \
    MasterRequestEventHandler
27
from neo.connector import getConnectorHandler
28 29
from neo.bootstrap import BootstrapManager
from neo.pt import PartitionTable
30 31 32 33 34 35 36 37 38 39
from neo import protocol

class Dispatcher:
    """Dispatcher used to redirect master requests to handlers.

    Pending requests are kept in ``message_table``, keyed by message id;
    each entry stores the connection the message must be dispatched to and
    optional extra data.
    """

    def __init__(self):
        # Associates a message id with the (conn, kw) pair needed to
        # dispatch the message to its connection.
        self.message_table = {}

    def register(self, msg_id, conn, kw=None):
        """Record ``conn`` (and optional ``kw`` data) under ``msg_id``.

        Registering an already known ``msg_id`` overwrites the previous
        entry.
        """
        self.message_table[msg_id] = conn, kw

    def pop(self, msg_id):
        """Remove and return the ``(conn, kw)`` pair stored for ``msg_id``.

        Raises KeyError if ``msg_id`` is not registered.
        """
        return self.message_table.pop(msg_id)

    def registered(self, msg_id):
        """Return True if ``msg_id`` currently has an entry."""
        # The ``in`` operator replaces the deprecated dict.has_key(), which
        # no longer exists in Python 3; the result is the same boolean.
        return msg_id in self.message_table

49 50 51 52

class Application(object):
    """The admin node application."""

53
    def __init__(self, filename, section, uuid=None):
        """Read the configuration and prepare the initial state.

        ``filename`` and ``section`` are handed to ConfigurationManager;
        ``uuid`` is kept as the initial identifier of this node and may be
        None.
        """
        settings = ConfigurationManager(filename, section)

        # Values read from the configuration.
        self.name = settings.getName()
        logging.debug('the name is %s', self.name)
        connector_name = settings.getConnector()
        self.connector_handler = getConnectorHandler(connector_name)

        self.server = settings.getServer()
        logging.debug('IP address is %s, port is %d', *self.server)

        self.master_node_list = settings.getMasterNodeList()
        logging.debug('master nodes are %s', self.master_node_list)

        # Internal attributes.
        self.em = EventManager()
        self.nm = NodeManager()
        # No partition table yet: it is created once the number of
        # partitions is known.
        self.pt = None
        self.uuid = uuid
        self.primary_master_node = None
        self.ptid = None
        # Handlers keep a reference to this application.
        self.request_handler = MasterRequestEventHandler(self)
        self.master_event_handler = MasterEventHandler(self)
        self.dispatcher = Dispatcher()
        self.cluster_state = None
        # Filled in by connectToPrimaryMaster().
        self.master_conn = None
        self.master_node = None
Aurel's avatar
Aurel committed
81

Aurel's avatar
Aurel committed
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
    def run(self):
        """Make sure that the status is sane and start a loop."""
        if len(self.name) == 0:
            raise RuntimeError, 'cluster name must be non-empty'

        # Make a listening port.
        handler = AdminEventHandler(self)
        ListeningConnection(self.em, handler, addr = self.server,
                            connector_handler = self.connector_handler)

        # Connect to a primary master node, verify data, and
        # start the operation. This cycle will be executed permentnly,
        # until the user explicitly requests a shutdown.
        while 1:
            self.connectToPrimaryMaster()
            try:
                while 1:
                    self.em.poll(1)
            except PrimaryFailure:
                logging.error('primary master is down')
Aurel's avatar
Aurel committed
102

Aurel's avatar
Aurel committed
103 104 105 106 107 108

    def connectToPrimaryMaster(self):
        """Locate the primary master node and connect to it.

        While no primary master is elected or ready, the connection
        attempt is repeated periodically.

        Note that no connection from non-master nodes is accepted at this
        stage.

        Raises RuntimeError when a partition table already exists and the
        number of partitions or replicas announced by the master does not
        match it.
        """
        # Start from a clean node list holding only the configured masters.
        node_manager = self.nm
        node_manager.clear()
        self.cluster_state = None
        for address in self.master_node_list:
            node_manager.add(MasterNode(server = address))

        # search, find, connect and identify to the primary master
        bootstrap = BootstrapManager(self, self.name,
                protocol.ADMIN_NODE_TYPE, self.uuid, self.server)
        node, conn, uuid, num_partitions, num_replicas = \
                bootstrap.getPrimaryConnection(self.connector_handler)
        primary_info = (node.getType(), node.getServer(), node.getUUID(),
                        protocol.RUNNING_STATE)
        node_manager.update([primary_info])
        self.master_node = node
        self.master_conn = conn
        self.uuid = uuid

        if self.pt is None:
            # First connection: build the table with the announced sizes.
            self.pt = PartitionTable(num_partitions, num_replicas)
        else:
            if self.pt.getPartitions() != num_partitions:
                # XXX: shouldn't we recover instead of raising ?
                raise RuntimeError('the number of partitions is inconsistent')
            if self.pt.getReplicas() != num_replicas:
                # XXX: shouldn't we recover instead of raising ?
                raise RuntimeError('the number of replicas is inconsistent')

        # passive handler
        self.master_conn.setHandler(self.master_event_handler)
        # Request the node information, then the partition table.
        self.master_conn.ask(protocol.askNodeInformation())
        self.master_conn.ask(protocol.askPartitionTable([]))
Aurel's avatar
Aurel committed
143

Aurel's avatar
Aurel committed
144
    def sendPartitionTable(self, conn, min_offset, max_offset, uuid, msg_id):
        """Answer ``msg_id`` on ``conn`` with a slice of the partition table.

        Rows for offsets ``min_offset`` (inclusive) to ``max_offset``
        (exclusive) are sent; a ``max_offset`` of 0 means "up to the number
        of partitions". When ``uuid`` is not None, only cells whose UUID
        equals it are included in each row.

        If an IndexError is raised while building the rows, a protocol
        error is notified on ``conn`` and no answer is sent.
        """
        # we have a pt
        self.pt.log()
        if max_offset == 0:
            max_offset = self.pt.getPartitions()
        rows = []
        try:
            for offset in xrange(min_offset, max_offset):
                cells = []
                try:
                    for cell in self.pt.getCellList(offset):
                        if uuid is not None and cell.getUUID() != uuid:
                            continue
                        cells.append((cell.getUUID(), cell.getState()))
                except TypeError:
                    # A TypeError while reading the cells is ignored; the
                    # row is sent with whatever was collected so far.
                    pass
                rows.append((offset, cells))
        except IndexError:
            conn.notify(protocol.protocolError('invalid partition table offset'))
            return
        conn.answer(protocol.answerPartitionList(self.ptid, rows), msg_id)