#
# Copyright (C) 2006-2010  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

from cPickle import dumps, loads
from zlib import compress as real_compress, decompress
from neo.lib.locking import Empty
from random import shuffle
import time
import os

from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
from ZODB.POSException import ReadConflictError
from ZODB.ConflictResolution import ResolvedSerial
from persistent.TimeStamp import TimeStamp

import neo.lib
from neo.lib.protocol import NodeTypes, Packets, \
    INVALID_PARTITION, ZERO_HASH, ZERO_TID
from neo.lib.event import EventManager
from neo.lib.util import makeChecksum as real_makeChecksum, dump
from neo.lib.locking import Lock
from neo.lib.connection import MTClientConnection, OnTimeout, ConnectionClosed
from neo.lib.node import NodeManager
from neo.lib.connector import getConnectorHandler
from .exception import NEOStorageError, NEOStorageCreationUndoneError
from .exception import NEOStorageNotFoundError
from neo.lib.exception import NeoException
from .handlers import storage, master
from neo.lib.dispatcher import Dispatcher, ForgottenPacket
from .poll import ThreadedPoll, psThreadedPoll
from .iterator import Iterator
from .cache import ClientCache
from .pool import ConnectionPool
from neo.lib.util import u64, parseMasterList
from neo.lib.profiling import profiler_decorator, PROFILING_ENABLED
from neo.lib.debug import register as registerLiveDebugger
from .container import ThreadContainer, TransactionContainer

if PROFILING_ENABLED:
    # Those functions require a "real" python function wrapper before they can
    # be decorated.
    @profiler_decorator
    def compress(data):
        # zlib.compress, wrapped so the profiler can hook it.
        return real_compress(data)

    @profiler_decorator
    def makeChecksum(data):
        # neo.lib.util.makeChecksum, wrapped so the profiler can hook it.
        return real_makeChecksum(data)
else:
    # If profiling is disabled, directly use original functions.
    compress = real_compress
    makeChecksum = real_makeChecksum

# Sentinel stored in a transaction's data_dict for objects whose current
# serial was only checked, not rewritten; always compared by identity
# (see _handleConflicts).
CHECKED_SERIAL = object()


class Application(object):
    """The client node application."""

    def __init__(self, master_nodes, name, compress=True,
            dynamic_master_list=None, **kw):
        """
        Initialize the client node.

        master_nodes -- master node addresses, as a string parsed by
            parseMasterList (which also selects the connector implementation).
        name -- cluster name; also used to name the polling thread.
        compress -- whether object data is compressed before being stored.
        dynamic_master_list -- forwarded to NodeManager.
        """
        # Start polling thread
        self.em = EventManager()
        self.poll_thread = ThreadedPoll(self.em, name=name)
        psThreadedPoll()
        # Internal Attributes common to all thread
        self._db = None
        self.name = name
        master_addresses, connector_name = parseMasterList(master_nodes)
        self.connector_handler = getConnectorHandler(connector_name)
        self.dispatcher = Dispatcher(self.poll_thread)
        self.nm = NodeManager(dynamic_master_list)
        self.cp = ConnectionPool(self)
        # Partition table; filled once connected to the primary master.
        self.pt = None
        self.master_conn = None
        self.primary_master_node = None
        self.trying_master_node = None

        # load master node list
        for address in master_addresses:
            self.nm.createMaster(address=address)

        # no self-assigned UUID, primary master will supply us one
        self.uuid = None
        self._cache = ClientCache()
        # Pool of OIDs pre-allocated by the master; refilled by new_oid.
        self.new_oid_list = []
        self.last_oid = '\0' * 8
        self.storage_event_handler = storage.StorageEventHandler(self)
        self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
        self.storage_handler = storage.StorageAnswersHandler(self)
        self.primary_handler = master.PrimaryAnswersHandler(self)
        self.primary_bootstrap_handler = master.PrimaryBootstrapHandler(self)
        self.notifications_handler = master.PrimaryNotificationsHandler( self)
        # Internal attribute distinct between thread
        self._thread_container = ThreadContainer()
        self._txn_container = TransactionContainer()
        # Lock definition :
        # _load_lock is used to make loading and storing atomic
        lock = Lock()
        self._load_lock_acquire = lock.acquire
        self._load_lock_release = lock.release
        # _oid_lock is used in order to not call multiple oid
        # generation at the same time
        lock = Lock()
        self._oid_lock_acquire = lock.acquire
        self._oid_lock_release = lock.release
        lock = Lock()
        # _cache_lock is used for the client cache
        self._cache_lock_acquire = lock.acquire
        self._cache_lock_release = lock.release
        lock = Lock()
        # _connecting_to_master_node is used to prevent simultaneous master
        # node connection attemps
        self._connecting_to_master_node_acquire = lock.acquire
        self._connecting_to_master_node_release = lock.release
        # _nm ensure exclusive access to the node manager
        lock = Lock()
        self._nm_acquire = lock.acquire
        self._nm_release = lock.release
        self.compress = compress
        registerLiveDebugger(on_log=self.log)

    def getHandlerData(self):
        """Return the calling thread's 'answer' slot, set by packet handlers."""
        thread_data = self._thread_container.get()
        return thread_data['answer']

    def setHandlerData(self, data):
        """Store *data* in the calling thread's 'answer' slot."""
        thread_data = self._thread_container.get()
        thread_data['answer'] = data

    def _getThreadQueue(self):
        """Return the calling thread's private packet queue."""
        thread_data = self._thread_container.get()
        return thread_data['queue']

    def log(self):
        """Dump the state of the event and node managers, plus the partition
        table once one has been received from the master."""
        for component in (self.em, self.nm):
            component.log()
        pt = self.pt
        if pt is not None:
            pt.log()

    @profiler_decorator
    def _handlePacket(self, conn, packet, kw=None, handler=None):
        """
          Dispatch a received packet to its handler, with the connection
          locked for the duration of the call.

          conn
            The connection which received the packet (forwarded to handler).
          packet
            The packet to handle.
          kw
            Extra keyword arguments forwarded to the handler.
          handler
            The handler to use to handle packet.
            If not given, it will be guessed from the connection's node type.
        """
        if kw is None:
            # Fresh dict per call: the former mutable default ({}) could be
            # polluted by one dispatch and leak into unrelated later calls.
            kw = {}
        if handler is None:
            # Guess the handler to use based on the type of node on the
            # connection
            node = self.nm.getByAddress(conn.getAddress())
            if node is None:
                raise ValueError('Expecting an answer from a node '
                    'which type is not known... Is this right ?')
            if node.isStorage():
                handler = self.storage_handler
            elif node.isMaster():
                handler = self.primary_handler
            else:
                raise ValueError('Unknown node type: %r' % (node.__class__, ))
        conn.lock()
        try:
            handler.dispatch(conn, packet, kw)
        finally:
            conn.unlock()

    @profiler_decorator
    def _waitAnyMessage(self, queue, block=True):
        """
          Handle all pending packets.
          block
            If True (default), will block until at least one packet was
            received.
        """
        pending = self.dispatcher.pending
        get = queue.get
        _handlePacket = self._handlePacket
        # Loop as long as the dispatcher still expects answers on this queue.
        while pending(queue):
            try:
                conn, packet, kw = get(block)
            except Empty:
                break
            if packet is None or isinstance(packet, ForgottenPacket):
                # connection was closed or some packet was forgotten
                continue
            # Only the first get() may block: after one real packet has been
            # handled, just drain whatever else is already queued.
            block = False
            try:
                _handlePacket(conn, packet, kw)
            except ConnectionClosed:
                # Connection died while handling; pending() will end the loop
                # once nothing more is expected on this queue.
                pass

    def _waitAnyTransactionMessage(self, txn_context, block=True):
        """
        Per-transaction counterpart of _waitAnyMessage: packets are handled
        with *txn_context* exposed as the current handler data.
        """
        set_data = self.setHandlerData
        set_data(txn_context)
        try:
            self._waitAnyMessage(txn_context['queue'], block=block)
        finally:
            # Whatever happened, never leak the transaction context to
            # unrelated packet handling on this thread.
            set_data(None)

    @profiler_decorator
    def _ask(self, conn, packet, handler=None):
        """
        Send a request packet on *conn* and block until its answer has been
        handled, processing unrelated packets received meanwhile.

        Returns the value left by the answer handler (handler data).
        Raises ConnectionClosed if *conn* dies before answering.
        """
        self.setHandlerData(None)
        queue = self._getThreadQueue()
        msg_id = conn.ask(packet, queue=queue)
        get = queue.get
        _handlePacket = self._handlePacket
        while True:
            qconn, qpacket, kw = get(True)
            is_forgotten = isinstance(qpacket, ForgottenPacket)
            if conn is qconn:
                # check fake packet
                if qpacket is None:
                    raise ConnectionClosed
                if msg_id == qpacket.getId():
                    if is_forgotten:
                        raise ValueError('ForgottenPacket for an '
                            'explicitly expected packet.')
                    _handlePacket(qconn, qpacket, kw, handler)
                    break
            # Unrelated packet (other connection or other request): handle it
            # with the default handler so overall progress is not blocked.
            if not is_forgotten and qpacket is not None:
                _handlePacket(qconn, qpacket, kw)
        return self.getHandlerData()

    @profiler_decorator
    def _askStorage(self, conn, packet):
        """Send a request to a storage node and return its processed answer."""
        handler = self.storage_handler
        return self._ask(conn, packet, handler=handler)

    @profiler_decorator
    def _askPrimary(self, packet):
        """Send a request to the primary master (connecting to it first if
        needed) and return its processed answer."""
        conn = self._getMasterConnection()
        return self._ask(conn, packet, handler=self.primary_handler)

    @profiler_decorator
    def _getMasterConnection(self):
        """ Connect to the primary master node on demand """
        # acquire the lock to allow only one thread to connect to the primary
        result = self.master_conn
        if result is None:
            self._connecting_to_master_node_acquire()
            try:
                # Re-check under the lock: another thread may have connected
                # while we were waiting for it, in which case reuse its
                # connection instead of establishing a redundant one.
                result = self.master_conn
                if result is None:
                    # Drop OIDs pre-allocated through a previous connection.
                    self.new_oid_list = []
                    result = self._connectToPrimaryNode()
                    self.master_conn = result
            finally:
                self._connecting_to_master_node_release()
        return result

    def getPartitionTable(self):
        """ Return the partition table manager, reconnect the PMN if needed """
        # this ensure the master connection is established and the partition
        # table is up to date.
        self._getMasterConnection()
        return self.pt

    @profiler_decorator
    def _connectToPrimaryNode(self):
        """
            Lookup for the current primary master node, connect and identify
            to it, and return the established connection.  Loops until a
            primary answers and its partition table is operational.
        """
        neo.lib.logging.debug('connecting to primary master...')
        ready = False
        nm = self.nm
        packet = Packets.AskPrimary()
        while not ready:
            # Get network connection to primary master
            index = 0
            connected = False
            while not connected:
                if self.primary_master_node is not None:
                    # If I know a primary master node, pinpoint it.
                    self.trying_master_node = self.primary_master_node
                    self.primary_master_node = None
                else:
                    # Otherwise, check one by one.
                    master_list = nm.getMasterList()
                    try:
                        self.trying_master_node = master_list[index]
                    except IndexError:
                        # Every known master tried: pause before cycling
                        # through the list again.
                        time.sleep(1)
                        index = 0
                        self.trying_master_node = master_list[0]
                    index += 1
                # Connect to master
                conn = MTClientConnection(self.em,
                        self.notifications_handler,
                        addr=self.trying_master_node.getAddress(),
                        connector=self.connector_handler(),
                        dispatcher=self.dispatcher)
                # Query for primary master node
                if conn.getConnector() is None:
                    # This happens if a connection could not be established.
                    neo.lib.logging.error(
                                    'Connection to master node %s failed',
                                  self.trying_master_node)
                    continue
                try:
                    self._ask(conn, packet,
                        handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    continue
                # If we reached the primary master node, mark as connected
                connected = self.primary_master_node is not None and \
                        self.primary_master_node is self.trying_master_node
            neo.lib.logging.info(
                            'Connected to %s' % (self.primary_master_node, ))
            try:
                # Not ready until the partition table received from this
                # primary is operational.
                ready = self.identifyToPrimaryNode(conn)
            except ConnectionClosed:
                neo.lib.logging.error('Connection to %s lost',
                    self.trying_master_node)
                self.primary_master_node = None
        neo.lib.logging.info("Connected and ready")
        return conn

    def identifyToPrimaryNode(self, conn):
        """
            Request identification and required informations to be operational.
            Might raise ConnectionClosed so that the new primary can be
            looked-up again.
        """
        neo.lib.logging.info('Initializing from master')
        ask = self._ask
        handler = self.primary_bootstrap_handler
        # Identify to primary master and request initial data
        p = Packets.RequestIdentification(NodeTypes.CLIENT, self.uuid, None,
            self.name)
        # Our UUID is assigned through the identification answer; loop until
        # the master accepts us.
        while conn.getUUID() is None:
            ask(conn, p, handler=handler)
            if conn.getUUID() is None:
                # Node identification was refused by master, it is considered
                # as the primary as long as we are connected to it.
                time.sleep(1)
        # Fetch the node list and partition table before declaring readiness.
        ask(conn, Packets.AskNodeInformation(), handler=handler)
        ask(conn, Packets.AskPartitionTable(), handler=handler)
        return self.pt.operational()

    def registerDB(self, db, limit):
        """Record the ZODB DB object using this storage (*limit* is unused)."""
        self._db = db

    def getDB(self):
        """Return the DB object recorded by registerDB, or None."""
        return self._db

    @profiler_decorator
    def new_oid(self):
        """Get a new OID.

        OIDs are pre-allocated from the master in batches of 100, so most
        calls are served locally.  Raises NEOStorageError if the master
        does not provide any.
        """
        self._oid_lock_acquire()
        try:
            if not self.new_oid_list:
                # Get new oid list from master node
                # we manage a list of oid here to prevent
                # from asking too many time new oid one by one
                # from master node
                self._askPrimary(Packets.AskNewOIDs(100))
                if not self.new_oid_list:
                    raise NEOStorageError('new_oid failed')
            self.last_oid = self.new_oid_list.pop(0)
            return self.last_oid
        finally:
            self._oid_lock_release()

    def getStorageSize(self):
        """Return a rough storage size estimate: the last OID used, as an
        integer (inaccurate by design)."""
        return int(u64(self.last_oid))

    @profiler_decorator
    def load(self, oid, tid=None, before_tid=None):
        """
        Internal method which manage load, loadSerial and loadBefore.
        OID and TID (serial) parameters are expected packed.
        oid
            OID of object to get.
        tid
            If given, the exact serial at which OID is desired.
            before_tid should be None.
        before_tid
            If given, the excluded upper bound serial at which OID is desired.
            serial should be None.

        Return value: (3-tuple)
        - Object data (None if object creation was undone).
        - Serial of given data.
        - Next serial at which object exists, or None. Only set when tid
          parameter is not None.

        Exceptions:
            NEOStorageError
                technical problem
            NEOStorageNotFoundError
                object exists but no data satisfies given parameters
            NEOStorageDoesNotExistError
                object doesn't exist
            NEOStorageCreationUndoneError
                object existed, but its creation was undone

        Note that loadSerial is used during conflict resolution to load
        object's current version, which is not visible to us normaly (it was
        committed after our snapshot was taken).
        """
        # TODO:
        # - rename parameters (here? and in handlers & packet definitions)

        # _load_lock makes the cache-check / storage-fetch / cache-store
        # sequence atomic with respect to concurrent loads and stores.
        self._load_lock_acquire()
        try:
            result = self._loadFromCache(oid, tid, before_tid)
            if not result:
                result = self._loadFromStorage(oid, tid, before_tid)
                self._cache_lock_acquire()
                try:
                    self._cache.store(oid, *result)
                finally:
                    self._cache_lock_release()
            return result
        finally:
            self._load_lock_release()

    @profiler_decorator
    def _loadFromStorage(self, oid, at_tid, before_tid):
        """
        Fetch an object revision from storage nodes, trying each readable
        cell in turn until one answers; verify the checksum and decompress
        the data if needed.  Raises NEOStorageCreationUndoneError for a
        no-data revision, NEOStorageError when no node could be reached.
        """
        packet = Packets.AskObject(oid, at_tid, before_tid)
        for node, conn in self.cp.iterateForObject(oid, readable=True):
            try:
                noid, tid, next_tid, compression, checksum, data \
                    = self._askStorage(conn, packet)
            except ConnectionClosed:
                continue

            if data or checksum != ZERO_HASH:
                if checksum != makeChecksum(data):
                    # Corrupted answer from this node: try another one.
                    neo.lib.logging.error('wrong checksum from %s for oid %s',
                              conn, dump(oid))
                    continue
                if compression:
                    data = decompress(data)
                return data, tid, next_tid
            # Empty data with a zero checksum: the creation was undone.
            raise NEOStorageCreationUndoneError(dump(oid))
        # We didn't got any object from all storage node because of
        # connection error
        raise NEOStorageError('connection failure')

    @profiler_decorator
    def _loadFromCache(self, oid, at_tid=None, before_tid=None):
        """
        Look the object revision up in the local cache; return None when
        nothing suitable is cached.
        """
        acquire = self._cache_lock_acquire
        release = self._cache_lock_release
        acquire()
        try:
            cache = self._cache
            if not at_tid:
                return cache.load(oid, before_tid)
            # appending '*' requests this exact serial rather than an upper
            # bound (see ClientCache.load) -- TODO confirm.
            entry = cache.load(oid, at_tid + '*')
            assert not entry or entry[1] == at_tid
            return entry
        finally:
            release()

    @profiler_decorator
    def tpc_begin(self, transaction, tid=None, status=' '):
        """Begin a new transaction."""
        container = self._txn_container
        # A given transaction object may only be begun once at a time.
        if container.get(transaction) is not None:
            raise StorageTransactionError('Duplicate tpc_begin calls')
        context = container.new(transaction)
        # The master echoes the given TID, or allocates a temporary one
        # when tid is None.
        ttid = self._askPrimary(Packets.AskBeginTransaction(tid))
        if ttid is None:
            raise NEOStorageError('tpc_begin failed')
        assert tid in (None, ttid), (tid, ttid)
        context['txn'] = transaction
        context['ttid'] = ttid

    @profiler_decorator
    def store(self, oid, serial, data, version, transaction):
        """Store object."""
        # The transaction must have been started with tpc_begin first.
        context = self._txn_container.get(transaction)
        if context is None:
            raise StorageTransactionError(self, transaction)
        neo.lib.logging.debug(
                        'storing oid %s serial %s', dump(oid), dump(serial))
        self._store(context, oid, serial, data)
        return None

    def _store(self, txn_context, oid, serial, data, data_serial=None,
            unlock=False):
        """
        Send object data (or an undo marker when *data* is None) to every
        writable storage cell for this oid, updating the per-transaction
        bookkeeping in *txn_context*.  Answers are collected asynchronously
        through the transaction queue.
        """
        ttid = txn_context['ttid']
        if data is None:
            # This is some undo: either a no-data object (undoing object
            # creation) or a back-pointer to an earlier revision (going back to
            # an older object revision).
            compressed_data = ''
            compression = 0
            checksum = ZERO_HASH
        else:
            assert data_serial is None
            compression = self.compress
            compressed_data = data
            if self.compress:
                compressed_data = compress(data)
                if len(compressed_data) > len(data):
                    # Compression grew the payload: send it raw instead.
                    compressed_data = data
                    compression = 0
                else:
                    compression = 1
            checksum = makeChecksum(compressed_data)
        on_timeout = OnTimeout(self.onStoreTimeout, txn_context, oid)
        # Store object in tmp cache
        data_dict = txn_context['data_dict']
        if oid not in data_dict:
            txn_context['data_list'].append(oid)
        data_dict[oid] = data
        # Store data on each node
        txn_context['object_stored_counter_dict'][oid] = {}
        object_base_serial_dict = txn_context['object_base_serial_dict']
        # Keep the serial of the first store as the base serial for this oid.
        if oid not in object_base_serial_dict:
            object_base_serial_dict[oid] = serial
        txn_context['object_serial_dict'][oid] = serial
        queue = txn_context['queue']
        involved_nodes = txn_context['involved_nodes']
        add_involved_nodes = involved_nodes.add
        packet = Packets.AskStoreObject(oid, serial, compression,
            checksum, compressed_data, data_serial, ttid, unlock)
        for node, conn in self.cp.iterateForObject(oid, writable=True):
            try:
                conn.ask(packet, on_timeout=on_timeout, queue=queue)
                add_involved_nodes(node)
            except ConnectionClosed:
                continue
        # No writable cell accepted the request: the store cannot succeed.
        if not involved_nodes:
            raise NEOStorageError("Store failed")

        # Opportunistically process already-received answers, non-blocking.
        self._waitAnyTransactionMessage(txn_context, False)

    def onStoreTimeout(self, conn, msg_id, txn_context, oid):
        """Callback fired when a store request timed out.

        NOTE: runs in the poll thread, so it must not touch any
        thread-specific state.
        """
        timeout_dict = txn_context.setdefault('timeout_dict', {})
        timeout_dict[oid] = msg_id
        # Probe the storage to find out whether someone else holds a lock on
        # the object; sending this shorter-timeout request also keeps the
        # connection open.
        probe = Packets.AskHasLock(txn_context['ttid'], oid)
        conn.ask(probe, timeout=5, queue=txn_context['queue'])

    @profiler_decorator
    def _handleConflicts(self, txn_context, tryToResolveConflict):
        """
        Process conflicts reported by storage nodes for this transaction:
        try to resolve each one with *tryToResolveConflict* and re-store the
        resolved data; raise ReadConflictError/ConflictError when a conflict
        cannot be resolved.  Returns the list of oids whose conflict was
        resolved and re-stored.
        """
        result = []
        append = result.append
        # Check for conflicts
        data_dict = txn_context['data_dict']
        object_base_serial_dict = txn_context['object_base_serial_dict']
        object_serial_dict = txn_context['object_serial_dict']
        # Snapshot the reported conflicts: handlers may add new ones while
        # we are resolving this batch.
        conflict_serial_dict = txn_context['conflict_serial_dict'].copy()
        txn_context['conflict_serial_dict'].clear()
        resolved_conflict_serial_dict = txn_context[
            'resolved_conflict_serial_dict']
        for oid, conflict_serial_set in conflict_serial_dict.iteritems():
            conflict_serial = max(conflict_serial_set)
            serial = object_serial_dict[oid]
            data = data_dict[oid]
            # ZERO_TID in the set flags a deadlock-avoidance notification
            # rather than a data conflict.
            if ZERO_TID in conflict_serial_set:
              if 1:
                # XXX: disable deadlock avoidance code until it is fixed
                neo.lib.logging.info('Deadlock avoidance on %r:%r',
                    dump(oid), dump(serial))
              else:
                # Storage refused us from taking object lock, to avoid a
                # possible deadlock. TID is actually used for some kind of
                # "locking priority": when a higher value has the lock,
                # this means we stored objects "too late", and we would
                # otherwise cause a deadlock.
                # To recover, we must ask storages to release locks we
                # hold (to let possibly-competing transactions acquire
                # them), and requeue our already-sent store requests.
                # XXX: currently, brute-force is implemented: we send
                # object data again.
                neo.lib.logging.info('Deadlock avoidance triggered on %r:%r',
                    dump(oid), dump(serial))
                for store_oid, store_data in data_dict.iteritems():
                    store_serial = object_serial_dict[store_oid]
                    if store_data is CHECKED_SERIAL:
                        self._checkCurrentSerialInTransaction(txn_context,
                            store_oid, store_serial)
                    else:
                        if store_data is None:
                            # Some undo
                            neo.lib.logging.warning('Deadlock avoidance cannot'
                                ' reliably work with undo, this must be '
                                'implemented.')
                            conflict_serial = ZERO_TID
                            break
                        self._store(txn_context, store_oid, store_serial,
                            store_data, unlock=True)
                else:
                    # for/else: every object was requeued without hitting an
                    # undo record, nothing more to do for this oid.
                    continue
            elif data is not CHECKED_SERIAL:
                resolved_serial_set = resolved_conflict_serial_dict.setdefault(
                    oid, set())
                if resolved_serial_set and conflict_serial <= max(
                        resolved_serial_set):
                    # A later serial has already been resolved, skip.
                    resolved_serial_set.update(conflict_serial_set)
                    continue
                new_data = tryToResolveConflict(oid, conflict_serial,
                    serial, data)
                if new_data is not None:
                    neo.lib.logging.info('Conflict resolution succeed for ' \
                        '%r:%r with %r', dump(oid), dump(serial),
                        dump(conflict_serial))
                    # Mark this conflict as resolved
                    resolved_serial_set.update(conflict_serial_set)
                    # Base serial changes too, as we resolved a conflict
                    object_base_serial_dict[oid] = conflict_serial
                    # Try to store again
                    self._store(txn_context, oid, conflict_serial, new_data)
                    append(oid)
                    continue
                else:
                    neo.lib.logging.info('Conflict resolution failed for ' \
                        '%r:%r with %r', dump(oid), dump(serial),
                        dump(conflict_serial))
            # Unresolvable conflict: drop the object from the transaction
            # and propagate the appropriate ZODB exception.
            # XXX: Is it really required to remove from data_dict ?
            del data_dict[oid]
            txn_context['data_list'].remove(oid)
            if data is CHECKED_SERIAL:
                raise ReadConflictError(oid=oid, serials=(conflict_serial,
                    serial))
            raise ConflictError(oid=oid, serials=(txn_context['ttid'],
                serial), data=data)
        return result

    @profiler_decorator
    def waitResponses(self, queue, handler_data):
        """Block until every request expected on *queue* was answered (or
        its connection was detected as closed)."""
        self.setHandlerData(handler_data)
        is_pending = self.dispatcher.pending
        wait = self._waitAnyMessage
        while is_pending(queue):
            wait(queue)

    @profiler_decorator
    def waitStoreResponses(self, txn_context, tryToResolveConflict):
        """
        Wait for every store answer of the transaction, resolving conflicts
        on the fly via _handleConflicts.

        Returns a list of (oid, ResolvedSerial) pairs for objects whose
        conflict was resolved.  Raises NEOStorageError if some object could
        not be stored on any node.
        """
        result = []
        append = result.append
        resolved_oid_set = set()
        update = resolved_oid_set.update
        _handleConflicts = self._handleConflicts
        queue = txn_context['queue']
        conflict_serial_dict = txn_context['conflict_serial_dict']
        pending = self.dispatcher.pending
        _waitAnyTransactionMessage = self._waitAnyTransactionMessage
        # Keep going while answers are expected or conflicts remain pending.
        while pending(queue) or conflict_serial_dict:
            # Note: handler data can be overwritten by _handleConflicts
            # so we must set it for each iteration.
            _waitAnyTransactionMessage(txn_context)
            if conflict_serial_dict:
                conflicts = _handleConflicts(txn_context,
                    tryToResolveConflict)
                if conflicts:
                    update(conflicts)

        # Check for never-stored objects, and update result for all others
        for oid, store_dict in \
                txn_context['object_stored_counter_dict'].iteritems():
            if not store_dict:
                neo.lib.logging.error('tpc_store failed')
                raise NEOStorageError('tpc_store failed')
            elif oid in resolved_oid_set:
                append((oid, ResolvedSerial))
        return result

    @profiler_decorator
    def tpc_vote(self, transaction, tryToResolveConflict):
        """Store current transaction.

        Waits for all object stores to be acknowledged, then asks every
        writable storage node holding the transaction to vote.  Returns
        the conflict-resolution result from waitStoreResponses.  Raises
        StorageTransactionError for an unknown/mismatched transaction and
        NEOStorageError if no storage node accepted the vote.
        """
        txn_context = self._txn_container.get(transaction)
        if txn_context is None or transaction is not txn_context['txn']:
            raise StorageTransactionError(self, transaction)
        result = self.waitStoreResponses(txn_context, tryToResolveConflict)
        ttid = txn_context['ttid']
        # Store data on each node
        txn_stored_counter = 0
        packet = Packets.AskStoreTransaction(ttid, str(transaction.user),
            str(transaction.description), dumps(transaction._extension),
            txn_context['data_list'])
        add_involved_nodes = txn_context['involved_nodes'].add
        for node, conn in self.cp.iterateForObject(ttid, writable=True):
            neo.lib.logging.debug("voting object %s on %s", dump(ttid),
                dump(conn.getUUID()))
            try:
                self._askStorage(conn, packet)
            except ConnectionClosed:
                # A dead node is skipped; as long as at least one node
                # accepts, the transaction can still proceed.
                continue
            add_involved_nodes(node)
            txn_stored_counter += 1

        # check at least one storage node accepted
        if txn_stored_counter == 0:
            neo.lib.logging.error('tpc_vote failed')
            raise NEOStorageError('tpc_vote failed')
        # Check if master connection is still alive.
        # This is just here to lower the probability of detecting a problem
        # in tpc_finish, as we should do our best to detect problem before
        # tpc_finish.
        self._getMasterConnection()
        # Mark voted so tpc_finish can skip re-voting.
        txn_context['txn_voted'] = True
        return result

732
    @profiler_decorator
    def tpc_abort(self, transaction):
        """Abort current transaction.

        Best-effort: notifies every involved storage node and the master,
        ignoring notification failures, then drops all per-transaction
        state.  A no-op if the transaction is unknown.
        """
        txn_container = self._txn_container
        txn_context = txn_container.get(transaction)
        if txn_context is None:
            return
        ttid = txn_context['ttid']
        p = Packets.AbortTransaction(ttid)
        getConnForNode = self.cp.getConnForNode
        # cancel transaction on all those nodes
        for node in txn_context['involved_nodes']:
            conn = getConnForNode(node)
            if conn is None:
                continue
            try:
                conn.notify(p)
            # Narrowed from a bare except: a failed notification must not
            # abort the abort, but KeyboardInterrupt/SystemExit should
            # still propagate.
            except Exception:
                neo.lib.logging.error(
                    'Exception in tpc_abort while notifying '
                    'storage node %r of abortion, ignoring.',
                    conn, exc_info=1)
        self._getMasterConnection().notify(p)
        queue = txn_context['queue']
        # We don't need to flush queue, as it won't be reused by future
        # transactions (deleted on next line & indexed by transaction object
        # instance).
        self.dispatcher.forget_queue(queue, flush_queue=False)
        txn_container.delete(transaction)
762

763
    @profiler_decorator
    def tpc_finish(self, transaction, tryToResolveConflict, f=None):
        """Finish current transaction.

        Votes first if tpc_vote was not called yet, asks the primary master
        to commit, invokes the optional ZODB callback `f(tid)`, updates the
        client cache, and returns the final tid.  Raises
        StorageTransactionError for an unknown transaction.
        """
        txn_container = self._txn_container
        txn_context = txn_container.get(transaction)
        if txn_context is None:
            raise StorageTransactionError('tpc_finish called for wrong '
                'transaction')
        if not txn_context['txn_voted']:
            self.tpc_vote(transaction, tryToResolveConflict)
        # Hold the load lock for the whole commit so concurrent loads don't
        # observe a partially-updated cache.
        self._load_lock_acquire()
        try:
            # Call finish on master
            oid_list = txn_context['data_list']
            p = Packets.AskFinishTransaction(txn_context['ttid'], oid_list)
            tid = self._askPrimary(p)

            # Call function given by ZODB
            if f is not None:
                f(tid)

            # Update cache
            self._cache_lock_acquire()
            try:
                cache = self._cache
                for oid, data in txn_context['data_dict'].iteritems():
                    if data is CHECKED_SERIAL:
                        # this is just a remain of
                        # checkCurrentSerialInTransaction call, ignore (no data
                        # was modified).
                        continue
                    # Update ex-latest value in cache
                    cache.invalidate(oid, tid)
                    if data is not None:
                        # Store in cache with no next_tid
                        cache.store(oid, data, tid, None)
            finally:
                self._cache_lock_release()
            txn_container.delete(transaction)
            return tid
        finally:
            self._load_lock_release()
805

Vincent Pelletier's avatar
Vincent Pelletier committed
806
    def undo(self, snapshot_tid, undone_tid, txn, tryToResolveConflict):
        """Undo transaction `undone_tid` within the current transaction `txn`.

        Asks storage nodes for each object's undo serial, resolves undo
        conflicts through `tryToResolveConflict`, and stores the undo data.
        Returns (None, oid_list) as expected by ZODB.  Raises UndoError when
        the transaction cannot be undone (packed data, unresolvable
        conflict) and StorageTransactionError if `txn` is unknown.
        """
        txn_context = self._txn_container.get(txn)
        if txn_context is None:
            raise StorageTransactionError(self, undone_tid)
        txn_info, txn_ext = self._getTransactionInformation(undone_tid)
        txn_oid_list = txn_info['oids']

        # Regroup objects per partition, to ask a minimum set of storage.
        partition_oid_dict = {}
        pt = self.getPartitionTable()
        for oid in txn_oid_list:
            partition = pt.getPartition(oid)
            try:
                oid_list = partition_oid_dict[partition]
            except KeyError:
                oid_list = partition_oid_dict[partition] = []
            oid_list.append(oid)

        # Ask storage the undo serial (serial at which object's previous data
        # is)
        getCellList = pt.getCellList
        getCellSortKey = self.cp.getCellSortKey
        getConnForCell = self.cp.getConnForCell
        queue = self._getThreadQueue()
        ttid = txn_context['ttid']
        for partition, oid_list in partition_oid_dict.iteritems():
            cell_list = getCellList(partition, readable=True)
            # We do want to shuffle before getting one with the smallest
            # key, so that all cells with the same (smallest) key has
            # identical chance to be chosen.
            shuffle(cell_list)
            storage_conn = getConnForCell(min(cell_list, key=getCellSortKey))
            storage_conn.ask(Packets.AskObjectUndoSerial(ttid,
                snapshot_tid, undone_tid, oid_list), queue=queue)

        # Wait for all AnswerObjectUndoSerial. We might get OidNotFoundError,
        # meaning that objects in transaction's oid_list do not exist any
        # longer. This is the symptom of a pack, so forbid undoing transaction
        # when it happens.
        undo_object_tid_dict = {}
        try:
            self.waitResponses(queue, undo_object_tid_dict)
        except NEOStorageNotFoundError:
            self.dispatcher.forget_queue(queue)
            raise UndoError('non-undoable transaction')

        # Send undo data to all storage nodes.
        for oid in txn_oid_list:
            current_serial, undo_serial, is_current = undo_object_tid_dict[oid]
            if is_current:
                # Serial being undone is still current: the storage can undo
                # by pointing back to undo_serial, no data payload needed.
                data = None
            else:
                # Serial being undone is not the latest version for this
                # object. This is an undo conflict, try to resolve it.
                try:
                    # Load the latest version we are supposed to see
                    data = self.load(oid, current_serial)[0]
                    # Load the version we were undoing to
                    undo_data = self.load(oid, undo_serial)[0]
                except NEOStorageNotFoundError:
                    raise UndoError('Object not found while resolving undo '
                        'conflict')
                # Resolve conflict
                try:
                    data = tryToResolveConflict(oid, current_serial,
                        undone_tid, undo_data, data)
                except ConflictError:
                    data = None
                if data is None:
                    raise UndoError('Some data were modified by a later ' \
                        'transaction', oid)
                undo_serial = None
            self._store(txn_context, oid, current_serial, data, undo_serial)
        return None, txn_oid_list
882

883 884 885 886
    def _insertMetadata(self, txn_info, extension):
        for k, v in loads(extension).items():
            txn_info[k] = v

887
    def _getTransactionInformation(self, tid):
        """Fetch (txn_info, txn_ext) for `tid` from any readable storage.

        Tries each storage node holding the transaction in turn; raises
        NEOStorageError when no reachable node knows it.
        """
        packet = Packets.AskTransactionInformation(tid)
        for node, conn in self.cp.iterateForObject(tid, readable=True):
            try:
                return self._askStorage(conn, packet)
            except (ConnectionClosed, NEOStorageNotFoundError):
                # Dead connection, or this node does not know the TID:
                # fall through to the next candidate node.
                continue
        raise NEOStorageError('Transaction %r not found' % (tid, ))
901

902 903 904 905
    # XXX: The following 2 methods fail when they reconnect to a storage after
    #      they already sent a request to a previous storage.
    #      See also testStorageReconnectDuringXxx

906 907
    def undoLog(self, first, last, filter=None, block=0):
        """Return a list of transaction descriptions for the undo log UI.

        first/last: offsets into the history, FileStorage-style.
        filter: optional predicate on each txn_info dict.
        block: internal recursion flag; when 0 and nothing matched, retry
            once with a 5x larger window.
        """
        # XXX: undoLog is broken
        if last < 0:
            # See FileStorage.py for explanation
            last = first - last

        # First get a list of transactions from all storage nodes.
        # Each storage node will return TIDs only for UP_TO_DATE state and
        # FEEDING state cells
        pt = self.getPartitionTable()
        storage_node_list = pt.getNodeList()
        queue = self._getThreadQueue()
        packet = Packets.AskTIDs(first, last, INVALID_PARTITION)
        for storage_node in storage_node_list:
            conn = self.cp.getConnForNode(storage_node)
            if conn is None:
                continue
            conn.ask(packet, queue=queue)

        # Wait for answers from all storages.
        tid_set = set()
        self.waitResponses(queue, tid_set)

        # Reorder tids
        ordered_tids = sorted(tid_set, reverse=True)
        neo.lib.logging.debug("UndoLog tids %s", map(dump, ordered_tids))
        # For each transaction, get info
        undo_info = []
        append = undo_info.append
        for tid in ordered_tids:
            (txn_info, txn_ext) = self._getTransactionInformation(tid)
            if filter is None or filter(txn_info):
                txn_info.pop('packed')
                txn_info.pop("oids")
                self._insertMetadata(txn_info, txn_ext)
                append(txn_info)
                if len(undo_info) >= last - first:
                    break
        # Check we return at least one element, otherwise call
        # again but extend offset
        if len(undo_info) == 0 and not block:
            undo_info = self.undoLog(first=first, last=last*5, filter=filter,
                    block=1)
        return undo_info

952 953 954 955 956
    def transactionLog(self, start, stop, limit):
        """Return (last_tid, txn_list) for transactions in [start, stop].

        Queries one storage node per partition (preferring nodes with the
        best cell sort key), collects TIDs, then fetches full information
        for each.  `tid` is None when no transaction was found.
        """
        node_map = self.pt.getNodeMap()
        node_list = node_map.keys()
        node_list.sort(key=self.cp.getCellSortKey)
        # Partitions still lacking an assigned node for this query.
        partition_set = set(range(self.pt.getPartitions()))
        queue = self._getThreadQueue()
        # request a tid list for each partition
        for node in node_list:
            conn = self.cp.getConnForNode(node)
            request_set = set(node_map[node]) & partition_set
            if conn is None or not request_set:
                continue
            partition_set -= set(request_set)
            packet = Packets.AskTIDsFrom(start, stop, limit, request_set)
            conn.ask(packet, queue=queue)
            if not partition_set:
                break
        # Every partition must have been covered by some reachable node.
        assert not partition_set
        tid_set = set()
        self.waitResponses(queue, tid_set)
        # request transactions informations
        txn_list = []
        append = txn_list.append
        # `tid` deliberately leaks out of the loop: it ends up holding the
        # greatest processed TID (or None when tid_set is empty).
        tid = None
        for tid in sorted(tid_set):
            (txn_info, txn_ext) = self._getTransactionInformation(tid)
            txn_info['ext'] = loads(txn_ext)
            append(txn_info)
        return (tid, txn_list)
981

982
    def history(self, oid, size=1, filter=None):
        """Return up to `size` history entries for `oid`, most recent first.

        Each entry is the transaction info dict augmented with 'tid',
        'version', 'size' and the unpickled extension metadata; `filter`
        optionally restricts which entries are kept.
        """
        # NOTE(review): if every storage connection is closed, the loop
        # exhausts and this method implicitly returns None, unlike
        # _getTransactionInformation which raises — confirm intended.
        # Get history informations for object first
        packet = Packets.AskObjectHistory(oid, 0, size)
        for node, conn in self.cp.iterateForObject(oid, readable=True):
            try:
                history_list = self._askStorage(conn, packet)
            except ConnectionClosed:
                continue
            # Now that we have object informations, get txn informations
            result = []
            # history_list is already sorted descending (by the storage)
            for serial, size in history_list:
                txn_info, txn_ext = self._getTransactionInformation(serial)
                # create history dict
                txn_info.pop('id')
                txn_info.pop('oids')
                txn_info.pop('packed')
                txn_info['tid'] = serial
                txn_info['version'] = ''
                txn_info['size'] = size
                if filter is None or filter(txn_info):
                    result.append(txn_info)
                # Mutates txn_info in place, so metadata also reaches the
                # entry already appended above.
                self._insertMetadata(txn_info, txn_ext)
            return result
1006

1007
    @profiler_decorator
    def importFrom(self, source, start, stop, tryToResolveConflict):
        """Replay every transaction of `source` in [start, stop] into this
        storage, preserving the original transaction ids."""
        last_serial = {}
        txn_iter = source.iterator(start, stop)
        for txn in txn_iter:
            tid = txn.tid
            self.tpc_begin(txn, tid, txn.status)
            for record in txn:
                oid = record.oid
                # TODO: bypass conflict resolution, locks...
                self.store(oid, last_serial.get(oid), record.data,
                    record.version, txn)
                last_serial[oid] = tid
            conflicted = self.tpc_vote(txn, tryToResolveConflict)
            assert not conflicted, conflicted
            real_tid = self.tpc_finish(txn, tryToResolveConflict)
            assert real_tid == tid, (real_tid, tid)
        txn_iter.close()

1026 1027 1028
    def iterator(self, start, stop):
        """Return a transaction Iterator, defaulting start to ZERO_TID."""
        return Iterator(self, ZERO_TID if start is None else start, stop)

1031
    def lastTransaction(self):
        """Ask the primary master for the id of the last committed
        transaction."""
        packet = Packets.AskLastTransaction()
        return self._askPrimary(packet)