#
# Copyright (C) 2006-2009  Nexedi SA

# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

18
from neo import logging
19 20

from neo import protocol
21
from neo.protocol import UnexpectedPacketError, ProtocolError
22
from neo.protocol import CellStates, ErrorCodes, Packets
23 24 25 26 27 28 29 30 31
from neo.master.handlers import BaseServiceHandler
from neo.exception import OperationFailure
from neo.util import dump


class StorageServiceHandler(BaseServiceHandler):
    """ Handler dedicated to storages during service state """

    def connectionCompleted(self, conn):
        """Tell a running storage node the last OID and start its operation.

        Nothing is sent when the node registered under the connection's
        UUID is not in the running state.
        """
        node = self.app.nm.getByUUID(conn.getUUID())
        if node.isRunning():
            conn.notify(Packets.NotifyLastOID(self.app.loid))
            conn.notify(Packets.StartOperation())

    def nodeLost(self, conn, node):
        """React to the loss of a storage node.

        Raises OperationFailure when the partition table is no longer
        operational; otherwise outdates and broadcasts the partition table.
        """
        logging.info('storage node lost')
        if not self.app.pt.operational():
            # Call form of raise: valid on both Python 2 and Python 3.
            raise OperationFailure('cannot continue operation')
        # This is intentionally placed after the raise because the last cell
        # in a partition must not be outdated, to allow a cluster restart.
        self.app.outdateAndBroadcastPartition()

    def askLastIDs(self, conn, packet):
        """Answer with the last OID, the last TID and the partition table ID."""
        app = self.app
        conn.answer(Packets.AnswerLastIDs(app.loid, app.tm.getLastTID(),
                    app.pt.getID()), packet.getId())

    def askUnfinishedTransactions(self, conn, packet):
        """Answer with the transaction manager's pending list."""
        p = Packets.AnswerUnfinishedTransactions(self.app.tm.getPendingList())
        conn.answer(p, packet.getId())

    def notifyInformationLocked(self, conn, packet, tid):
        """Record that the sending storage locked transaction `tid`.

        Once the transaction reports that all nodes are locked, the client
        that initiated it gets NotifyTransactionFinished, the other clients
        get InvalidateObjects, the storages listed in the transaction get
        UnlockInformation, and the transaction is removed from the manager.

        Raises UnexpectedPacketError when `tid` is greater than the last TID.
        A `tid` unknown to the transaction manager is silently ignored.
        """
        app = self.app
        tm = app.tm

        # If the given transaction ID is later than the last TID, the peer
        # is crazy.
        if tid > tm.getLastTID():
            raise UnexpectedPacketError

        # Only the lookup is guarded: a KeyError raised further down would
        # be a real bug and must not be swallowed half-way through the
        # notifications.
        try:
            t = tm[tid]
        except KeyError:
            # Unknown transaction: nothing to do.
            return

        if not t.lock(conn.getUUID()):
            # Some nodes have not locked yet.
            return

        # XXX: review needed:
        # don't iterate over connections but search by uuid
        # include client's uuid in Transaction object

        # I have received all the answers now. So send a Notify
        # Transaction Finished to the initiated client node,
        # Invalidate Objects to the other client nodes, and Unlock
        # Information to relevant storage nodes.
        for c in app.em.getConnectionList():
            peer_uuid = c.getUUID()
            if peer_uuid is None:
                continue
            peer = app.nm.getByUUID(peer_uuid)
            if peer.isClient():
                if peer is t.getNode():
                    p = Packets.NotifyTransactionFinished(tid)
                    c.answer(p, t.getMessageId())
                else:
                    p = Packets.InvalidateObjects(t.getOIDList(), tid)
                    c.notify(p)
            elif peer.isStorage():
                if peer_uuid in t.getUUIDList():
                    c.notify(Packets.UnlockInformation(tid))
        tm.remove(tid)

    def notifyPartitionChanges(self, conn, packet, ptid, cell_list):
        """Mark the sender's cells as up-to-date and broadcast the changes.

        Each entry of `cell_list` is indexed as (offset, uuid, state).
        Entries whose state is not UP_TO_DATE, or whose uuid is not the
        sender's, are skipped with a warning. For each accepted entry the
        sender's cell is set UP_TO_DATE and the first FEEDING cell found in
        that partition is removed and reported as DISCARDED.

        Raises ProtocolError when the sender already has a cell for the
        offset whose state is neither OUT_OF_DATE nor UP_TO_DATE.
        """
        # This should be sent when a cell becomes up-to-date because
        # a replication has finished.
        uuid = conn.getUUID()
        app = self.app
        node = app.nm.getByUUID(uuid)

        new_cell_list = []
        for cell in cell_list:
            if cell[2] != CellStates.UP_TO_DATE:
                logging.warn('only up-to-date state should be sent')
                continue

            if uuid != cell[1]:
                logging.warn('only a cell itself should send this packet')
                continue

            offset = cell[0]
            node_uuid = node.getUUID()
            logging.debug("node %s is up for offset %s" %
                    (dump(node_uuid), offset))

            # Check the storage said it is up to date for a partition it was
            # assigned to.
            for xcell in app.pt.getCellList(offset):
                if xcell.getNode().getUUID() == node_uuid and \
                       xcell.getState() not in (CellStates.OUT_OF_DATE,
                               CellStates.UP_TO_DATE):
                    # Adjacent literals instead of a backslash inside the
                    # string, which embedded the next line's indentation in
                    # the message.
                    msg = ("node %s telling that it is UP TO DATE for offset "
                           "%s but was %s for that offset" % (
                               dump(node.getUUID()), offset,
                               xcell.getState()))
                    raise ProtocolError(msg)

            app.pt.setCell(offset, node, CellStates.UP_TO_DATE)
            new_cell_list.append(cell)

            # If the partition contains a feeding cell, drop it now.
            for feeding_cell in app.pt.getCellList(offset):
                if feeding_cell.getState() == CellStates.FEEDING:
                    app.pt.removeCell(offset, feeding_cell.getNode())
                    new_cell_list.append((offset, feeding_cell.getUUID(),
                                          CellStates.DISCARDED))
                    # Stop right after the removal: only the first feeding
                    # cell is dropped.
                    break

        app.broadcastPartitionChanges(new_cell_list)
141 142