#
# Copyright (C) 2006-2009  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

18
from neo import logging
19
import sys
20
from collections import deque
21 22

from neo.config import ConfigurationManager
23
from neo import protocol
24
from neo.protocol import TEMPORARILY_DOWN_STATE, \
25 26
        partition_cell_states, HIDDEN_STATE
from neo.node import NodeManager, MasterNode, StorageNode
27
from neo.event import EventManager
28
from neo.storage.mysqldb import MySQLDatabaseManager
29
from neo.connection import ListeningConnection
30
from neo.exception import OperationFailure, PrimaryFailure
31 32
from neo.storage.handlers import identification, verification, initialization
from neo.storage.handlers import master, hidden
33
from neo.storage.replicator import Replicator
34
from neo.connector import getConnectorHandler
35
from neo.pt import PartitionTable
36
from neo.util import dump
37
from neo.bootstrap import BootstrapManager
38 39 40 41

class Application(object):
    """The storage node application."""

42 43
    def __init__(self, filename, section, reset=False):
        """Set up a storage node from its configuration.

        filename and section are handed to ConfigurationManager to locate
        the settings. reset is forwarded unchanged to the database
        manager's setup() call.
        """
        settings = ConfigurationManager(filename, section)

        # Identity of this node; the UUID is filled in by
        # loadConfiguration() at the end of the constructor.
        self.uuid = None
        self.name = settings.getName()
        logging.debug('the name is %s', self.name)
        self.connector_handler = getConnectorHandler(settings.getConnector())

        self.server = settings.getServer()
        logging.debug('IP address is %s, port is %d', *self.server)

        self.master_node_list = settings.getMasterNodeList()
        logging.debug('master nodes are %s', self.master_node_list)

        # Internal managers: events, known nodes and the database.
        self.em = EventManager()
        self.nm = NodeManager()
        db_name = settings.getDatabase()
        db_user = settings.getUser()
        db_password = settings.getPassword()
        self.dm = MySQLDatabaseManager(database=db_name,
                                       user=db_user,
                                       password=db_password)
        self.loid = None
        # The partition table can only be built once the number of
        # partitions is known.
        self.pt = None

        # Nothing is connected or running yet.
        self.replicator = None
        self.listening_conn = None
        self.master_conn = None
        self.master_node = None

        # Operation related data; reset again by doOperation().
        self.transaction_dict = {}
        self.store_lock_dict = {}
        self.load_lock_dict = {}
        self.event_queue = None
        self.operational = False

        # ready is True when operational and all information was received
        self.ready = False
        self.has_node_information = False
        self.has_partition_table = False

        self.dm.setup(reset)
        self.loadConfiguration()

    def loadConfiguration(self):
        """Load persistent configuration data from the database.
        If data is not present, generate it."""
90
        dm = self.dm
91

92
        self.uuid = dm.getUUID()
93 94
        num_partitions = dm.getNumPartitions()
        num_replicas = dm.getNumReplicas()
95

96 97 98
        if num_partitions is not None and num_replicas is not None:
            if num_partitions <= 0:
                raise RuntimeError, 'partitions must be more than zero'
99
            # create a partition table
100
            self.pt = PartitionTable(num_partitions, num_replicas)
Aurel's avatar
Aurel committed
101

102 103 104 105 106
        name = dm.getName()
        if name is None:
            dm.setName(self.name)
        elif name != self.name:
            raise RuntimeError('name does not match with the database')
107 108 109 110
        ptid = dm.getPTID()
        logging.info("Configuration: uuid=%s, ptid=%s, name=%s, np=%s, nr=%s" \
                % (dump(self.uuid), dump(ptid), name, num_partitions, 
                    num_replicas))
111 112 113

    def loadPartitionTable(self):
        """Load a partition table from the database.

        Reads the partition table ID and the stored cells from the
        database manager, then clears self.pt and refills it with them.
        self.pt must already exist when this is called.
        """
        ptid = self.dm.getPTID()
        # The stored rows carry the cell state as an integer; translate it
        # back through the protocol's state table before updating.
        states = protocol.partition_cell_states
        cell_list = [(offset, uuid, states[state])
                for offset, uuid, state in self.dm.getPartitionTable()]
        self.pt.clear()
        self.pt.update(ptid, cell_list, self.nm)
122 123 124 125 126 127 128 129 130

    def run(self):
        """Make sure that the status is sane and start a loop."""
        if len(self.name) == 0:
            raise RuntimeError, 'cluster name must be non-empty'

        for server in self.master_node_list:
            self.nm.add(MasterNode(server = server))

131
        # Make a listening port
132
        handler = identification.IdentificationHandler(self)
133 134 135
        self.listening_conn = ListeningConnection(self.em, handler, 
            addr=self.server, connector_handler=self.connector_handler)

136 137 138 139
        # Connect to a primary master node, verify data, and
        # start the operation. This cycle will be executed permentnly,
        # until the user explicitly requests a shutdown.
        while 1:
140
            # look for the primary master
141
            self.connectToPrimaryMaster()
142
            self.operational = False
143 144 145
            try:
                while 1:
                    try:
146 147 148 149
                        # check my state
                        node = self.nm.getNodeByUUID(self.uuid)
                        if node is not None and node.getState() == HIDDEN_STATE:
                            self.wait()
150
                        self.verifyData()
151
                        self.initialize()
152 153 154
                        self.doOperation()
                    except OperationFailure:
                        logging.error('operation stopped')
155 156
                        # XXX still we can receive answer packet here
                        # this must be handle in order not to fail
Aurel's avatar
Aurel committed
157 158
                        self.operational = False

159 160
            except PrimaryFailure, msg:
                logging.error('primary master is down : %s' % msg)
161 162 163 164 165 166

    def connectToPrimaryMaster(self):
        """Locate the primary master node and connect to it.

        All non-listening connections are closed first. The bootstrap
        manager then returns the primary master's node and connection,
        this node's UUID, and the cluster's partition and replica counts.
        The UUID is stored in the database and the partition table is
        reloaded, or rebuilt when none exists or the replica count changed.

        Raises RuntimeError if the number of partitions differs from the
        one of the partition table already held.
        """
        known_pt = self.pt

        # Start from a clean state: only the listening connection survives.
        for connection in self.em.getConnectionList():
            if connection.isListeningConnection():
                continue
            connection.close()

        # search, find, connect and identify to the primary master
        bootstrap = BootstrapManager(self, self.name,
                protocol.STORAGE_NODE_TYPE, self.uuid, self.server)
        (master_node, master_conn, uuid, num_partitions,
                num_replicas) = bootstrap.getPrimaryConnection(
                        self.connector_handler)
        self.master_node = master_node
        self.master_conn = master_conn
        self.uuid = uuid
        self.dm.setUUID(uuid)

        rebuild = known_pt is None
        if not rebuild:
            # Reload the table from the database: a previous primary
            # master may have died while sending it, leaving it incomplete.
            self.loadPartitionTable()
            if num_partitions != known_pt.getPartitions():
                raise RuntimeError('the number of partitions is inconsistent')
            rebuild = known_pt.getReplicas() != num_replicas

        if rebuild:
            # changing number of replicas is not an issue
            self.dm.setNumPartitions(num_partitions)
            self.dm.setNumReplicas(num_replicas)
            self.pt = PartitionTable(num_partitions, num_replicas)
            self.loadPartitionTable()
201 202 203 204 205 206

    def verifyData(self):
        """Verify data under the control of the primary master node.

        Installs the verification handler on the master connection and
        polls the event manager until self.operational becomes true.
        Connections from client nodes may not be accepted at this stage.
        """
        logging.info('verifying data')

        verification_handler = verification.VerificationHandler(self)
        self.master_conn.setHandler(verification_handler)

        event_manager = self.em
        while True:
            if self.operational:
                break
            event_manager.poll(1)

214 215
    def initialize(self):
        """Retrieve the partition table and node information from the
        primary master, then mark the node as ready."""
        logging.debug('initializing...')
        init_handler = initialization.InitializationHandler(self)
        self.master_conn.setHandler(init_handler)

        # Reset the flags and the table before asking for fresh data.
        self.has_node_information = False
        self.has_partition_table = False
        self.pt.clear()

        # ask node list and partition table
        self.master_conn.ask(protocol.askNodeInformation())
        self.master_conn.ask(protocol.askPartitionTable(()))

        # Poll until both flags have been set.
        while not (self.has_node_information and self.has_partition_table):
            self.em.poll(1)
        self.ready = True
229

230 231 232 233 234 235
    def doOperation(self):
        """Handle everything, including replications and transactions.

        Installs the operation handler on the master connection, drops
        unfinished data, resets the per-operation state and then polls
        forever, letting the replicator act whenever it has pending work.
        """
        logging.info('doing operation')

        event_manager = self.em

        operation_handler = master.MasterOperationHandler(self)
        self.master_conn.setHandler(operation_handler)

        # Forget all unfinished data.
        self.dm.dropUnfinishedData()

        # transaction_dict: transaction ID -> information on the UUIDs of
        # the client nodes which issued the transaction and on the objects
        # which were stored.
        self.transaction_dict = {}

        # store_lock_dict: object ID -> transaction ID, used for locking
        # objects against store operations.
        self.store_lock_dict = {}

        # load_lock_dict: object ID -> transaction ID, used for locking
        # objects against load operations.
        self.load_lock_dict = {}

        # Queue of events whose handling is delayed because of locks.
        self.event_queue = deque()

        # The replicator.
        self.replicator = Replicator(self)

        while True:
            event_manager.poll(1)
            if not self.replicator.pending():
                continue
            self.replicator.act()
265

266 267
    def wait(self):
        """Stay idle while this node is in the hidden state.

        Every connection is switched to the hidden handler, then the
        event manager is polled until this node's state is no longer
        HIDDEN_STATE.
        """
        logging.info("waiting in hidden state")

        # change handler
        hidden_handler = hidden.HiddenHandler(self)
        for connection in self.em.getConnectionList():
            connection.setHandler(hidden_handler)

        own_node = self.nm.getNodeByUUID(self.uuid)
        while True:
            self.em.poll(1)
            if own_node.getState() != HIDDEN_STATE:
                return
278

279 280
    def queueEvent(self, some_callable, *args, **kwargs):
        self.event_queue.append((some_callable, args, kwargs))
281 282 283 284 285

    def executeQueuedEvents(self):
        l = len(self.event_queue)
        p = self.event_queue.popleft
        for i in xrange(l):
286 287
            some_callable, args, kwargs = p()
            some_callable(*args, **kwargs)
288

289 290 291 292 293 294
    def shutdown(self):
        """Close all connections and exit"""
        for c in self.em.getConnectionList():
            if not c.isListeningConnection():
                c.close()
        sys.exit("Application has been asked to shut down")