#
# Copyright (C) 2006-2009  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

import logging
from time import time

from neo.config import ConfigurationManager
22
from neo.protocol import MASTER_NODE_TYPE
23
from neo.node import NodeManager, MasterNode
Aurel's avatar
Aurel committed
24
from neo.event import EventManager
Aurel's avatar
Aurel committed
25
from neo.connection import ListeningConnection, ClientConnection
26
from neo.exception import PrimaryFailure
27
from neo.admin.handler import MasterMonitoringEventHandler, AdminEventHandler, \
28
     MasterEventHandler, MasterRequestEventHandler
29
from neo.connector import getConnectorHandler, ConnectorConnectionClosedException
30 31
from neo.bootstrap import BootstrapManager
from neo.pt import PartitionTable
32 33 34 35 36 37 38 39 40 41
from neo import protocol

class Dispatcher(object):
    """Dispatcher used to redirect master requests to the right connection.

    Each pending request is keyed by the id of the message sent to the
    master; the registered entry is the pair ``(conn, kw)`` where ``conn``
    is the connection the answer must be dispatched to and ``kw`` holds
    optional extra data supplied at registration time.
    """

    def __init__(self):
        # msg_id -> (conn, kw): associates a message id with the
        # connection its answer must be dispatched to.
        self.message_table = {}

    def register(self, msg_id, conn, kw=None):
        """Associate ``msg_id`` with ``conn`` and optional data ``kw``.

        A previous registration for the same ``msg_id`` is overwritten.
        """
        self.message_table[msg_id] = conn, kw

    def retrieve(self, msg_id):
        """Remove and return the ``(conn, kw)`` pair registered for
        ``msg_id``, or None if nothing is registered for it."""
        return self.message_table.pop(msg_id, None)

    def registered(self, msg_id):
        """Return True if ``msg_id`` has a pending registration."""
        # ``in`` instead of dict.has_key(), which is deprecated in Python 2
        # and removed in Python 3.
        return msg_id in self.message_table


class Application(object):
    """The admin node application.

    It opens a listening connection served by AdminEventHandler, connects
    to the primary master node as an admin node, and requests the node
    list and the partition table from it.
    """

    def __init__(self, file, section):
        """Read the configuration and set up the internal state.

        :param file: configuration file given to ConfigurationManager.
        :param section: section of that file describing this node.
        """
        config = ConfigurationManager(file, section)

        self.name = config.getName()
        logging.debug('the name is %s', self.name)
        self.connector_handler = getConnectorHandler(config.getConnector())

        self.server = config.getServer()
        logging.debug('IP address is %s, port is %d', *(self.server))

        self.master_node_list = config.getMasterNodeList()
        logging.debug('master nodes are %s', self.master_node_list)

        # Internal attributes.
        self.em = EventManager()
        self.nm = NodeManager()
        # The partition table is initialized after getting the number of
        # partitions.
        self.pt = None
        self.uuid = None
        self.primary_master_node = None
        # Set by connectToPrimaryMaster() once the bootstrap succeeds;
        # initialised here so the attributes always exist.
        self.master_node = None
        self.master_conn = None
        self.ptid = None
        self.monitoring_handler = MasterMonitoringEventHandler(self)
        self.request_handler = MasterRequestEventHandler(self)
        self.dispatcher = Dispatcher()
        self.cluster_state = None

    def run(self):
        """Make sure that the status is sane and start a loop.

        Registers the configured master nodes, opens the listening
        connection, then forever: connects to the primary master and polls
        events until a PrimaryFailure is raised, after which the cached
        state is dropped and a new primary master is looked for.

        :raises RuntimeError: if the cluster name is empty or missing.
        """
        # ``not`` also rejects a None name instead of failing in len().
        if not self.name:
            raise RuntimeError('cluster name must be non-empty')

        for server in self.master_node_list:
            self.nm.add(MasterNode(server=server))

        # Make a listening port.
        handler = AdminEventHandler(self)
        ListeningConnection(self.em, handler, addr=self.server,
                            connector_handler=self.connector_handler)

        # Connect to a primary master node, verify data, and
        # start the operation. This cycle will be executed permanently,
        # until the user explicitly requests a shutdown.
        while True:
            self.connectToPrimaryMaster()
            try:
                while True:
                    self.em.poll(1)
            except PrimaryFailure:
                logging.error('primary master is down')
                # Information obtained from the lost primary can no longer
                # be trusted. NOTE(review): presumably the filter selects the
                # nodes to drop (every non-master node) — confirm against
                # NodeManager.clear.
                self.pt.clear()
                self.nm.clear(filter=lambda node:
                              node.getNodeType() != MASTER_NODE_TYPE)

    def connectToPrimaryMaster(self):
        """Find a primary master node, and connect to it.

        If a primary master node is not elected or ready, repeat
        the attempt of a connection periodically.

        Note that I do not accept any connection from non-master nodes
        at this stage.

        :raises RuntimeError: if the master reports a number of partitions
            or replicas different from the partition table already known.
        """

        # First of all, make sure that I have no connection.
        for conn in self.em.getConnectionList():
            if not conn.isListeningConnection():
                conn.close()

        # search, find, connect and identify to the primary master
        bootstrap = BootstrapManager(self, self.name, protocol.ADMIN_NODE_TYPE,
                self.uuid, self.server)
        data = bootstrap.getPrimaryConnection(self.connector_handler)
        (node, conn, uuid, num_partitions, num_replicas) = data
        self.master_node = node
        self.master_conn = conn
        self.uuid = uuid

        # The partition table is created on the first connection only; on
        # reconnection its geometry must match what the new primary says.
        if self.pt is None:
            self.pt = PartitionTable(num_partitions, num_replicas)
        elif self.pt.getPartitions() != num_partitions:
            raise RuntimeError('the number of partitions is inconsistent')
        elif self.pt.getReplicas() != num_replicas:
            raise RuntimeError('the number of replicas is inconsistent')

        # passive handler
        self.master_conn.setHandler(MasterEventHandler(self))
        self.master_conn.ask(protocol.askNodeInformation())
        self.master_conn.ask(protocol.askPartitionTable([]))

    def sendPartitionTable(self, conn, min_offset, max_offset, uuid, msg_id):
        """Send the rows ``[min_offset, max_offset)`` of the partition table.

        :param conn: connection the answer is notified on.
        :param min_offset: first partition offset to send (inclusive).
        :param max_offset: end offset (exclusive); 0 means "up to the last
            partition".
        :param uuid: when not None, each row only lists the cells whose
            UUID equals it.
        :param msg_id: message id attached to the answer.

        Each row is ``(offset, [(cell_uuid, cell_state), ...])``. If an
        offset triggers an IndexError, a protocol error is notified instead
        and nothing else is sent.
        """
        # assumes the partition table has already been received (self.pt
        # is not None) — the caller must guarantee it.
        self.pt.log()
        if max_offset == 0:
            max_offset = self.pt.getPartitions()
        row_list = []
        try:
            for offset in xrange(min_offset, max_offset):
                row = []
                try:
                    for cell in self.pt.getCellList(offset):
                        if uuid is None or cell.getUUID() == uuid:
                            row.append((cell.getUUID(), cell.getState()))
                except TypeError:
                    # NOTE(review): presumably getCellList() returns None for
                    # a partition without cells; the row is then sent as
                    # collected so far — confirm this is the only cause.
                    pass
                row_list.append((offset, row))
        except IndexError:
            p = protocol.protocolError('invalid partition table offset')
            # NOTE(review): sent without msg_id, unlike the answer below —
            # confirm the requester does not wait on msg_id here.
            conn.notify(p)
            return
        p = protocol.answerPartitionList(self.ptid, row_list)
        conn.notify(p, msg_id)