Blame view

neo/master/handlers/client.py 3.97 KB
Grégory Wisniewski committed
1
#
Julien Muchembled committed
2
# Copyright (C) 2006-2015  Nexedi SA
Grégory Wisniewski committed
3

Grégory Wisniewski committed
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
Grégory Wisniewski committed
8
#
Grégory Wisniewski committed
9 10 11 12 13 14
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
Julien Muchembled committed
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
Grégory Wisniewski committed
16

Olivier Cros committed
17
from neo.lib.protocol import NodeStates, Packets, ProtocolError
Julien Muchembled committed
18
from . import MasterHandler
Grégory Wisniewski committed
19

Vincent Pelletier committed
20
class ClientServiceHandler(MasterHandler):
    """ Handler dedicated to client during service state """

    def connectionCompleted(self, conn):
        """ Nothing to do here when a connection completes. """
        pass

    def connectionLost(self, conn, new_state):
        """
            Cancel the transactions of the lost client and forget the node.

            Nothing is done when ``app.listening_conn`` is not set.
            ``new_state`` is not used by this handler.
        """
        app = self.app
        if app.listening_conn: # if running
            node = app.nm.getByUUID(conn.getUUID())
            assert node is not None
            app.tm.abortFor(node)
            # Mark the node DOWN and broadcast that before dropping it from
            # the node manager.
            node.setState(NodeStates.DOWN)
            app.broadcastNodesInformation([node])
            app.nm.remove(node)

    def askNodeInformation(self, conn):
        """
            Notify the client of the known nodes, then answer its request.
        """
        # send information about master and storages only
        nm = self.app.nm
        node_list = []
        node_list.extend(n.asTuple() for n in nm.getMasterList())
        node_list.extend(n.asTuple() for n in nm.getStorageList())
        conn.notify(Packets.NotifyNodeInformation(node_list))
        conn.answer(Packets.AnswerNodeInformation())

    def askBeginTransaction(self, conn, tid):
        """
            A client requests a TID.

            The answer carries whatever ``app.tm.begin`` returns for the
            requesting node and the given ``tid``.
            NOTE(review): the previous docstring stated that nothing is kept
            about the transaction until the finish; confirm against the
            transaction manager.
        """
        app = self.app
        node = app.nm.getByUUID(conn.getUUID())
        conn.answer(Packets.AnswerBeginTransaction(app.tm.begin(node, tid)))

    def askNewOIDs(self, conn, num_oids):
        """
            Answer with the list returned by ``app.tm.getNextOIDList`` for
            ``num_oids``.
        """
        conn.answer(Packets.AnswerNewOIDs(self.app.tm.getNextOIDList(num_oids)))

    def askFinishTransaction(self, conn, ttid, oid_list, checked_list):
        """
            Prepare the transaction ``ttid`` and ask the involved storage
            nodes for lock information.

            The involved partitions are those of ``oid_list``, of
            ``checked_list`` and of ``ttid`` itself.

            Raises ProtocolError if none of the storage nodes holding these
            partitions is ready.
        """
        app = self.app
        pt = app.pt

        # Collect partitions related to this transaction.
        getPartition = pt.getPartition
        partition_set = set(map(getPartition, oid_list))
        partition_set.update(map(getPartition, checked_list))
        partition_set.add(getPartition(ttid))

        # Collect the UUIDs of nodes related to this transaction.
        candidate_uuid_set = {cell.getUUID()
            for part in partition_set
            for cell in pt.getCellList(part)
            if cell.getNodeState() != NodeStates.HIDDEN}
        # Build a real list: a lazy filter() object (Python 3) is always
        # true, which would defeat the emptiness check below.
        uuid_list = [uuid for uuid in candidate_uuid_set
                     if app.isStorageReady(uuid)]
        if not uuid_list:
            raise ProtocolError('No storage node ready for transaction')

        identified_node_list = app.nm.getIdentifiedList(pool_set=set(uuid_list))

        # Request locking data.
        # build a new set as we may not send the message to all nodes as some
        # might be not reachable at that time
        p = Packets.AskLockInformation(
            ttid,
            app.tm.prepare(
                ttid,
                pt.getPartitions(),
                oid_list,
                {x.getUUID() for x in identified_node_list},
                conn.getPeerId(),
            ),
        )
        for node in identified_node_list:
            # presumably the timeout is in seconds - TODO confirm
            node.ask(p, timeout=60)

    def askPack(self, conn, tid):
        """
            Ask every identified storage node to pack up to ``tid``.

            If ``app.packing`` is already set, nothing is sent to the
            storages and the client is answered ``AnswerPack(False)``.
        """
        app = self.app
        if app.packing is None:
            storage_list = app.nm.getStorageList(only_identified=True)
            # Record the requesting connection, its peer id and the set of
            # storage UUIDs that are being asked.
            app.packing = (conn, conn.getPeerId(),
                {x.getUUID() for x in storage_list})
            p = Packets.AskPack(tid)
            for storage in storage_list:
                storage.getConnection().ask(p)
        else:
            conn.answer(Packets.AnswerPack(False))

    def abortTransaction(self, conn, tid):
        """
            Remove transaction ``tid`` of the requesting client from the
            transaction manager.
        """
        self.app.tm.remove(conn.getUUID(), tid)
Vincent Pelletier committed
107