connection.py 19.5 KB
Newer Older
Aurel's avatar
Aurel committed
1
#
Grégory Wisniewski's avatar
Grégory Wisniewski committed
2
# Copyright (C) 2006-2010  Nexedi SA
3
#
Aurel's avatar
Aurel committed
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
Aurel's avatar
Aurel committed
9 10 11 12 13 14 15
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
Aurel's avatar
Aurel committed
17

18 19
from time import time

20
from neo import logging
21
from neo.locking import RLock
Yoshinori Okuji's avatar
Yoshinori Okuji committed
22

23
from neo.protocol import PacketMalformedError, Packets
24
from neo.connector import ConnectorException, ConnectorTryAgainException, \
25 26
        ConnectorInProgressException, ConnectorConnectionRefusedException, \
        ConnectorConnectionClosedException
27
from neo.util import dump
Grégory Wisniewski's avatar
Grégory Wisniewski committed
28
from neo.logger import PACKET_LOGGER
Yoshinori Okuji's avatar
Yoshinori Okuji committed
29

30
from neo import attributeTracker
31
from neo.util import ReadBuffer
32
from neo.profiling import profiler_decorator
33

34 35 36 37 38
# Timeout settings. They are added to time() values (see Timeout and
# ListeningConnection.readable), so they are expressed in seconds.
# Added to the current time by Timeout.refresh() to get the next ping deadline.
PING_DELAY = 5
# Added to the current time by Timeout.update() to get the ping deadline.
PING_TIMEOUT = 5
# Timeout passed to Timeout.update() when a connection has just been accepted.
INCOMING_TIMEOUT = 10
# Default timeout of Timeout.update() and of the ask() methods.
CRITICAL_TIMEOUT = 30

39 40
APPLY_HANDLER = object()

41 42
def not_closed(func):
    """
    Decorator for methods that must not run on a closed connection.

    The decorated method raises ConnectorConnectionClosedException when
    self.connector is None; otherwise it is called with the same arguments
    and its result is returned.
    """
    def decorator(self, *args, **kw):
        if self.connector is None:
            raise ConnectorConnectionClosedException
        return func(self, *args, **kw)
    # Keep the identity of the wrapped method, so that introspection and
    # anything keyed on the function name report e.g. "ask" rather than
    # "decorator". Copied by hand to avoid a new import in this module.
    decorator.__name__ = func.__name__
    decorator.__doc__ = func.__doc__
    return decorator
47

48

49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
def lockCheckWrapper(func):
    """
    Decorator logging a warning when a method runs without the lock held.

    This function is to be used as a wrapper around
    MT(Client|Server)Connection class methods.

    It uses a "_" method on RLock class, so it might stop working without
    notice (sadly, RLock does not offer any "acquired" method, but that one
    will do as it checks that current thread holds this lock).

    It requires the monitored class to have an RLock instance in its
    self._lock property.

    The wrapped method is always called, whether or not the lock is held:
    the check only logs the offending call together with the current stack.
    """
    def wrapper(self, *args, **kw):
        if not self._lock._is_owned():
            import traceback
            # func.__name__ rather than func.func_code.co_name: it exists on
            # any named callable and reports the name a wrapped function
            # was given, not the name of its innermost code object.
            logging.warning('%s called on %s instance without being locked.' \
                ' Stack:\n%s', func.__name__, self.__class__.__name__,
                ''.join(traceback.format_stack()))
        # Call anyway
        return func(self, *args, **kw)
    # Keep the identity of the wrapped method for introspection.
    wrapper.__name__ = func.__name__
    wrapper.__doc__ = func.__doc__
    return wrapper

71

72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
class HandlerSwitcher(object):
    """
    Select which handler processes the packets of one connection.

    self._pending is a list of [request_dict, handler] pairs. The first pair
    holds the handler currently in use; the last pair receives the newly
    emitted requests. Each request_dict maps the id of a request whose
    answer has not been handled yet to the expected answer class.
    """

    def __init__(self, connection, handler):
        self._connection = connection
        # pending handlers and related requests
        self._pending = [[{}, handler]]

    def clear(self):
        """ Forget expected answers and delayed handlers, keep the current
        handler """
        handler = self._pending[0][1]
        self._pending = [[{}, handler]]

    def isPending(self):
        """ Return the answers expected by the current handler: a dict,
        empty (hence false) when nothing is expected """
        return self._pending[0][0]

    def getHandler(self):
        """ Return the handler currently in use """
        return self._pending[0][1]

    def emit(self, request):
        """ Record that an answer to the given request is expected """
        # register the request in the current handler
        assert len(self._pending) == 1 or self._pending[0][0]
        (request_dict, _) = self._pending[-1]
        msg_id = request.getId()
        assert request.getAnswerClass() is not None, "Not a request"
        assert msg_id not in request_dict, "Packet id already expected"
        request_dict[msg_id] = request.getAnswerClass()

    def handle(self, packet):
        """
        Give a received packet to the current handler.

        Packets that are not responses are always passed on. A response is
        passed on when it is an instance of the class registered for its id
        or when it is an error; once no answer is expected any more, the
        next delayed handler, if any, becomes the current one. Any other
        response is logged, reported to the peer with a Notify packet, the
        connection is aborted and the handler is told the peer is broken.
        """
        assert len(self._pending) == 1 or self._pending[0][0]
        connection = self._connection
        PACKET_LOGGER.dispatch(connection, packet, 'from')
        msg_id = packet.getId()
        (request_dict, handler) = self._pending[0]
        # notifications are not expected
        if not packet.isResponse():
            handler.packetReceived(connection, packet)
            return
        # checkout the expected answer class
        klass = request_dict.pop(msg_id, None)
        if klass and isinstance(packet, klass) or packet.isError():
            handler.packetReceived(connection, packet)
            # apply a pending handler if no more answers are pending
            if len(self._pending) > 1 and not request_dict:
                del self._pending[0]
                logging.debug('Apply handler %r', self._pending[0][1])
        else:
            logging.error('Unexpected answer: %r', packet)
            notification = Packets.Notify('Unexpected answer: %r' % packet)
            connection.notify(notification)
            connection.abort()
            # Pass the connection, as done for every other handler callback
            # in this module (packetReceived, timeoutExpired,
            # connectionClosed...); it used to be called without argument.
            handler.peerBroken(connection)

    def setHandler(self, handler):
        """ Use the given handler, immediately if no answer is expected,
        otherwise once the pending answers have been handled """
        if len(self._pending) == 1 and not self._pending[0][0]:
            # nothing is pending, change immediately
            logging.debug('Set handler %r', handler)
            self._pending[0][1] = handler
        else:
            # put the next handler in queue
            logging.debug('Delay handler %r', handler)
            self._pending.append([{}, handler])


133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
class Timeout(object):
    """
    Keep track of current timeouts

    Two absolute deadlines are stored, both None until they are set:
    - _ping_time is set by update() and by refresh();
    - _critical_time is set by update() only and never moves backwards.
    A deadline that is still None is never considered reached.
    """

    def __init__(self):
        self._ping_time = None
        self._critical_time = None

    def update(self, t, timeout=CRITICAL_TIMEOUT):
        """
        Update the new critical time

        t is the current time. The ping deadline becomes t + PING_TIMEOUT
        and the critical deadline becomes that value plus timeout, unless
        the critical deadline already recorded is later.
        """
        self._ping_time = t + PING_TIMEOUT
        critical_time = self._ping_time + timeout
        # Explicit test rather than max(critical_time, None): ordering None
        # against a number only works as a CPython 2 implementation detail.
        if self._critical_time is None or self._critical_time < critical_time:
            self._critical_time = critical_time

    def refresh(self, t):
        """ Refresh timeout after something received """
        self._ping_time = t + PING_DELAY

    def softExpired(self, t):
        """ Indicate if the soft timeout (ping delay) is reached """
        if self._ping_time is None or self._critical_time is None:
            return False
        # hard timeout takes precedences
        return self._ping_time < t < self._critical_time

    def hardExpired(self, t):
        """ Indicate if hard (or pong) timeout is reached """
        # should be called if softExpired if False
        critical_time = self._critical_time
        ping_time = self._ping_time
        # An unset deadline cannot be exceeded: comparing None with t used
        # to report a never-armed timer as expired.
        if critical_time is not None and critical_time < t:
            return True
        return ping_time is not None and ping_time < t


Yoshinori Okuji's avatar
Yoshinori Okuji committed
161 162
class BaseConnection(object):
    """
    A base connection.

    It ties together an event manager, a low-level connector, a
    HandlerSwitcher and a Timeout. checkTimeout() relies on ping(),
    notify() and abort(), which are not defined here and must be provided
    by subclasses.
    """

    def __init__(self, event_manager, handler, connector, addr=None):
        assert connector is not None, "Need a low-level connector"
        self.em = event_manager
        self.connector = connector
        self.addr = addr
        self._handlers = HandlerSwitcher(self, handler)
        self._timeout = Timeout()
        event_manager.register(self)

    def checkTimeout(self, t):
        """
        Ping the peer or give up on it, depending on the deadlines reached
        at time t. Nothing is done unless an answer is expected.
        """
        if not self._handlers.isPending():
            return
        timeout = self._timeout
        if timeout.softExpired(t):
            timeout.refresh(t)
            self.ping()
        elif timeout.hardExpired(t):
            # critical time reach or pong not received, abort
            logging.info('timeout with %s:%d', *(self.getAddress()))
            self.notify(Packets.Notify('Timeout'))
            self.abort()
            self.getHandler().timeoutExpired(self)

    def lock(self):
        """No locking at this level: always return 1."""
        return 1

    def unlock(self):
        """No locking at this level: nothing to release."""
        return None

    def getConnector(self):
        """Return the low-level connector, None once closed."""
        return self.connector

    def getAddress(self):
        """Return the address given at creation."""
        return self.addr

    def readable(self):
        """To be implemented by subclasses."""
        raise NotImplementedError

    def writable(self):
        """To be implemented by subclasses."""
        raise NotImplementedError

    def close(self):
        """Close the connection."""
        connector = self.connector
        if connector is None:
            # already closed
            return
        event_manager = self.em
        event_manager.removeReader(self)
        event_manager.removeWriter(self)
        event_manager.unregister(self)
        connector.shutdown()
        connector.close()
        self.connector = None

    # NOTE: this binds the function defined just above, not the close()
    # method a subclass may define.
    __del__ = close

    def getHandler(self):
        """Return the handler currently in use."""
        return self._handlers.getHandler()

    def setHandler(self, handler):
        """Change the handler, through the HandlerSwitcher."""
        self._handlers.setHandler(handler)

    def getEventManager(self):
        """Return the event manager given at creation."""
        return self.em

    def getUUID(self):
        """A base connection has no UUID."""
        return None

    def isAborted(self):
        """A base connection is never aborted."""
        return False

    def isListening(self):
        """False, unless overridden."""
        return False

    def isServer(self):
        """False, unless overridden."""
        return False

    def isClient(self):
        """False, unless overridden."""
        return False

    def hasPendingMessages(self):
        """A base connection never has messages to process."""
        return False

    def whoSetConnector(self):
        """
          Debugging method: call this method to know who set the current
          connector value.
        """
        return attributeTracker.whoSet(self, 'connector')

attributeTracker.track(BaseConnection)
251

Yoshinori Okuji's avatar
Yoshinori Okuji committed
252 253
class ListeningConnection(BaseConnection):
    """
    A listen connection.

    It is registered as a reader of the event manager and turns every
    incoming connection into a ServerConnection.
    """

    def __init__(self, event_manager, handler, addr, connector, **kw):
        # Extra keyword arguments are accepted and ignored.
        logging.debug('listening to %s:%d', *addr)
        BaseConnection.__init__(self, event_manager, handler, connector, addr)
        self.connector.makeListeningConnection(addr)
        self.em.addReader(self)

    def readable(self):
        """
        Accept an incoming connection and hand it to the handler.

        Nothing happens when the connector raises
        ConnectorTryAgainException.
        """
        try:
            peer_connector, peer_addr = self.connector.getNewConnection()
            logging.debug('accepted a connection from %s:%d', *peer_addr)
            handler = self.getHandler()
            accepted = ServerConnection(self.getEventManager(), handler,
                connector=peer_connector, addr=peer_addr)
            # A request for a node identification should arrive.
            # NOTE(review): this arms the timeout of the listening
            # connection itself, not the one of the accepted connection;
            # confirm this is intended.
            self._timeout.update(time(), timeout=INCOMING_TIMEOUT)
            handler.connectionAccepted(accepted)
        except ConnectorTryAgainException:
            pass

    def writable(self):
        """A listening connection has nothing to write."""
        return False

    def isListening(self):
        return True

281

Yoshinori Okuji's avatar
Yoshinori Okuji committed
282 283
class Connection(BaseConnection):
    """A connection."""
284

285 286 287
    def __init__(self, event_manager, handler, connector, addr=None):
        BaseConnection.__init__(self, event_manager, handler,
                                connector=connector, addr=addr)
288
        self.read_buf = ReadBuffer()
289
        self.write_buf = []
Yoshinori Okuji's avatar
Yoshinori Okuji committed
290
        self.cur_id = 0
291
        self.peer_id = 0
Yoshinori Okuji's avatar
Yoshinori Okuji committed
292 293
        self.aborted = False
        self.uuid = None
294
        self._queue = []
295
        self._on_close = None
296
        event_manager.addReader(self)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
297

298 299 300 301
    def setOnClose(self, callback):
        assert self._on_close is None
        self._on_close = callback

302 303 304
    def isAborted(self):
        return self.aborted

Yoshinori Okuji's avatar
Yoshinori Okuji committed
305 306 307 308 309
    def getUUID(self):
        return self.uuid

    def setUUID(self, uuid):
        self.uuid = uuid
Yoshinori Okuji's avatar
Yoshinori Okuji committed
310

311 312 313 314 315 316
    def setPeerId(self, peer_id):
        self.peer_id = peer_id

    def getPeerId(self):
        return self.peer_id

317
    @profiler_decorator
Grégory Wisniewski's avatar
Grégory Wisniewski committed
318
    def _getNextId(self):
Yoshinori Okuji's avatar
Yoshinori Okuji committed
319
        next_id = self.cur_id
320
        self.cur_id = (next_id + 1) & 0xffffffff
Yoshinori Okuji's avatar
Yoshinori Okuji committed
321 322 323
        return next_id

    def close(self):
324
        logging.debug('closing a connector for %s (%s:%d)',
325 326
                dump(self.uuid), *(self.addr))
        BaseConnection.close(self)
327 328 329
        if self._on_close is not None:
            self._on_close()
            self._on_close = None
330
        del self.write_buf[:]
331
        self.read_buf.clear()
332
        self._handlers.clear()
Yoshinori Okuji's avatar
Yoshinori Okuji committed
333 334 335

    def abort(self):
        """Abort dealing with this connection."""
336
        logging.debug('aborting a connector for %s (%s:%d)',
337
                dump(self.uuid), *(self.addr))
Yoshinori Okuji's avatar
Yoshinori Okuji committed
338 339 340 341
        self.aborted = True

    def writable(self):
        """Called when self is writable."""
Grégory Wisniewski's avatar
Grégory Wisniewski committed
342
        self._send()
343
        if not self.write_buf and self.connector is not None:
Yoshinori Okuji's avatar
Yoshinori Okuji committed
344 345 346
            if self.aborted:
                self.close()
            else:
Yoshinori Okuji's avatar
Yoshinori Okuji committed
347
                self.em.removeWriter(self)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
348 349 350

    def readable(self):
        """Called when self is readable."""
Grégory Wisniewski's avatar
Grégory Wisniewski committed
351
        self._recv()
Yoshinori Okuji's avatar
Yoshinori Okuji committed
352 353 354
        self.analyse()

        if self.aborted:
Yoshinori Okuji's avatar
Yoshinori Okuji committed
355
            self.em.removeReader(self)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
356 357 358

    def analyse(self):
        """Analyse received data."""
359
        while True:
360
            # parse a packet
361
            try:
362
                packet = Packets.parse(self.read_buf)
363 364
                if packet is None:
                    break
365
            except PacketMalformedError, msg:
366
                self.getHandler()._packetMalformed(self, msg)
367
                return
368
            self._timeout.refresh(time())
Vincent Pelletier's avatar
Vincent Pelletier committed
369
            packet_type = packet.getType()
370 371 372 373
            if packet_type == Packets.Ping:
                # Send a pong notification
                self.answer(Packets.Pong(), packet.getId())
            elif packet_type != Packets.Pong:
374
                # Skip PONG packets, its only purpose is refresh the timeout
375 376
                # generated upong ping.
                self._queue.append(packet)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
377

378 379 380 381 382 383 384 385 386 387
    def hasPendingMessages(self):
        """
          Returns True if there are messages queued and awaiting processing.
        """
        return len(self._queue) != 0

    def process(self):
        """
          Process a pending packet.
        """
388
        # check out packet and process it with current handler
389
        packet = self._queue.pop(0)
390
        self._handlers.handle(packet)
391

Yoshinori Okuji's avatar
Yoshinori Okuji committed
392
    def pending(self):
393
        return self.connector is not None and self.write_buf
Yoshinori Okuji's avatar
Yoshinori Okuji committed
394

395
    def _closure(self, was_connected=False):
396
        assert self.connector is not None, self.whoSetConnector()
397
        handler = self.getHandler()
398
        self.close()
399 400 401 402
        if was_connected:
            handler.connectionFailed(self)
        else:
            handler.connectionClosed(self)
403

404
    @profiler_decorator
Grégory Wisniewski's avatar
Grégory Wisniewski committed
405
    def _recv(self):
406
        """Receive data from a connector."""
Yoshinori Okuji's avatar
Yoshinori Okuji committed
407
        try:
408 409
            data = self.connector.receive()
            if not data:
410
                logging.debug('Connection %r closed in recv', self.connector)
411
                self._closure()
412
                return
413
            self.read_buf.append(data)
414
        except ConnectorTryAgainException:
415
            pass
416 417
        except ConnectorConnectionRefusedException:
            # should only occur while connecting
418
            self._closure(was_connected=True)
419
        except ConnectorConnectionClosedException:
420
            # connection resetted by peer, according to the man, this error
421
            # should not occurs but it seems it's false
422
            logging.debug('Connection reset by peer: %r', self.connector)
423
            self._closure()
424
        except ConnectorException:
425
            logging.debug('Unknown connection error: %r', self.connector)
426
            self._closure()
427 428
            # unhandled connector exception
            raise
Yoshinori Okuji's avatar
Yoshinori Okuji committed
429

430
    @profiler_decorator
Grégory Wisniewski's avatar
Grégory Wisniewski committed
431
    def _send(self):
432
        """Send data to a connector."""
433 434 435
        if not self.write_buf:
            return
        try:
436 437
            msg = ''.join(self.write_buf)
            n = self.connector.send(msg)
438
            if not n:
439
                logging.debug('Connection %r closed in send', self.connector)
440
                self._closure()
441
                return
442 443 444 445
            if n == len(msg):
                del self.write_buf[:]
            else:
                self.write_buf = [msg[n:]]
446 447
        except ConnectorTryAgainException:
            pass
448 449
        except ConnectorConnectionClosedException:
            # connection resetted by peer
450
            logging.debug('Connection reset by peer: %r', self.connector)
451
            self._closure()
452
        except ConnectorException:
453
            logging.debug('Unknown connection error: %r', self.connector)
454
            # unhandled connector exception
455
            self._closure()
456
            raise
Yoshinori Okuji's avatar
Yoshinori Okuji committed
457

458
    @profiler_decorator
Grégory Wisniewski's avatar
Grégory Wisniewski committed
459
    def _addPacket(self, packet):
Yoshinori Okuji's avatar
Yoshinori Okuji committed
460
        """Add a packet into the write buffer."""
461
        if self.connector is None:
462 463
            return

464 465
        was_empty = not bool(self.write_buf)

466
        PACKET_LOGGER.dispatch(self, packet, ' to ')
467
        self.write_buf.extend(packet.encode())
Yoshinori Okuji's avatar
Yoshinori Okuji committed
468

469 470
        if was_empty:
            # enable polling for writing.
471
            self.em.addWriter(self)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
472

473
    @not_closed
474
    def notify(self, packet):
Grégory Wisniewski's avatar
Grégory Wisniewski committed
475
        """ Then a packet with a new ID """
476
        msg_id = self._getNextId()
477
        packet.setId(msg_id)
Grégory Wisniewski's avatar
Grégory Wisniewski committed
478
        self._addPacket(packet)
479 480
        return msg_id

481
    @profiler_decorator
482
    @not_closed
483
    def ask(self, packet, timeout=CRITICAL_TIMEOUT):
484 485
        """
        Send a packet with a new ID and register the expectation of an answer
486
        """
Grégory Wisniewski's avatar
Grégory Wisniewski committed
487
        msg_id = self._getNextId()
488
        packet.setId(msg_id)
Grégory Wisniewski's avatar
Grégory Wisniewski committed
489
        self._addPacket(packet)
490 491
        if not self._handlers.isPending():
            self._timeout.update(time(), timeout=timeout)
492
        self._handlers.emit(packet)
493 494
        return msg_id

495
    @not_closed
496
    def answer(self, packet, msg_id=None):
Grégory Wisniewski's avatar
Grégory Wisniewski committed
497
        """ Answer to a packet by re-using its ID for the packet answer """
498 499
        if msg_id is None:
            msg_id = self.getPeerId()
500
        packet.setId(msg_id)
501
        assert packet.isResponse(), packet
Grégory Wisniewski's avatar
Grégory Wisniewski committed
502
        self._addPacket(packet)
503

504 505
    @not_closed
    def ping(self):
506
        packet = Packets.Ping()
507
        packet.setId(self._getNextId())
508 509
        self._addPacket(packet)

510

Yoshinori Okuji's avatar
Yoshinori Okuji committed
511 512
class ClientConnection(Connection):
    """
    A connection from this node to a remote node.

    self.connecting is true until the connector reports that the
    connection is established.
    """

    def __init__(self, event_manager, handler, addr, connector, **kw):
        # Extra keyword arguments are accepted and ignored.
        self.connecting = True
        Connection.__init__(self, event_manager, handler, connector, addr)
        handler.connectionStarted(self)
        try:
            try:
                self.connector.makeClientConnection(addr)
            except ConnectorInProgressException:
                # completion will be detected by writable()
                event_manager.addWriter(self)
            else:
                self.connecting = False
                self.getHandler().connectionCompleted(self)
        except ConnectorConnectionRefusedException:
            self._closure(was_connected=True)
        except ConnectorException:
            # unhandled connector exception
            self._closure(was_connected=True)
            raise

    def writable(self):
        """Called when self is writable."""
        if not self.connecting:
            Connection.writable(self)
            return
        # Still connecting: being writable means the attempt is over.
        if self.connector.getError():
            self._closure(was_connected=True)
            return
        self.connecting = False
        self.getHandler().connectionCompleted(self)
        self.em.addReader(self)

    def isClient(self):
        return True

551

Yoshinori Okuji's avatar
Yoshinori Okuji committed
552 553
class ServerConnection(Connection):
    """
    A connection from a remote node to this node.

    Instances are created by ListeningConnection.readable().
    """

    def isServer(self):
        return True
557

558

559 560
class MTClientConnection(ClientConnection):
    """
    A Multithread-safe version of ClientConnection.

    notify(), ask() and close() take the connection lock themselves. The
    methods wrapped with lockCheckWrapper expect the caller to hold it and
    log a warning otherwise.
    """

    def __init__(self, local_var, *args, **kwargs):
        """
        local_var must provide the "queue" attribute registered with the
        dispatcher by ask(); the mandatory "dispatcher" keyword argument is
        removed from kwargs, the remaining arguments are passed to
        ClientConnection.
        """
        # _lock is only here for lock debugging purposes. Do not use.
        self._lock = lock = RLock()
        self._local_var = local_var
        self.acquire = lock.acquire
        self.release = lock.release
        self.dispatcher = kwargs.pop('dispatcher')
        self.lock()
        try:
            super(MTClientConnection, self).__init__(*args, **kwargs)
        finally:
            self.unlock()

    def lock(self, blocking=1):
        """Acquire the connection lock, return what acquire() returns."""
        return self.acquire(blocking=blocking)

    def unlock(self):
        """Release the connection lock."""
        self.release()

    @lockCheckWrapper
    def writable(self, *args, **kw):
        return super(MTClientConnection, self).writable(*args, **kw)

    @lockCheckWrapper
    def readable(self, *args, **kw):
        return super(MTClientConnection, self).readable(*args, **kw)

    @lockCheckWrapper
    def analyse(self, *args, **kw):
        return super(MTClientConnection, self).analyse(*args, **kw)

    def notify(self, *args, **kw):
        """Same as Connection.notify, with the lock held."""
        self.lock()
        try:
            return super(MTClientConnection, self).notify(*args, **kw)
        finally:
            self.unlock()

    def ask(self, packet, timeout=CRITICAL_TIMEOUT):
        """
        Same as Connection.ask with the lock held, and with the expected
        answer registered with the dispatcher before the packet is queued.

        Raises ConnectorConnectionClosedException if the connection is
        closed, as Connection.ask does.
        """
        self.lock()
        try:
            # Connection.ask is guarded by not_closed; this override does
            # not call it, so the check is repeated here. Without it, an
            # answer was registered with the dispatcher for a packet that
            # _addPacket silently drops on a closed connection.
            if self.connector is None:
                raise ConnectorConnectionClosedException
            msg_id = self._getNextId()
            packet.setId(msg_id)
            self.dispatcher.register(self, msg_id, self._local_var.queue)
            self._addPacket(packet)
            if not self._handlers.isPending():
                self._timeout.update(time(), timeout=timeout)
            self._handlers.emit(packet)
            return msg_id
        finally:
            self.unlock()

    @lockCheckWrapper
    def answer(self, *args, **kw):
        return super(MTClientConnection, self).answer(*args, **kw)

    @lockCheckWrapper
    def checkTimeout(self, *args, **kw):
        return super(MTClientConnection, self).checkTimeout(*args, **kw)

    def close(self):
        """Same as Connection.close, with the lock held."""
        self.lock()
        try:
            super(MTClientConnection, self).close()
        finally:
            # unlock() rather than release(), like every other method here
            self.unlock()
628