app.py 11.3 KB
Newer Older
Aurel's avatar
Aurel committed
1 2
#
# Copyright (C) 2006-2009  Nexedi SA
Aurel's avatar
Aurel committed
3
#
Aurel's avatar
Aurel committed
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
Aurel's avatar
Aurel committed
8
#
Aurel's avatar
Aurel committed
9 10 11 12 13 14 15 16 17
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

18
from neo import logging
19
import sys
20
from collections import deque
21 22

from neo.config import ConfigurationManager
23
from neo import protocol
24
from neo.protocol import TEMPORARILY_DOWN_STATE, \
25
        cell_states, HIDDEN_STATE
26
from neo.node import NodeManager, MasterNode, StorageNode
27
from neo.event import EventManager
28
from neo.storage.mysqldb import MySQLDatabaseManager
29
from neo.connection import ListeningConnection
30
from neo.exception import OperationFailure, PrimaryFailure
31 32
from neo.storage.handlers import identification, verification, initialization
from neo.storage.handlers import master, hidden
33
from neo.storage.replicator import Replicator
34
from neo.connector import getConnectorHandler
35
from neo.pt import PartitionTable
36
from neo.util import dump
37
from neo.bootstrap import BootstrapManager
38 39 40 41

class Application(object):
    """The storage node application."""

42
    def __init__(self, filename, section, reset=False, uuid=None):
        """Build a storage node application from one configuration section.

        filename and section are handed to ConfigurationManager.
        reset is forwarded to the database manager's setup().
        uuid, when not None, replaces the UUID loaded from the database
        (for testing purpose only).
        """
        settings = ConfigurationManager(filename, section)

        # Cluster name and network related settings.
        self.name = settings.getName()
        logging.debug('the name is %s', self.name)
        connector_name = settings.getConnector()
        self.connector_handler = getConnectorHandler(connector_name)

        self.server = settings.getServer()
        logging.debug('IP address is %s, port is %d', *self.server)

        self.master_node_list = settings.getMasterNodeList()
        logging.debug('master nodes are %s', self.master_node_list)

        # Managers for events, known nodes and the database.
        self.em = EventManager()
        self.nm = NodeManager()
        self.dm = MySQLDatabaseManager(
            database=settings.getDatabase(),
            user=settings.getUser(),
            password=settings.getPassword())

        # Nothing is known about the cluster yet. The partition table can
        # only be created once the number of partitions is known.
        self.loid = None
        self.pt = None
        self.replicator = None

        # Connections to the outside world, filled in later.
        self.listening_conn = None
        self.master_conn = None
        self.master_node = None

        # Operation related data.
        self.event_queue = None
        self.transaction_dict = {}
        self.store_lock_dict = {}
        self.load_lock_dict = {}

        # State flags. ready is True when operational and all
        # information has been received.
        self.operational = False
        self.ready = False
        self.has_node_information = False
        self.has_partition_table = False

        # Prepare the database, then read the persistent configuration.
        self.dm.setup(reset)
        self.loadConfiguration()

        # A UUID given on the command line wins over the stored one.
        if uuid is not None:
            self.uuid = uuid

90 91 92
    def loadConfiguration(self):
        """Load persistent configuration data from the database.
        If data is not present, generate it."""
93
        dm = self.dm
94

95
        self.uuid = dm.getUUID()
96 97
        num_partitions = dm.getNumPartitions()
        num_replicas = dm.getNumReplicas()
98

99 100 101
        if num_partitions is not None and num_replicas is not None:
            if num_partitions <= 0:
                raise RuntimeError, 'partitions must be more than zero'
102
            # create a partition table
103
            self.pt = PartitionTable(num_partitions, num_replicas)
Aurel's avatar
Aurel committed
104

105 106 107 108 109
        name = dm.getName()
        if name is None:
            dm.setName(self.name)
        elif name != self.name:
            raise RuntimeError('name does not match with the database')
110 111 112 113
        ptid = dm.getPTID()
        logging.info("Configuration: uuid=%s, ptid=%s, name=%s, np=%s, nr=%s" \
                % (dump(self.uuid), dump(ptid), name, num_partitions, 
                    num_replicas))
114 115 116

    def loadPartitionTable(self):
        """Load a partition table from the database."""
117 118
        ptid = self.dm.getPTID()
        cell_list = self.dm.getPartitionTable()
119 120 121
        new_cell_list = []
        for offset, uuid, state in cell_list:
            # convert from int to Enum
122
            state = protocol.cell_states[state]
123 124 125 126 127 128 129
            # register unknown nodes
            if self.nm.getNodeByUUID(uuid) is None:
                node = StorageNode(uuid=uuid)
                node.setState(protocol.TEMPORARILY_DOWN_STATE)
                self.nm.add(node)
            new_cell_list.append((offset, uuid, state))
        # load the partition table in manager
130
        self.pt.clear()
131
        self.pt.update(ptid, new_cell_list, self.nm)
132 133 134 135 136 137 138 139 140

    def run(self):
        """Make sure that the status is sane and start a loop."""
        if len(self.name) == 0:
            raise RuntimeError, 'cluster name must be non-empty'

        for server in self.master_node_list:
            self.nm.add(MasterNode(server = server))

141
        # Make a listening port
142
        handler = identification.IdentificationHandler(self)
143 144 145
        self.listening_conn = ListeningConnection(self.em, handler, 
            addr=self.server, connector_handler=self.connector_handler)

146 147 148 149
        # Connect to a primary master node, verify data, and
        # start the operation. This cycle will be executed permentnly,
        # until the user explicitly requests a shutdown.
        while 1:
150
            # look for the primary master
151
            self.connectToPrimaryMaster()
152
            self.operational = False
153 154 155
            try:
                while 1:
                    try:
156 157 158 159
                        # check my state
                        node = self.nm.getNodeByUUID(self.uuid)
                        if node is not None and node.getState() == HIDDEN_STATE:
                            self.wait()
160
                        self.verifyData()
161
                        self.initialize()
162 163 164
                        self.doOperation()
                    except OperationFailure:
                        logging.error('operation stopped')
165 166
                        # XXX still we can receive answer packet here
                        # this must be handle in order not to fail
Aurel's avatar
Aurel committed
167 168
                        self.operational = False

169 170
            except PrimaryFailure, msg:
                logging.error('primary master is down : %s' % msg)
171 172 173 174 175 176

    def connectToPrimaryMaster(self):
        """Locate the primary master node and connect to it.

        All non-listening connections are closed first. A BootstrapManager
        then provides the primary master node, the connection to it, this
        node's UUID and the cluster's numbers of partitions and replicas.
        The UUID is stored in the database.

        If a partition table already existed, it is reloaded from the
        database and its number of partitions must match the one announced;
        otherwise RuntimeError is raised. If there was no partition table,
        or the number of replicas differs, both numbers are stored in the
        database and a new partition table is created and loaded.
        """
        previous_pt = self.pt

        # First of all, make sure that I have no connection.
        for connection in self.em.getConnectionList():
            if connection.isListening():
                continue
            connection.close()

        # search, find, connect and identify to the primary master
        manager = BootstrapManager(self, self.name,
                protocol.STORAGE_NODE_TYPE, self.uuid, self.server)
        result = manager.getPrimaryConnection(self.connector_handler)
        (primary, primary_conn, my_uuid, num_partitions, num_replicas) = result
        self.master_node = primary
        self.master_conn = primary_conn
        self.uuid = my_uuid
        self.dm.setUUID(my_uuid)

        if previous_pt is None:
            rebuild = True
        else:
            # Reload a partition table from the database. This is necessary
            # when a previous primary master died while sending a partition
            # table, because the table might be incomplete.
            self.loadPartitionTable()
            if num_partitions != previous_pt.getPartitions():
                raise RuntimeError('the number of partitions is inconsistent')
            rebuild = previous_pt.getReplicas() != num_replicas

        if rebuild:
            # changing number of replicas is not an issue
            self.dm.setNumPartitions(num_partitions)
            self.dm.setNumReplicas(num_replicas)
            self.pt = PartitionTable(num_partitions, num_replicas)
            self.loadPartitionTable()
211 212 213 214 215 216

    def verifyData(self):
        """Run the verification stage driven by the primary master.

        Installs a verification handler on the master connection and polls
        the event manager until self.operational becomes true.
        Connections from client nodes may not be accepted at this stage.
        """
        logging.info('verifying data')

        verification_handler = verification.VerificationHandler(self)
        self.master_conn.setHandler(verification_handler)

        event_manager = self.em
        while True:
            if self.operational:
                return
            event_manager.poll(1)

224 225
    def initialize(self):
        """Retrieve partition table and node information from the primary.

        Installs an initialization handler on the master connection, clears
        the partition table, asks the master for node information and for
        the partition table, and polls until both have been flagged as
        received. self.ready is then set to True.
        """
        logging.debug('initializing...')
        master_conn = self.master_conn
        init_handler = initialization.InitializationHandler(self)
        master_conn.setHandler(init_handler)

        # Start from scratch: nothing has been received yet.
        self.has_node_information = self.has_partition_table = False
        self.pt.clear()

        # ask node list and partition table
        self.master_conn.ask(protocol.askNodeInformation())
        self.master_conn.ask(protocol.askPartitionTable(()))

        # Wait until both answers have been flagged as received.
        while not (self.has_node_information and self.has_partition_table):
            self.em.poll(1)

        self.ready = True
239

240 241 242 243 244 245
    def doOperation(self):
        """Handle everything, including replications and transactions.

        Installs the master operation handler, drops unfinished data from
        the database, resets the per-operation state and then polls the
        event manager forever, letting the replicator act whenever it has
        pending work. The loop itself never returns.
        """
        logging.info('doing operation')

        event_manager = self.em

        operation_handler = master.MasterOperationHandler(self)
        self.master_conn.setHandler(operation_handler)

        # Forget all unfinished data.
        self.dm.dropUnfinishedData()

        # transaction_dict maps transaction IDs to information on UUIDs of
        # client nodes which issued transactions and objects which were
        # stored.
        self.transaction_dict = {}

        # store_lock_dict and load_lock_dict both map object IDs to
        # transaction IDs; they are used for locking objects against store
        # operations and load operations respectively.
        self.store_lock_dict = {}
        self.load_lock_dict = {}

        # Queue of events used to delay operations due to locks.
        self.event_queue = deque()

        # The replicator.
        self.replicator = Replicator(self)

        while True:
            event_manager.poll(1)
            if not self.replicator.pending():
                continue
            self.replicator.act()
275

276 277
    def wait(self):
        """Stay idle for as long as this node is in the hidden state.

        Installs a hidden handler on every connection of the event manager,
        then polls until the node registered under self.uuid is no longer
        in HIDDEN_STATE. At least one poll is always performed.
        """
        logging.info("waiting in hidden state")

        # change handler
        hidden_handler = hidden.HiddenHandler(self)
        for connection in self.em.getConnectionList():
            connection.setHandler(hidden_handler)

        myself = self.nm.getNodeByUUID(self.uuid)
        while True:
            self.em.poll(1)
            if myself.getState() != HIDDEN_STATE:
                return
288

289 290
    def queueEvent(self, some_callable, *args, **kwargs):
        self.event_queue.append((some_callable, args, kwargs))
291 292 293 294 295

    def executeQueuedEvents(self):
        l = len(self.event_queue)
        p = self.event_queue.popleft
        for i in xrange(l):
296 297
            some_callable, args, kwargs = p()
            some_callable(*args, **kwargs)
298

299 300 301
    def shutdown(self):
        """Close all connections and exit"""
        for c in self.em.getConnectionList():
302
            if not c.isListening():
303 304
                c.close()
        sys.exit("Application has been asked to shut down")