#
# Copyright (C) 2006-2010  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

18 19
from time import time

20
from neo import logging
Yoshinori Okuji's avatar
Yoshinori Okuji committed
21
from neo.util import dump
22
from neo.protocol import NodeTypes, NodeStates
Yoshinori Okuji's avatar
Yoshinori Okuji committed
23

24 25
from neo import attributeTracker

Yoshinori Okuji's avatar
Yoshinori Okuji committed
26 27 28
class Node(object):
    """Represent a single node tracked by a node manager.

    A node holds an address, a UUID, a state and, optionally, the connection
    currently attached to it.  Changes of address, UUID, state or connection
    are reported to the manager given at construction time through its
    ``_update*`` methods.
    """

    def __init__(self, manager, address=None, uuid=None,
            state=NodeStates.UNKNOWN):
        """Record the node attributes; the node starts without connection.

        The constructor only keeps a reference to ``manager``, it does not
        call any of its methods.
        """
        self._manager = manager
        self._address = address
        self._uuid = uuid
        self._state = state
        # no connection until setConnection() is called
        self._connection = None
        self._last_state_change = time()

    def notify(self, packet):
        """Forward ``packet`` to the ``notify`` method of the connection."""
        assert self.isConnected(), 'Not connected'
        connection = self._connection
        connection.notify(packet)

    def ask(self, packet, *args, **kw):
        """Forward ``packet`` and extra arguments to the connection ``ask``."""
        assert self.isConnected(), 'Not connected'
        connection = self._connection
        connection.ask(packet, *args, **kw)

    def answer(self, packet, msg_id=None):
        """Forward ``packet`` and ``msg_id`` to the connection ``answer``."""
        assert self.isConnected(), 'Not connected'
        connection = self._connection
        connection.answer(packet, msg_id)

    def getLastStateChange(self):
        """Return the ``time()`` value recorded at the last state change."""
        return self._last_state_change

    def getState(self):
        """Return the current state of the node."""
        return self._state

    def setState(self, new_state):
        """Switch to ``new_state``, timestamp it and tell the manager.

        Nothing is done when ``new_state`` equals the current state.
        """
        if self._state == new_state:
            return
        previous_state, self._state = self._state, new_state
        self._last_state_change = time()
        self._manager._updateState(self, previous_state)

    def setAddress(self, address):
        """Replace the address and let the manager reindex the node."""
        previous_address, self._address = self._address, address
        self._manager._updateAddress(self, previous_address)

    def getAddress(self):
        """Return the address of the node (may be None)."""
        return self._address

    def setUUID(self, uuid):
        """Replace the UUID and let the manager refresh its indexes."""
        previous_uuid, self._uuid = self._uuid, uuid
        manager = self._manager
        manager._updateUUID(self, previous_uuid)
        manager._updateIdentified(self)

    def getUUID(self):
        """Return the UUID of the node (may be None)."""
        return self._uuid

    def onConnectionClosed(self):
        """
            Callback registered on the node's connection, to be called when
            that connection is closed: forget it and tell the manager
        """
        assert self._connection is not None
        self._connection = None
        self._manager._updateIdentified(self)

    def setConnection(self, connection):
        """
            Attach the connection now available to this node, register
            onConnectionClosed on it and tell the manager.
            The node must not already have a connection.
        """
        assert connection is not None
        assert self._connection is None
        self._connection = connection
        connection.setOnClose(self.onConnectionClosed)
        self._manager._updateIdentified(self)

    def getConnection(self):
        """
            Return the connection attached to the node, which must exist
        """
        assert self._connection is not None
        return self._connection

    def isConnected(self):
        """
            Return True if a connection is attached to the node
        """
        return self._connection is not None

    def isIdentified(self):
        """
            Return True if the node has both a connection and a UUID
        """
        return not (self._connection is None or self._uuid is None)

    def __repr__(self):
        """Return a debugging string with class name, UUID, address, state."""
        details = (
            self.__class__.__name__,
            dump(self._uuid),
            self._address,
            self._state,
            id(self),
        )
        return '<%s(uuid=%s, address=%s, state=%s) at %x>' % details

    # Type predicates: the base class answers False to all of them, each
    # subclass overrides the one matching its own type.

    def isMaster(self):
        """Return False; overridden by MasterNode."""
        return False

    def isStorage(self):
        """Return False; overridden by StorageNode."""
        return False

    def isClient(self):
        """Return False; overridden by ClientNode."""
        return False

    def isAdmin(self):
        """Return False; overridden by AdminNode."""
        return False

    # State predicates: compare the current state with one NodeStates value.

    def isRunning(self):
        """Return True if the state is NodeStates.RUNNING."""
        return self._state == NodeStates.RUNNING

    def isUnknown(self):
        """Return True if the state is NodeStates.UNKNOWN."""
        return self._state == NodeStates.UNKNOWN

    def isTemporarilyDown(self):
        """Return True if the state is NodeStates.TEMPORARILY_DOWN."""
        return self._state == NodeStates.TEMPORARILY_DOWN

    def isDown(self):
        """Return True if the state is NodeStates.DOWN."""
        return self._state == NodeStates.DOWN

    def isBroken(self):
        """Return True if the state is NodeStates.BROKEN."""
        return self._state == NodeStates.BROKEN

    def isHidden(self):
        """Return True if the state is NodeStates.HIDDEN."""
        return self._state == NodeStates.HIDDEN

    def isPending(self):
        """Return True if the state is NodeStates.PENDING."""
        return self._state == NodeStates.PENDING

    # State shortcuts: each one delegates to setState().

    def setRunning(self):
        """Shortcut for setState(NodeStates.RUNNING)."""
        self.setState(NodeStates.RUNNING)

    def setUnknown(self):
        """Shortcut for setState(NodeStates.UNKNOWN)."""
        self.setState(NodeStates.UNKNOWN)

    def setTemporarilyDown(self):
        """Shortcut for setState(NodeStates.TEMPORARILY_DOWN)."""
        self.setState(NodeStates.TEMPORARILY_DOWN)

    def setDown(self):
        """Shortcut for setState(NodeStates.DOWN)."""
        self.setState(NodeStates.DOWN)

    def setBroken(self):
        """Shortcut for setState(NodeStates.BROKEN)."""
        self.setState(NodeStates.BROKEN)

    def setHidden(self):
        """Shortcut for setState(NodeStates.HIDDEN)."""
        self.setState(NodeStates.HIDDEN)

    def setPending(self):
        """Shortcut for setState(NodeStates.PENDING)."""
        self.setState(NodeStates.PENDING)

    def asTuple(self):
        """ Returned tuple is intended to be used in protocol encoders """
        node_type = self.getType()
        return (node_type, self._address, self._uuid, self._state)

    def __gt__(self, node):
        """Order nodes by UUID when this node has one, else by address."""
        if self._uuid is None:
            return self._address > node._address
        return self._uuid > node._uuid

    def getType(self):
        """Return the NODE_CLASS_MAPPING entry for the class of this node.

        Raise NotImplementedError when the class is not in the mapping,
        which is the case of the base class itself.
        """
        node_class = self.__class__
        try:
            return NODE_CLASS_MAPPING[node_class]
        except KeyError:
            raise NotImplementedError

    def whoSetState(self):
        """
          Debugging method: call this method to know who set the current
          state value.
        """
        return attributeTracker.whoSet(self, '_state')

attributeTracker.track(Node)
205

Yoshinori Okuji's avatar
Yoshinori Okuji committed
206 207
class MasterNode(Node):
    """Node subclass standing for a master node."""

    def isMaster(self):
        """Return True: overrides the base class answer."""
        return True
Yoshinori Okuji's avatar
Yoshinori Okuji committed
211 212 213

class StorageNode(Node):
    """Node subclass standing for a storage node."""

    def isStorage(self):
        """Return True: overrides the base class answer."""
        return True
Yoshinori Okuji's avatar
Yoshinori Okuji committed
217 218 219

class ClientNode(Node):
    """Node subclass standing for a client node."""

    def isClient(self):
        """Return True: overrides the base class answer."""
        return True
Yoshinori Okuji's avatar
Yoshinori Okuji committed
223

Aurel's avatar
Aurel committed
224 225
class AdminNode(Node):
    """Node subclass standing for an admin node."""

    def isAdmin(self):
        """Return True: overrides the base class answer."""
        return True
Aurel's avatar
Aurel committed
229

230

231
# Node type constant -> class to instantiate for that type.
NODE_TYPE_MAPPING = {
    NodeTypes.MASTER: MasterNode,
    NodeTypes.STORAGE: StorageNode,
    NodeTypes.CLIENT: ClientNode,
    NodeTypes.ADMIN: AdminNode,
}

# Reverse lookup: node class -> node type constant (see Node.getType).
NODE_CLASS_MAPPING = {
    StorageNode: NodeTypes.STORAGE,
    MasterNode: NodeTypes.MASTER,
    ClientNode: NodeTypes.CLIENT,
    AdminNode: NodeTypes.ADMIN,
}
243

Yoshinori Okuji's avatar
Yoshinori Okuji committed
244 245 246
class NodeManager(object):
    """Keep track of known nodes and index them for fast lookup.

    Registered nodes are stored in a set and indexed:
    - by address and by UUID (one node per key),
    - by class and by state (a set of nodes per key),
    - by UUID for the nodes that are "identified", i.e. for which
      ``node.isIdentified()`` is true.

    Nodes call back the ``_update*`` methods when one of their attributes
    changes, so that the indexes follow.
    """

    # TODO: rework getXXXList() methods, filter first by node type
    # - getStorageList(identified=True, connected=True, )
    # - getList(...)

    def __init__(self):
        self._node_set = set()
        self._address_dict = {}
        self._uuid_dict = {}
        self._type_dict = {}
        self._state_dict = {}
        self._identified_dict = {}

    def add(self, node):
        """Register ``node`` in every index; no-op if already registered."""
        if node in self._node_set:
            return
        self._node_set.add(node)
        self._updateAddress(node, None)
        self._updateUUID(node, None)
        self.__updateSet(self._type_dict, None, node.__class__, node)
        self.__updateSet(self._state_dict, None, node.getState(), node)
        self._updateIdentified(node)

    def remove(self, node):
        """Unregister ``node`` from every index.

        ``None`` and nodes that are not registered are silently ignored.
        """
        if node is None or node not in self._node_set:
            return
        self._node_set.remove(node)
        uuid = node.getUUID()
        self.__drop(self._address_dict, node.getAddress())
        self.__drop(self._uuid_dict, uuid)
        self.__dropSet(self._state_dict, node.getState(), node)
        self.__dropSet(self._type_dict, node.__class__, node)
        # Drop the entry directly instead of calling _updateIdentified():
        # the latter would register again a node that is still connected,
        # leaving a removed node in the identified index.
        self.__drop(self._identified_dict, uuid)

    def __drop(self, index_dict, key):
        """ Remove a key from an index, ignoring missing keys """
        # a node may have not be indexed by uuid or address, eg.:
        # - a master known by address but without UUID
        # - a client or admin node that don't have listening address
        index_dict.pop(key, None)

    def __update(self, index_dict, old_key, new_key, node):
        """ Update an index from old to new key

        A ``None`` key means "not indexed".  KeyError is raised if
        ``old_key`` is not None and is missing from the index.
        """
        if old_key is not None:
            del index_dict[old_key]
        if new_key is not None:
            index_dict[new_key] = node

    def _updateIdentified(self, node):
        """Add or remove ``node`` in the identified index, under its UUID,
        according to ``node.isIdentified()``."""
        uuid = node.getUUID()
        if node.isIdentified():
            self._identified_dict[uuid] = node
        else:
            self._identified_dict.pop(uuid, None)

    def _updateAddress(self, node, old_address):
        """Reindex ``node`` from ``old_address`` to its current address."""
        self.__update(self._address_dict, old_address, node.getAddress(), node)

    def _updateUUID(self, node, old_uuid):
        """Reindex ``node`` from ``old_uuid`` to its current UUID."""
        self.__update(self._uuid_dict, old_uuid, node.getUUID(), node)
        # The identified index is keyed by UUID too: forget the entry stored
        # under the previous UUID, otherwise the node would stay reachable
        # (and listed) under a UUID it no longer has.
        if old_uuid is not None and \
                self._identified_dict.get(old_uuid) is node:
            del self._identified_dict[old_uuid]

    def __dropSet(self, set_dict, key, node):
        """ Remove a node from the set stored under key, if present """
        if key in set_dict:
            set_dict[key].discard(node)

    def __updateSet(self, set_dict, old_key, new_key, node):
        """ Update a set index from old to new key """
        if old_key in set_dict:
            set_dict[old_key].discard(node)
        if new_key is not None:
            set_dict.setdefault(new_key, set()).add(node)

    def _updateState(self, node, old_state):
        """Move ``node`` from ``old_state`` to its current state."""
        self.__updateSet(self._state_dict, old_state, node.getState(), node)

    def getList(self, node_filter=None):
        """Return the list of registered nodes.

        When ``node_filter`` is given, only nodes for which
        ``node_filter(node)`` is true are returned.
        """
        if node_filter is None:
            return list(self._node_set)
        return [node for node in self._node_set if node_filter(node)]

    def getIdentifiedList(self, pool_set=None):
        """
            Returns the list of identified nodes
            pool_set, if given, is a container of allowed UUIDs
        """
        if pool_set is None:
            return list(self._identified_dict.values())
        return [node for uuid, node in self._identified_dict.items()
                if uuid in pool_set]

    def getConnectedList(self):
        """
            Returns the list of connected nodes
        """
        # TODO: use an index
        return [x for x in self._node_set if x.isConnected()]

    def __getList(self, index_dict, key):
        """ Return the nodes stored under key (empty if none) """
        # plain lookup: reading must not insert empty sets in the index
        return index_dict.get(key, ())

    def getByStateList(self, state):
        """ Get a node list filtered per the node state """
        return list(self.__getList(self._state_dict, state))

    def __getTypeList(self, type_klass, only_identified=False):
        """ Return the list of nodes of a class, optionally only those
        whose UUID is in the identified index """
        node_set = self.__getList(self._type_dict, type_klass)
        if only_identified:
            return [x for x in node_set if x.getUUID() in self._identified_dict]
        return list(node_set)

    def getMasterList(self, only_identified=False):
        """ Return a list with master nodes """
        return self.__getTypeList(MasterNode, only_identified)

    def getStorageList(self, only_identified=False):
        """ Return a list with storage nodes """
        return self.__getTypeList(StorageNode, only_identified)

    def getClientList(self, only_identified=False):
        """ Return a list with client nodes """
        return self.__getTypeList(ClientNode, only_identified)

    def getAdminList(self, only_identified=False):
        """ Return a list with admin nodes """
        return self.__getTypeList(AdminNode, only_identified)

    def getByAddress(self, address):
        """ Return the node that match with a given address """
        return self._address_dict.get(address, None)

    def getByUUID(self, uuid):
        """ Return the node that match with a given UUID """
        return self._uuid_dict.get(uuid, None)

    def hasAddress(self, address):
        """ Return True if a node is indexed with the given address """
        return address in self._address_dict

    def hasUUID(self, uuid):
        """ Return True if a node is indexed with the given UUID """
        return uuid in self._uuid_dict

    def _createNode(self, klass, **kw):
        """ Instantiate klass with this manager, register and return it """
        node = klass(self, **kw)
        self.add(node)
        return node

    def createMaster(self, **kw):
        """ Create and register a new master """
        return self._createNode(MasterNode, **kw)

    def createStorage(self, **kw):
        """ Create and register a new storage """
        return self._createNode(StorageNode, **kw)

    def createClient(self, **kw):
        """ Create and register a new client """
        return self._createNode(ClientNode, **kw)

    def createAdmin(self, **kw):
        """ Create and register a new admin """
        return self._createNode(AdminNode, **kw)

    def _getClassFromNodeType(self, node_type):
        """ Return the node class of a node type, RuntimeError if unknown """
        klass = NODE_TYPE_MAPPING.get(node_type)
        if klass is None:
            raise RuntimeError('Unknown node type : %s' % node_type)
        return klass

    def createFromNodeType(self, node_type, **kw):
        """ Create and register a new node of the given node type """
        return self._createNode(self._getClassFromNodeType(node_type), **kw)

    def init(self):
        """ Forget all nodes and empty every index """
        self._node_set.clear()
        self._type_dict.clear()
        self._state_dict.clear()
        self._uuid_dict.clear()
        self._address_dict.clear()
        # must be emptied with the others, otherwise forgotten nodes would
        # still be returned by getIdentifiedList()
        self._identified_dict.clear()

    def update(self, node_list):
        """Apply a list of ``(node_type, address, uuid, state)`` tuples.

        For each entry, a known node is looked up by UUID, then by address:
        - a DOWN state drops the node (if any),
        - a node known by UUID gets its address and state updated,
        - otherwise a node known only by address is dropped and a new node
          is created and registered.
        RuntimeError is raised for a node type missing from
        NODE_TYPE_MAPPING.  The resulting table is logged at the end.
        """
        for node_type, addr, uuid, state in node_list:
            # lookup in current table
            node_by_uuid = self.getByUUID(uuid)
            node_by_addr = self.getByAddress(addr)
            node = node_by_uuid or node_by_addr

            log_args = (node_type, dump(uuid), addr, state)
            if state == NodeStates.DOWN:
                # drop down nodes
                logging.debug('drop node %s %s %s %s' % log_args)
                self.remove(node)
            elif node_by_uuid is not None:
                if node.getAddress() != addr:
                    # address changed, update it
                    node.setAddress(addr)
                logging.debug('update node %s %s %s %s' % log_args)
                node.setState(state)
            else:
                if node_by_addr is not None:
                    # exists only by address,
                    self.remove(node)
                # don't exists, add it
                klass = NODE_TYPE_MAPPING.get(node_type, None)
                if klass is None:
                    raise RuntimeError('Unknown node type')
                node = klass(self, address=addr, uuid=uuid)
                node.setState(state)
                self.add(node)
                logging.debug('create node %s %s %s %s' % log_args)
        self.log()

    def log(self):
        """ Log the number of nodes, then one line per node, sorted """
        logging.debug('Node manager : %d nodes' % len(self._node_set))
        for node in sorted(self._node_set):
            uuid = dump(node.getUUID()) or '-' * 32
            address = node.getAddress() or ''
            if address:
                address = '%s:%d' % address
            logging.debug(' * %32s | %8s | %22s | %s' % (
                uuid, node.getType(), address, node.getState()))