#
# Copyright (C) 2006-2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

import logging
import os
from time import time, gmtime
from struct import pack, unpack

from neo.config import ConfigurationManager
from neo.protocol import Packet, \
        RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, \
        INVALID_UUID, INVALID_OID, INVALID_TID, INVALID_PTID, CLIENT_NODE_TYPE
from neo.node import NodeManager, MasterNode, StorageNode, ClientNode
from neo.event import EventManager
from neo.connection import ListeningConnection, ClientConnection, ServerConnection
from neo.exception import ElectionFailure, PrimaryFailure, VerificationFailure, \
        OperationFailure
from neo.master.election import ElectionEventHandler
from neo.master.recovery import RecoveryEventHandler
from neo.master.verification import VerificationEventHandler
from neo.master.service import ServiceEventHandler
from neo.master.secondary import SecondaryEventHandler
from neo.pt import PartitionTable
from neo.util import dump
from neo import connector

class Application(object):
    """The master node application."""

    def __init__(self, file, section):
        config = ConfigurationManager(file, section)

        self.num_replicas = config.getReplicas()
        self.num_partitions = config.getPartitions()
        self.name = config.getName()
        connector_handler = config.getConnector()
        self.connector_handler = getattr(connector, connector_handler)
        logging.debug('the number of replicas is %d, the number of partitions is %d, the name is %s',
                      self.num_replicas, self.num_partitions, self.name)

        self.server = config.getServer()
        logging.debug('IP address is %s, port is %d', *(self.server))

        # Exclude itself from the list.
        self.master_node_list = [n for n in config.getMasterNodeList() if n != self.server]
        logging.debug('master nodes are %s', self.master_node_list)

        # Internal attributes.
        self.em = EventManager()
        self.nm = NodeManager()
        self.pt = PartitionTable(self.num_partitions, self.num_replicas)

        self.primary = None
        self.primary_master_node = None

        # XXX Generate a UUID for self. For now, just use a random string.
        # Avoid an invalid UUID.
        while 1:
            uuid = os.urandom(16)
            if uuid != INVALID_UUID:
                break
        self.uuid = uuid

        # The last OID.
        self.loid = INVALID_OID
        # The last TID.
        self.ltid = INVALID_TID
        # The last Partition Table ID.
        self.lptid = INVALID_PTID
        # The target node's uuid to request next.
        self.target_uuid = None

    def run(self):
        """Make sure that the status is sane and start a loop."""
        if self.num_replicas <= 0:
            raise RuntimeError, 'replicas must be more than zero'
        if self.num_partitions <= 0:
            raise RuntimeError, 'partitions must be more than zero'
        if len(self.name) == 0:
            raise RuntimeError, 'cluster name must be non-empty'

        for server in self.master_node_list:
            self.nm.add(MasterNode(server = server))

        # Make a listening port.
        ListeningConnection(self.em, None, addr = self.server,
                            connector_handler = self.connector_handler)

        # Start the election of a primary master node.
        self.electPrimary()

        # Start a normal operation.
        while 1:
            try:
                if self.primary:
                    self.playPrimaryRole()
                else:
                    self.playSecondaryRole()
                raise RuntimeError, 'should not reach here'
            except (ElectionFailure, PrimaryFailure):
                # Forget all connections.
                for conn in self.em.getConnectionList():
                    if not isinstance(conn, ListeningConnection):
                        conn.close()
                # Reelect a new primary master.
                self.electPrimary(bootstrap = False)

    def electPrimary(self, bootstrap = True):
        """Elect a primary master node.

        The difficulty is that a master node must accept connections from
        others while attempting to connect to other master nodes at the
        same time. Note that storage nodes and client nodes may connect
        to self as well as master nodes."""
        logging.info('begin the election of a primary master')

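        # Two sets drive the election loop below: addresses of master nodes
        # not yet connected to, and addresses of nodes whose negotiation is
        # still in progress. The loop runs until both sets are empty.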
        self.unconnected_master_node_set = set()
        self.negotiating_master_node_set = set()
        handler = ElectionEventHandler(self)
        em = self.em
        nm = self.nm

        # Make sure that every connection has the election event handler.
        for conn in em.getConnectionList():
            conn.setHandler(handler)

        while 1:
            t = 0
            self.primary = None
            self.primary_master_node = None

            for node in nm.getMasterNodeList():
                self.unconnected_master_node_set.add(node.getServer())
                # For now, believe that every node should be available,
                # since down or broken nodes may be already repaired.
                node.setState(RUNNING_STATE)
            self.negotiating_master_node_set.clear()

            try:
                while 1:
                    em.poll(1)
                    current_time = time()
                    if current_time >= t + 1:
                        t = current_time
                        # Expire temporarily down nodes. For now, assume that a node
                        # which is down for 60 seconds is really down, if this is a
                        # bootstrap. 60 seconds may sound too long, but this is reasonable
                        # when rebooting many cluster machines. Otherwise, wait for only
                        # 10 seconds, because stopping the whole cluster for a long time
                        # is a bad idea.
                        if bootstrap:
                            expiration = 60
                        else:
                            expiration = 10
                        for node in nm.getMasterNodeList():
                            if node.getState() == TEMPORARILY_DOWN_STATE \
                                    and node.getLastStateChange() + expiration < current_time:
                                logging.info('%s:%d is down' % node.getServer())
                                node.setState(DOWN_STATE)
                                self.unconnected_master_node_set.discard(node.getServer())

                        # Try to connect to master nodes.
                        if self.unconnected_master_node_set:
                            for addr in list(self.unconnected_master_node_set):
                                ClientConnection(em, handler, addr = addr,
                                                 connector_handler = self.connector_handler)
                    if len(self.unconnected_master_node_set) == 0 \
                            and len(self.negotiating_master_node_set) == 0:
                        break

                # Now there are three situations:
                #   - I am the primary master
                #   - I am secondary but don't know who is primary
                #   - I am secondary and know who is primary
                if self.primary is None:
                    # I am the primary.
                    self.primary = True
                    logging.info('I am the primary, so sending an announcement')
                    for conn in em.getConnectionList():
                        if isinstance(conn, ClientConnection):
                            p = Packet().announcePrimaryMaster(conn.getNextId())
                            conn.addPacket(p)
                            conn.abort()
                    closed = False
                    t = time()
                    while not closed:
                        em.poll(1)
                        closed = True
                        for conn in em.getConnectionList():
                            if isinstance(conn, ClientConnection):
                                closed = False
                                break
                        if t + 10 < time():
                            for conn in em.getConnectionList():
                                if isinstance(conn, ClientConnection):
                                    conn.close()
                            closed = True
                else:
                    # Wait for an announcement. If this is too long, probably
                    # the primary master is down.
                    t = time()
                    while self.primary_master_node is None:
                        em.poll(1)
                        if t + 10 < time():
                            raise ElectionFailure, 'no primary master elected'

                    # Now I need only a connection to the primary master node.
                    primary = self.primary_master_node
                    addr = primary.getServer()
                    for conn in em.getConnectionList():
                        if isinstance(conn, ServerConnection) \
                                or (isinstance(conn, ClientConnection)
                                    and addr != conn.getAddress()):
                            conn.close()

                    # But if there is no such connection, something went wrong.
                    for conn in em.getConnectionList():
                        if isinstance(conn, ClientConnection) \
                                and addr == conn.getAddress():
                            break
                    else:
                        raise ElectionFailure, 'no connection remains to the primary'

                return
            except ElectionFailure, m:
                logging.error('election failed; %s' % m)

                # Ask all connected nodes to reelect a single primary master.
                for conn in em.getConnectionList():
                    if isinstance(conn, ClientConnection):
                        conn.addPacket(Packet().reelectPrimaryMaster(conn.getNextId()))
                        conn.abort()

                # Wait until the connections are closed.
                self.primary = None
                self.primary_master_node = None
                closed = False
                t = time()
                while not closed:
                    try:
                        em.poll(1)
                    except ElectionFailure:
                        pass

                    closed = True
                    for conn in em.getConnectionList():
                        if isinstance(conn, ClientConnection):
                            # Still not closed.
                            closed = False
                            break

                    if time() > t + 10:
                        # If too long, do not wait.
                        break

                # Close all connections.
                for conn in em.getConnectionList():
                    if not isinstance(conn, ListeningConnection):
                        conn.close()
                bootstrap = False

    def broadcastNodeInformation(self, node):
        """Broadcast a Notify Node Information packet."""
        node_type = node.getNodeType()
        state = node.getState()
        uuid = node.getUUID()

        # The server address may be None.
        addr = node.getServer()
        if addr is None:
            ip_address, port = None, None
        else:
            ip_address, port = addr

        if ip_address is None:
            ip_address = '0.0.0.0'
        if port is None:
            port = 0

        if node_type == CLIENT_NODE_TYPE:
            # Only to master nodes and storage nodes.
            for c in self.em.getConnectionList():
                if c.getUUID() is not None:
                    n = self.nm.getNodeByUUID(c.getUUID())
                    if isinstance(n, (MasterNode, StorageNode)):
                        p = Packet()
                        node_list = [(node_type, ip_address, port, uuid, state)]
                        p.notifyNodeInformation(c.getNextId(), node_list)
                        c.addPacket(p)
        elif isinstance(node, (MasterNode, StorageNode)):
            for c in self.em.getConnectionList():
                if c.getUUID() is not None:
                    p = Packet()
                    node_list = [(node_type, ip_address, port, uuid, state)]
                    p.notifyNodeInformation(c.getNextId(), node_list)
                    c.addPacket(p)
        else:
            raise RuntimeError('unknown node type')

    def broadcastPartitionChanges(self, ptid, cell_list):
        """Broadcast a Notify Partition Changes packet."""
        self.pt.log()
        for c in self.em.getConnectionList():
            if c.getUUID() is not None:
                n = self.nm.getNodeByUUID(c.getUUID())
                if isinstance(n, (ClientNode, StorageNode)):
                    # Split the packet if too big.
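                    # At most 10000 changed cells go into a single packet,
                    # presumably to keep each packet reasonably small.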
                    size = len(cell_list)
                    start = 0
                    while size:
                        amt = min(10000, size)
                        p = Packet()
                        p.notifyPartitionChanges(c.getNextId(), ptid,
                                                 cell_list[start:start+amt])
                        c.addPacket(p)
                        size -= amt
                        start += amt

    def recoverStatus(self):
        """Recover the status of the cluster. Obtain the last OID, the last TID,
        and the last Partition Table ID from storage nodes, then get back the
        latest partition table, or make a new table from scratch if this is the
        first time."""
        logging.info('begin the recovery of the status')

        handler = RecoveryEventHandler(self)
        em = self.em
        nm = self.nm

        # Make sure that every connection has the status recovery event handler.
        for conn in em.getConnectionList():
            conn.setHandler(handler)

        prev_lptid = None
        self.loid = INVALID_OID
        self.ltid = INVALID_TID
        self.lptid = None
        while 1:
            self.target_uuid = None
            self.pt.clear()

            if self.lptid is not None:
                # I need to retrieve last ids again.
                logging.debug('resending Ask Last IDs')
                for conn in em.getConnectionList():
                    uuid = conn.getUUID()
                    if uuid is not None:
                        node = nm.getNodeByUUID(uuid)
                        if isinstance(node, StorageNode) \
                                and node.getState() == RUNNING_STATE:
                            p = Packet()
                            msg_id = conn.getNextId()
                            p.askLastIDs(msg_id)
                            conn.addPacket(p)
                            conn.expectMessage(msg_id)

            # Wait for at least one storage node to appear.
            while self.target_uuid is None:
                em.poll(1)

            # Wait a bit, so that more storage nodes may connect and answer.
            t = time()
            while time() < t + 5:
                em.poll(1)

            # Now I have at least one to ask.
            prev_lptid = self.lptid
            node = nm.getNodeByUUID(self.target_uuid)
            if node is None or node.getState() != RUNNING_STATE:
                # Weird. It's dead.
                logging.info('the target storage node is dead')
                continue

            for conn in em.getConnectionList():
                if conn.getUUID() == self.target_uuid:
                    break
            else:
                # Why?
                logging.info('no connection to the target storage node')
                continue

            if self.lptid == INVALID_PTID:
                # This looks like the first time. So make a fresh table.
                logging.debug('creating a new partition table')
                self.getNextPartitionTableID()
                self.pt.make(nm.getStorageNodeList())
            else:
                # Obtain a partition table. It is necessary to split this
                # message, because the packet size can be huge.
                logging.debug('asking %s:%d for the partition table',
                              *(node.getServer()))
                start = 0
                size = self.num_partitions
                while size:
                    amt = min(1000, size)
                    msg_id = conn.getNextId()
                    p = Packet()
                    p.askPartitionTable(msg_id, range(start, start + amt))
                    conn.addPacket(p)
                    conn.expectMessage(msg_id)
                    size -= amt
                    start += amt

                t = time()
                while 1:
                    em.poll(1)
                    if node.getState() != RUNNING_STATE:
                        # Dead.
                        break
                    if self.pt.filled() or t + 30 < time():
                        break

                if self.lptid != prev_lptid or not self.pt.filled():
                    # I got something newer or the target is dead.
                    logging.debug('self.lptid = %s, prev_lptid = %s',
                                  dump(self.lptid), dump(prev_lptid))
                    self.pt.log()
                    continue

                # Wait until the cluster gets operational or the Partition
                # Table ID turns out to be not the latest.
                logging.debug('waiting for the cluster to be operational')
                self.pt.log()
                while 1:
                    em.poll(1)
                    if self.pt.operational():
                        break
                    if self.lptid != prev_lptid:
                        break

                if self.lptid != prev_lptid:
                    # I got something newer.
                    continue
            break

    def verifyTransaction(self, tid):
        em = self.em
        uuid_set = set()

        # Determine which nodes to ask.
        partition = self.getPartition(tid)
        transaction_uuid_list = [cell.getUUID() for cell \
                in self.pt.getCellList(partition, True)]
        if len(transaction_uuid_list) == 0:
            raise VerificationFailure
        uuid_set.update(transaction_uuid_list)

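        # Note: asking_uuid_dict maps each queried storage UUID to False;
        # the event handler flips an entry to True when that node answers,
        # and polling continues until no False entry remains.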
        # Gather OIDs.
        self.asking_uuid_dict = {}
        self.unfinished_oid_set = set()
        for conn in em.getConnectionList():
            uuid = conn.getUUID()
            if uuid in transaction_uuid_list:
                self.asking_uuid_dict[uuid] = False
                p = Packet()
                msg_id = conn.getNextId()
                p.askTransactionInformation(msg_id, tid)
                conn.addPacket(p)
                conn.expectMessage(msg_id)
        if len(self.asking_uuid_dict) == 0:
            raise VerificationFailure

        while 1:
            em.poll(1)
            if not self.pt.operational():
                raise VerificationFailure
            if False not in self.asking_uuid_dict.values():
                break

        if self.unfinished_oid_set is None or len(self.unfinished_oid_set) == 0:
            # Not committable.
            return None
        else:
            # Verify that all objects are present.
            for oid in self.unfinished_oid_set:
                self.asking_uuid_dict.clear()
                partition = self.getPartition(oid)
                object_uuid_list = [cell.getUUID() for cell \
                            in self.pt.getCellList(partition, True)]
                if len(object_uuid_list) == 0:
                    raise VerificationFailure
                uuid_set.update(object_uuid_list)

                self.object_present = True
                for conn in em.getConnectionList():
                    uuid = conn.getUUID()
                    if uuid in object_uuid_list:
                        self.asking_uuid_dict[uuid] = False
                        p = Packet()
                        msg_id = conn.getNextId()
                        p.askObjectPresent(msg_id, oid, tid)
                        conn.addPacket(p)
                        conn.expectMessage(msg_id)

                while 1:
                    em.poll(1)
                    if not self.pt.operational():
                        raise VerificationFailure
                    if False not in self.asking_uuid_dict.values():
                        break

                if not self.object_present:
                    # Not committable.
                    return None

        return uuid_set

    def verifyData(self):
        """Verify the data in storage nodes and clean them up, if necessary."""
        logging.info('start to verify data')

        handler = VerificationEventHandler(self)
        em = self.em
        nm = self.nm

        # Make sure that every connection has the data verification event handler.
        for conn in em.getConnectionList():
            conn.setHandler(handler)

        # FIXME this part has a potential problem that the write buffers can
        # become very large. Thus it would be better to flush the buffers from
        # time to time _without_ reading packets.

        # Send the current partition table to storage nodes, so that
        # all nodes share the same view.
        for conn in em.getConnectionList():
            uuid = conn.getUUID()
            if uuid is not None:
                node = nm.getNodeByUUID(uuid)
                if isinstance(node, StorageNode):
                    # Split the table into packets of at most 1000 rows, as
                    # the whole table can be too huge for a single packet.
                    row_list = []
                    for offset in xrange(self.num_partitions):
                        row_list.append((offset, self.pt.getRow(offset)))
                        if len(row_list) == 1000:
                            p = Packet()
                            p.sendPartitionTable(conn.getNextId(),
                                                 self.lptid, row_list)
                            conn.addPacket(p)
                            del row_list[:]
                    if len(row_list) != 0:
                        p = Packet()
                        p.sendPartitionTable(conn.getNextId(),
                                             self.lptid, row_list)
                        conn.addPacket(p)

        # Gather all unfinished transactions.
        #
        # FIXME this part requires more brainstorming. Currently, this deals with
        # only unfinished transactions. But how about finished transactions?
        # Suppose that A and B have an unfinished transaction. First, A and B are
        # asked to commit the transaction. Then, A succeeds. B goes down. Now,
        # A believes that the transaction has been committed, while B still believes
        # that the transaction is unfinished. Next, if B comes back and A is working,
        # no problem, because B's unfinished transaction will be committed correctly.
        # However, when B comes back, if A is down, what happens? If the state is
        # not very good, B may be asked to abort the transaction!
        #
        # This situation won't happen frequently, and B shouldn't be asked to drop
        # the transaction, if the cluster is not ready. However, there might be
        # some corner cases where this may happen. That's why more brainstorming
        # is required.
        self.asking_uuid_dict = {}
        self.unfinished_tid_set = set()
        for conn in em.getConnectionList():
            uuid = conn.getUUID()
            if uuid is not None:
                node = nm.getNodeByUUID(uuid)
                if isinstance(node, StorageNode):
                    self.asking_uuid_dict[uuid] = False
                    p = Packet()
                    msg_id = conn.getNextId()
                    p.askUnfinishedTransactions(msg_id)
                    conn.addPacket(p)
                    conn.expectMessage(msg_id)

        while 1:
            em.poll(1)
            if not self.pt.operational():
                raise VerificationFailure
            if False not in self.asking_uuid_dict.values():
                break

        # Gather OIDs for each unfinished TID, and verify whether the transaction
        # can be finished or must be aborted. In theory this could be done in
        # parallel, but it is not so easy, thus do it one-by-one at the moment.
        for tid in self.unfinished_tid_set:
            uuid_set = self.verifyTransaction(tid)
            if uuid_set is None:
                # Make sure that no node has this transaction.
                for conn in em.getConnectionList():
                    uuid = conn.getUUID()
                    if uuid is not None:
                        node = nm.getNodeByUUID(uuid)
                        if isinstance(node, StorageNode):
                            p = Packet()
                            p.deleteTransaction(conn.getNextId(), tid)
                            conn.addPacket(p)
            else:
                for conn in em.getConnectionList():
                    uuid = conn.getUUID()
                    if uuid in uuid_set:
                        p = Packet()
                        p.commitTransaction(conn.getNextId(), tid)
                        conn.addPacket(p)

            # If possible, send the packets now.
            em.poll(0)

        # At this stage, all non-working nodes are out-of-date.
        cell_list = self.pt.outdate()

        # Tweak the partition table, if the distribution of storage nodes
        # is not uniform.
        cell_list.extend(self.pt.tweak())

        # And, add unused nodes.
        node_list = self.pt.getNodeList()
        for node in nm.getStorageNodeList():
            if node.getState() == RUNNING_STATE and node not in node_list:
                cell_list.extend(self.pt.addNode(node))

        # If anything changed, send the changes.
        if cell_list:
            self.broadcastPartitionChanges(self.getNextPartitionTableID(),
                                           cell_list)

    def provideService(self):
        """This is the normal mode for a primary master node. Handle transactions
        and stop the service only if a catastrophe happens or the user requests
        a shutdown."""
        logging.info('provide service')

        handler = ServiceEventHandler(self)
        em = self.em
        nm = self.nm

        # This dictionary is used to hold information on transactions being finished.
        self.finishing_transaction_dict = {}

        # Make sure that every connection has the service event handler.
        for conn in em.getConnectionList():
            conn.setHandler(handler)

        # Now storage nodes should know that the cluster is operational.
        for conn in em.getConnectionList():
            uuid = conn.getUUID()
            if uuid is not None:
                node = nm.getNodeByUUID(uuid)
                if isinstance(node, StorageNode):
                    conn.addPacket(Packet().startOperation(conn.getNextId()))

        # Now everything is passive.
        expiration = 10
        t = 0
        while 1:
            try:
                em.poll(1)
                # Implement an expiration of temporarily down nodes.
                # If a temporarily down storage node expires, it moves to
                # the down state, and the partition table must drop the node,
                # thus repartitioning must be performed.
                current_time = time()
                if current_time >= t + 1:
                    t = current_time
                    for node in nm.getStorageNodeList():
                        if node.getState() == TEMPORARILY_DOWN_STATE \
                               and node.getLastStateChange() + expiration < current_time:
                            logging.info('%s:%d is down' % node.getServer())
                            node.setState(DOWN_STATE)
                            self.broadcastNodeInformation(node)
                            cell_list = self.pt.dropNode(node)
                            ptid = self.getNextPartitionTableID()
                            self.broadcastPartitionChanges(ptid, cell_list)
                            if not self.pt.operational():
                                # Catastrophic.
                                raise OperationFailure, 'cannot continue operation'

            except OperationFailure:
                # If not operational, send Stop Operation packets to storage nodes
                # and client nodes. Abort connections to client nodes.
                logging.critical('No longer operational, so stopping the service')
                for conn in em.getConnectionList():
                    uuid = conn.getUUID()
                    if uuid is not None:
                        node = nm.getNodeByUUID(uuid)
                        if isinstance(node, (StorageNode, ClientNode)):
                            conn.addPacket(Packet().stopOperation(conn.getNextId()))
                            if isinstance(node, ClientNode):
                                conn.abort()

                # Then, go back, and restart.
                return

    def playPrimaryRole(self):
        logging.info('play the primary role')

        # If I know of any storage nodes, make sure that they are not in the
        # running state, because they are not connected at this stage.
        for node in self.nm.getStorageNodeList():
            if node.getState() == RUNNING_STATE:
                node.setState(TEMPORARILY_DOWN_STATE)

        while 1:
            recovering = True
            while recovering:
                self.recoverStatus()
                recovering = False
                try:
                    self.verifyData()
                except VerificationFailure:
                    recovering = True

            self.provideService()

    def playSecondaryRole(self):
        """I play a secondary role, thus only wait for the primary master to fail."""
        logging.info('play the secondary role')

        handler = SecondaryEventHandler(self)
        em = self.em
        nm = self.nm

        # Make sure that every connection has the secondary event handler.
        for conn in em.getConnectionList():
            conn.setHandler(handler)

        while 1:
            em.poll(1)

    def getNextPartitionTableID(self):
        if self.lptid is None:
            raise RuntimeError, 'I do not know the last Partition Table ID'

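        # A Partition Table ID is an 8-byte big-endian string, so unpack it,
        # increment the integer value, and pack it back.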
        ptid = unpack('!Q', self.lptid)[0]
        self.lptid = pack('!Q', ptid + 1)
        return self.lptid

    def getNextOID(self):
        if self.loid is None:
            raise RuntimeError, 'I do not know the last OID'

        oid = unpack('!Q', self.loid)[0]
        self.loid = pack('!Q', oid + 1)
        return self.loid

    def getNextTID(self):
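        # A TID is an 8-byte value laid out like a ZODB timestamp: the upper
        # 32 bits encode the minute (via year/month/day/hour/minute, with
        # months padded to 31 days), and the lower 32 bits encode the position
        # within that minute in units of 60/2**32 seconds. Comparing TIDs as
        # strings therefore compares their times.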
        tm = time()
        gmt = gmtime(tm)
        upper = ((((gmt.tm_year - 1900) * 12 + gmt.tm_mon - 1) * 31 \
                  + gmt.tm_mday - 1) * 24 + gmt.tm_hour) * 60 + gmt.tm_min
        lower = int((gmt.tm_sec % 60 + (tm - int(tm))) / (60.0 / 65536.0 / 65536.0))
        tid = pack('!LL', upper, lower)
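        # Ensure TIDs increase strictly: if the computed TID does not exceed
        # the last one (the clock stepped back, or several TIDs fell within
        # the same tick), derive the next TID from self.ltid instead,
        # carrying into the minute when the lower 32 bits would overflow.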
        if tid <= self.ltid:
            upper, lower = unpack('!LL', self.ltid)
            if lower == 0xffffffff:
                # This should not happen usually.
                from datetime import timedelta, datetime
                d = datetime(gmt.tm_year, gmt.tm_mon, gmt.tm_mday, 
                             gmt.tm_hour, gmt.tm_min) \
                        + timedelta(0, 60)
                upper = ((((d.year - 1900) * 12 + d.month - 1) * 31 \
                          + d.day - 1) * 24 + d.hour) * 60 + d.minute
                lower = 0
            else:
                lower += 1
            tid = pack('!LL', upper, lower)
        self.ltid = tid
        return tid

    def getPartition(self, oid_or_tid):
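        # An OID or TID is an 8-byte big-endian string; its integer value
        # modulo the number of partitions selects a partition in the table.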
        return unpack('!Q', oid_or_tid)[0] % self.num_partitions

    def getNewOIDList(self, num_oids):
        return [self.getNextOID() for i in xrange(num_oids)]