connection.py 19.1 KB
Newer Older
Aurel's avatar
Aurel committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
#
# Copyright (C) 2006-2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

Yoshinori Okuji's avatar
Yoshinori Okuji committed
18
import logging
import sys

from neo import protocol
from neo.connector import ConnectorException, ConnectorTryAgainException, \
        ConnectorInProgressException, ConnectorConnectionRefusedException, \
        ConnectorConnectionClosedException
from neo.event import IdleEvent
from neo.locking import RLock
from neo.protocol import PacketMalformedError
from neo.util import dump
Yoshinori Okuji's avatar
Yoshinori Okuji committed
28

29 30
def not_closed(func):
    """Decorate a connection method so that it refuses to run once closed.

    The returned function raises ConnectorConnectionClosedException when
    the instance's ``connector`` attribute is None; otherwise it calls
    *func* with the same arguments and returns its result.
    """
    def wrapper(self, *args, **kw):
        if self.connector is None:
            raise ConnectorConnectionClosedException
        return func(self, *args, **kw)
    # Keep the wrapped method's identity visible in tracebacks and logs.
    # Set by hand so that no extra import is needed in this module.
    wrapper.__name__ = func.__name__
    wrapper.__doc__ = func.__doc__
    return wrapper
35

36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
def lockCheckWrapper(func):
    """
    This function is to be used as a wrapper around
    MT(Client|Server)Connection class methods.

    It uses a "_" method on RLock class, so it might stop working without
    notice (sadly, RLock does not offer any "acquired" method, but that one
    will do as it checks that current thread holds this lock).

    It requires monitored class to have an RLock instance in self._lock
    property.

    The check is purely diagnostic: when the lock is not held a warning with
    the current stack is logged, and the wrapped method is called anyway.
    """
    # func.__name__ is used instead of func.func_code.co_name: the latter
    # attribute only exists on Python 2 function objects, so the diagnostic
    # path would itself fail elsewhere.
    func_name = func.__name__

    def wrapper(self, *args, **kw):
        if not self._lock._is_owned():
            import traceback
            logging.warning('%s called on %s instance without being locked. Stack:\n%s', func_name, self.__class__.__name__, ''.join(traceback.format_stack()))
        # Call anyway
        return func(self, *args, **kw)
    # Expose the wrapped method's name and docstring on the wrapper.
    wrapper.__name__ = func_name
    wrapper.__doc__ = func.__doc__
    return wrapper

Yoshinori Okuji's avatar
Yoshinori Okuji committed
56 57
class BaseConnection(object):
    """A base connection.

    Holds the event manager, the handler, the address and (optionally) a
    connector. As soon as a connector is attached, the connection registers
    itself with the event manager.
    """

    def __init__(self, event_manager, handler, connector = None,
                 addr = None, connector_handler = None):
        """Store the collaborators and register if a connector is given.

        When *connector* is provided, its class becomes the connector
        handler and *connector_handler* is ignored; otherwise
        *connector_handler* is stored for later use.
        """
        self.em = event_manager
        self.connector = connector
        self.addr = addr
        self.handler = handler
        if connector is not None:
            self.connector_handler = connector.__class__
            event_manager.register(self)
        else:
            self.connector_handler = connector_handler

    def lock(self):
        """Locking hook; this base implementation does nothing, returns 1."""
        return 1

    def unlock(self):
        """Unlocking hook; this base implementation does nothing."""
        return None

    def getConnector(self):
        """Return the attached connector, or None."""
        return self.connector

    def setConnector(self, connector):
        """Attach *connector* and register with the event manager.

        Passing None is a no-op. Raises RuntimeError if a connector is
        already attached.
        """
        if self.connector is not None:
            raise RuntimeError('cannot overwrite a connector in a connection')
        if connector is not None:
            self.connector = connector
            self.em.register(self)

    def getAddress(self):
        """Return the address given at construction time."""
        return self.addr

    def readable(self):
        """Called when the connection is readable; must be overridden."""
        raise NotImplementedError

    def writable(self):
        """Called when the connection is writable; must be overridden."""
        raise NotImplementedError

    def close(self):
        """Close the connection.

        Does nothing when no connector is attached, so calling it twice is
        harmless.
        """
        em = self.em
        if self.connector is not None:
            em.removeReader(self)
            em.removeWriter(self)
            em.unregister(self)
            self.connector.shutdown()
            self.connector.close()
            self.connector = None

    # Bound to this class's close(), not to a subclass override.
    __del__ = close

    def getHandler(self):
        """Return the current handler."""
        return self.handler

    def setHandler(self, handler):
        """Replace the current handler."""
        self.handler = handler

    def getEventManager(self):
        """Return the event manager this connection was created with."""
        return self.em

    def getUUID(self):
        """Return None: a base connection carries no UUID."""
        return None

    def isListeningConnection(self):
        """Tell whether this is a listening connection; must be overridden."""
        raise NotImplementedError

    def hasPendingMessages(self):
        """Return False: a base connection never queues messages."""
        return False

Yoshinori Okuji's avatar
Yoshinori Okuji committed
127 128
class ListeningConnection(BaseConnection):
    """A listen connection.

    Builds a connector from *connector_handler*, puts it in listening mode
    on *addr* and registers itself as a reader so that incoming connections
    are reported through readable().
    """

    def __init__(self, event_manager, handler, addr, connector_handler, **kw):
        """Start listening on *addr*; extra keyword arguments are ignored."""
        logging.debug('listening to %s:%d', *addr)
        BaseConnection.__init__(self, event_manager, handler,
                                addr = addr,
                                connector_handler = connector_handler)
        connector = self.connector_handler()
        connector.makeListeningConnection(addr)
        self.setConnector(connector)
        self.em.addReader(self)

    def readable(self):
        """Accept one pending connection and hand it to the handler.

        A ConnectorTryAgainException is silently ignored.
        """
        try:
            new_s, addr = self.connector.getNewConnection()
            logging.debug('accepted a connection from %s:%d', *addr)
            self.handler.connectionAccepted(self, new_s, addr)
        except ConnectorTryAgainException:
            pass

    def writable(self):
        """Return False: a listening connection never writes."""
        return False

    def isListeningConnection(self):
        """Return True."""
        return True

Yoshinori Okuji's avatar
Yoshinori Okuji committed
153 154
class Connection(BaseConnection):
    """A connection.

    Adds to BaseConnection the read/write buffers, message ID allocation,
    idle-event bookkeeping (event_dict maps a message ID, or None, to its
    IdleEvent) and a queue of parsed packets awaiting process().
    """

    def __init__(self, event_manager, handler,
                 connector = None, addr = None,
                 connector_handler = None):
        """Initialise buffers and state, then start reading if connected."""
        self.read_buf = ""
        self.write_buf = ""
        self.cur_id = 0
        self.event_dict = {}
        self.aborted = False
        self.uuid = None
        self._queue = []
        BaseConnection.__init__(self, event_manager, handler,
                                connector = connector, addr = addr,
                                connector_handler = connector_handler)
        if connector is not None:
            event_manager.addReader(self)

    def getUUID(self):
        """Return the UUID set with setUUID(), or None."""
        return self.uuid

    def setUUID(self, uuid):
        """Remember *uuid* for this connection."""
        self.uuid = uuid

    def _getNextId(self):
        """Return the current message ID and advance the counter."""
        next_id = self.cur_id
        # Deal with an overflow.
        if self.cur_id == 0xffffffff:
            self.cur_id = 0
        else:
            self.cur_id += 1
        return next_id

    def close(self):
        """Close the connector, drop all idle events and empty the buffers."""
        logging.debug('closing a connector for %s (%s:%d)',
                dump(self.uuid), *(self.addr))
        BaseConnection.close(self)
        for event in self.event_dict.values():
            self.em.removeIdleEvent(event)
        self.event_dict.clear()
        self.write_buf = ""
        self.read_buf = ""

    def abort(self):
        """Abort dealing with this connection.

        Only sets the flag: writable() closes the connection once nothing
        is pending, and readable() stops polling for reads.
        """
        logging.debug('aborting a connector for %s (%s:%d)',
                dump(self.uuid), *(self.addr))
        self.aborted = True

    def writable(self):
        """Called when self is writable."""
        self._send()
        if not self.pending():
            if self.aborted:
                self.close()
            else:
                self.em.removeWriter(self)

    def readable(self):
        """Called when self is readable."""
        self._recv()
        self.analyse()

        if self.aborted:
            self.em.removeReader(self)

    def analyse(self):
        """Analyse received data.

        Parses as many packets as the read buffer holds. Each packet
        cancels the matching idle events; PING is answered at once, PONG is
        dropped, anything else is queued for process(). A malformed packet
        is reported to the handler and parsing stops.
        """
        while True:
            packet = None
            try:
                packet = protocol.parse(self.read_buf)
            except PacketMalformedError:
                # sys.exc_info() yields the active exception with syntax
                # that is valid on both Python 2 and Python 3. packet is
                # still None here because parse() raised.
                self.handler.packetMalformed(self, packet, sys.exc_info()[1])
                return

            if packet is None:
                break

            # Remove idle events, if appropriate packets were received.
            for msg_id in (None, packet.getId()):
                try:
                    event = self.event_dict[msg_id]
                    del self.event_dict[msg_id]
                    self.em.removeIdleEvent(event)
                except KeyError:
                    pass

            try:
                packet_type = packet.getType()
                if packet_type == protocol.PING:
                    # Send a pong notification
                    self.answer(protocol.pong(), packet)
                elif packet_type != protocol.PONG:
                    # Skip PONG packets, its only purpose is to drop IdleEvent
                    # generated upon ping.
                    self._queue.append(packet)
            finally:
                # Always consume the packet, even if answering failed.
                self.read_buf = self.read_buf[len(packet):]

    def hasPendingMessages(self):
        """
          Returns True if there are messages queued and awaiting processing.
        """
        return len(self._queue) != 0

    def _enqueue(self, packet):
        """
          Enqueue a parsed packet for future processing.
        """
        self._queue.append(packet)

    def _dequeue(self):
        """
          Dequeue a packet for processing.
        """
        return self._queue.pop(0)

    def process(self):
        """
          Process a pending packet.
        """
        packet = self._dequeue()
        logging.debug('#0x%08x %-30s from %s (%s:%d)', packet.getId(),
                packet.getType(), dump(self.uuid), *self.getAddress())
        self.handler.packetReceived(self, packet)

    def pending(self):
        """Return a true value when connected with unsent data buffered."""
        return self.connector is not None and self.write_buf

    def _recv(self):
        """Receive data from a connector."""
        try:
            data = self.connector.receive()
            if not data:
                logging.debug('Connection %r closed in recv', self.connector)
                self.close()
                self.handler.connectionClosed(self)
                return
            self.read_buf += data
        except ConnectorTryAgainException:
            pass
        except ConnectorConnectionRefusedException:
            # should only occur while connecting
            self.close()
            self.handler.connectionFailed(self)
        except ConnectorConnectionClosedException:
            # connection reset by peer; according to the man page this
            # error should not occur, but in practice it does
            logging.debug('Connection reset by peer: %r', self.connector)
            self.close()
            self.handler.connectionClosed(self)
        except ConnectorException:
            logging.debug('Unknown connection error: %r', self.connector)
            self.close()
            self.handler.connectionClosed(self)
            # unhandled connector exception
            raise

    def _send(self):
        """Send data to a connector."""
        if not self.write_buf:
            return
        try:
            n = self.connector.send(self.write_buf)
            if not n:
                logging.debug('Connection %r closed in send', self.connector)
                # NOTE(review): unlike every other failure path, the handler
                # is notified before close() here; order kept as found.
                self.handler.connectionClosed(self)
                self.close()
                return
            self.write_buf = self.write_buf[n:]
        except ConnectorTryAgainException:
            pass
        except ConnectorConnectionClosedException:
            # connection reset by peer
            logging.debug('Connection reset by peer: %r', self.connector)
            self.close()
            self.handler.connectionClosed(self)
        except ConnectorException:
            logging.debug('Unknown connection error: %r', self.connector)
            # unhandled connector exception
            self.close()
            self.handler.connectionClosed(self)
            raise

    def _addPacket(self, packet):
        """Add a packet into the write buffer.

        Does nothing when the connection is closed. If the packet cannot be
        encoded, an internalError notification is sent instead.
        """
        if self.connector is None:
            return

        logging.debug('#0x%08x %-30s  to  %s (%s:%d)', packet.getId(),
                packet.getType(), dump(self.uuid), *self.getAddress())
        try:
            self.write_buf += packet.encode()
        except PacketMalformedError:
            logging.critical('trying to send a too big message')
            # XXX: we should assert that the internalError packet has a size
            # lower than MAX_PACKET_SIZE
            return self.notify(protocol.internalError(sys.exc_info()[1]))

        # If this is the first time, enable polling for writing.
        if self.write_buf:
            self.em.addWriter(self)

    def expectMessage(self, msg_id = None, timeout = 5, additional_timeout = 30):
        """Expect a message for a reply to a given message ID or any message.

        The purpose of this method is to define how much amount of time is
        acceptable to wait for a message, thus to detect a down or broken
        peer. This is important, because one error may halt a whole cluster
        otherwise. Although TCP defines a keep-alive feature, the timeout
        is too long generally, and it does not detect a certain type of reply,
        thus it is better to probe problems at the application level.

        The message ID specifies what ID is expected. Usually, this should
        be identical with an ID for a request message. If it is None, any
        message is acceptable, so it can be used to check idle time.

        The timeout is the amount of time to wait until keep-alive messages start.
        Once the timeout is expired, the connection starts to ping the peer.

        The additional timeout defines the amount of time after the timeout
        to invoke a timeoutExpired callback. If it is zero, no ping is sent, and
        the callback is executed immediately."""
        if self.connector is None:
            return

        event = IdleEvent(self, msg_id, timeout, additional_timeout)
        self.event_dict[msg_id] = event
        self.em.addIdleEvent(event)

    @not_closed
    def notify(self, packet, msg_id=None):
        """ Send a packet with a new ID (or with msg_id when one is given) """
        if msg_id is None:
            msg_id = self._getNextId()
        packet.setId(msg_id)
        self._addPacket(packet)
        return msg_id

    @not_closed
    def ask(self, packet, timeout=5, additional_timeout=30):
        """ Send a packet with a new ID and register the expectation of an answer """
        msg_id = self._getNextId()
        packet.setId(msg_id)
        # Forward the caller's timeouts; they used to be accepted but ignored.
        self.expectMessage(msg_id, timeout, additional_timeout)
        self._addPacket(packet)
        return msg_id

    @not_closed
    def answer(self, packet, answered_packet):
        """ Answer to a packet by re-using its ID for the packet answer """
        msg_id = answered_packet.getId()
        packet.setId(msg_id)
        self._addPacket(packet)

    def ping(self, timeout=5):
        """ Send a ping and expect to receive a pong notification """
        packet = protocol.ping()
        msg_id = self._getNextId()
        packet.setId(msg_id)
        # Additional timeout of 0: no further ping is sent on expiry.
        self.expectMessage(msg_id, timeout, 0)
        self._addPacket(packet)

    def isServerConnection(self):
        """Tell whether the peer initiated the connection; must be overridden."""
        raise NotImplementedError

    def isListeningConnection(self):
        """Return False."""
        return False

Yoshinori Okuji's avatar
Yoshinori Okuji committed
423 424
class ClientConnection(Connection):
    """A connection from this node to a remote node."""

    def __init__(self, event_manager, handler, addr, connector_handler, **kw):
        """Start connecting to *addr*.

        The handler is told through connectionStarted(), then through
        connectionCompleted() or connectionFailed(). While the connection
        attempt is still in progress the connection is only polled for
        writing; see writable(). Extra keyword arguments are ignored.
        Any ConnectorException other than a refusal is re-raised after the
        handler has been notified and the connection closed.
        """
        self.connecting = True
        Connection.__init__(self, event_manager, handler, addr = addr,
                            connector_handler = connector_handler)
        handler.connectionStarted(self)
        try:
            connector = self.connector_handler()
            self.setConnector(connector)
            try:
                connector.makeClientConnection(addr)
            except ConnectorInProgressException:
                # Completion will be signalled by the socket turning writable.
                event_manager.addWriter(self)
            else:
                self.connecting = False
                self.handler.connectionCompleted(self)
                event_manager.addReader(self)
        except ConnectorConnectionRefusedException:
            handler.connectionFailed(self)
            self.close()
        except ConnectorException:
            # unhandled connector exception
            handler.connectionFailed(self)
            self.close()
            raise

    def writable(self):
        """Called when self is writable.

        While connecting, this finishes the connection attempt: on error
        the handler is notified and the connection closed, otherwise the
        connection is marked as established and starts reading. Once
        connected, it behaves like Connection.writable().
        """
        if self.connecting:
            err = self.connector.getError()
            if err:
                self.handler.connectionFailed(self)
                self.close()
                return
            else:
                self.connecting = False
                self.handler.connectionCompleted(self)
                self.em.addReader(self)
        else:
            Connection.writable(self)

    def isServerConnection(self):
        """Return False."""
        return False

Yoshinori Okuji's avatar
Yoshinori Okuji committed
468 469
class ServerConnection(Connection):
    """A connection from a remote node to this node."""

    def isServerConnection(self):
        """Return True."""
        return True
472 473 474

class MTClientConnection(ClientConnection):
    """A Multithread-safe version of ClientConnection.

    Callers are expected to hold the connection lock (lock()/unlock())
    around the methods wrapped with lockCheckWrapper; the wrapper only logs
    a warning when they do not.
    """

    def __init__(self, *args, **kwargs):
        """Create the lock, then build the connection while holding it.

        Requires a ``dispatcher`` keyword argument, which is removed from
        *kwargs* before they are passed on to ClientConnection.
        """
        # _lock is only here for lock debugging purposes. Do not use.
        self._lock = lock = RLock()
        self.acquire = lock.acquire
        self.release = lock.release
        self.dispatcher = kwargs.pop('dispatcher')
        self.lock()
        try:
            super(MTClientConnection, self).__init__(*args, **kwargs)
        finally:
            self.unlock()

    def lock(self, blocking = 1):
        """Acquire the connection lock and return the result of acquire()."""
        return self.acquire(blocking = blocking)

    def unlock(self):
        """Release the connection lock."""
        self.release()

    @lockCheckWrapper
    def writable(self, *args, **kw):
        return super(MTClientConnection, self).writable(*args, **kw)

    @lockCheckWrapper
    def readable(self, *args, **kw):
        return super(MTClientConnection, self).readable(*args, **kw)

    @lockCheckWrapper
    def analyse(self, *args, **kw):
        return super(MTClientConnection, self).analyse(*args, **kw)

    @lockCheckWrapper
    def expectMessage(self, *args, **kw):
        return super(MTClientConnection, self).expectMessage(*args, **kw)

    @lockCheckWrapper
    def notify(self, *args, **kw):
        return super(MTClientConnection, self).notify(*args, **kw)

    @lockCheckWrapper
    def ask(self, queue, packet, timeout=5, additional_timeout=30):
        """Send *packet* with a new ID and route its answer to *queue*.

        The (connection, message ID) pair is registered with the dispatcher
        before the packet is buffered. Returns the message ID.
        """
        msg_id = self._getNextId()
        packet.setId(msg_id)
        self.dispatcher.register(self, msg_id, queue)
        # Forward the caller's timeouts; they used to be accepted but ignored.
        self.expectMessage(msg_id, timeout, additional_timeout)
        self._addPacket(packet)
        return msg_id

    @lockCheckWrapper
    def answer(self, *args, **kw):
        return super(MTClientConnection, self).answer(*args, **kw)

    @lockCheckWrapper
    def close(self, *args, **kw):
        return super(MTClientConnection, self).close(*args, **kw)

530 531 532
class MTServerConnection(ServerConnection):
    """A Multithread-safe version of ServerConnection.

    Callers are expected to hold the connection lock (lock()/unlock())
    around the methods wrapped with lockCheckWrapper; the wrapper only logs
    a warning when they do not.
    """

    def __init__(self, *args, **kwargs):
        """Create the lock, then build the connection while holding it."""
        # _lock is only here for lock debugging purposes. Do not use.
        self._lock = lock = RLock()
        self.acquire = lock.acquire
        self.release = lock.release
        self.lock()
        try:
            super(MTServerConnection, self).__init__(*args, **kwargs)
        finally:
            self.unlock()

    def lock(self, blocking = 1):
        """Acquire the connection lock and return the result of acquire()."""
        return self.acquire(blocking = blocking)

    def unlock(self):
        """Release the connection lock."""
        self.release()

    @lockCheckWrapper
    def writable(self, *args, **kw):
        return super(MTServerConnection, self).writable(*args, **kw)

    @lockCheckWrapper
    def readable(self, *args, **kw):
        return super(MTServerConnection, self).readable(*args, **kw)

    @lockCheckWrapper
    def analyse(self, *args, **kw):
        return super(MTServerConnection, self).analyse(*args, **kw)

    @lockCheckWrapper
    def expectMessage(self, *args, **kw):
        return super(MTServerConnection, self).expectMessage(*args, **kw)

    # The three methods below used to call super(MTClientConnection, self),
    # which raises TypeError because self is not an MTClientConnection.
    @lockCheckWrapper
    def notify(self, *args, **kw):
        return super(MTServerConnection, self).notify(*args, **kw)

    @lockCheckWrapper
    def ask(self, *args, **kw):
        return super(MTServerConnection, self).ask(*args, **kw)

    @lockCheckWrapper
    def answer(self, *args, **kw):
        return super(MTServerConnection, self).answer(*args, **kw)
576