#
# Copyright (C) 2006-2019  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import heapq
import random
import time
from collections import defaultdict

try:
    from ZODB._compat import dumps, loads, _protocol
except ImportError:
    from cPickle import dumps, loads
    _protocol = 1
from ZODB.POSException import UndoError, ConflictError, ReadConflictError
from . import OLD_ZODB
if OLD_ZODB:
  from ZODB.ConflictResolution import ResolvedSerial
from persistent.TimeStamp import TimeStamp

from neo.lib import logging
from neo.lib.compress import decompress_list, getCompress
from neo.lib.protocol import NodeTypes, Packets, \
    INVALID_PARTITION, MAX_TID, ZERO_HASH, ZERO_TID
from neo.lib.util import makeChecksum, dump
from neo.lib.locking import Empty, Lock
from neo.lib.connection import MTClientConnection, ConnectionClosed
from neo.lib.exception import NodeNotReady
from .exception import (NEOStorageError, NEOStorageCreationUndoneError,
    NEOStorageReadRetry, NEOStorageNotFoundError, NEOPrimaryMasterLost)
from .handlers import storage, master
from neo.lib.threaded_app import ThreadedApplication
from .cache import ClientCache
from .transactions import TransactionContainer
from neo.lib.util import p64, u64, parseMasterList

CHECKED_SERIAL = object()
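# Sentinel used in a transaction's data_dict/cache_dict to mark oids whose
# current serial is merely checked (checkCurrentSerialInTransaction), as
# opposed to oids for which data is actually stored.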

# How long before we might retry a connection to a node to which connection
# failed in the past.
MAX_FAILURE_AGE = 600

try:
    from Signals.Signals import SignalHandler
except ImportError:
    SignalHandler = None
if SignalHandler:
    import signal
    SignalHandler.registerHandler(signal.SIGUSR2, logging.reopen)

class Application(ThreadedApplication):
    """The client node application."""

    # For tests only. Do not touch. We want tpc_finish to always recover when
    # the transaction is really committed, no matter for how long the master
    # is unreachable.
    max_reconnection_to_master = float('inf')

    def __init__(self, master_nodes, name, compress=True, cache_size=None,
                 **kw):
        super(Application, self).__init__(parseMasterList(master_nodes),
                                          name, **kw)
        # Internal attributes common to all threads
        self._db = None
        self.primary_master_node = None
        self.trying_master_node = None

        # no self-assigned NID, primary master will supply us one
        self._cache = ClientCache() if cache_size is None else \
                      ClientCache(max_size=cache_size)
        self._loading_oid = None
        self.new_oids = ()
        self.last_oid = '\0' * 8
        self.storage_event_handler = storage.StorageEventHandler(self)
        self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
        self.storage_handler = storage.StorageAnswersHandler(self)
        self.primary_handler = master.PrimaryAnswersHandler(self)
        self.primary_bootstrap_handler = master.PrimaryBootstrapHandler(self)
        self.notifications_handler = master.PrimaryNotificationsHandler(self)
        self._txn_container = TransactionContainer()
        # Lock definitions:
        # _load_lock is used to make loading and storing atomic
        lock = Lock()
        self._load_lock_acquire = lock.acquire
        self._load_lock_release = lock.release
        # _oid_lock is used to avoid concurrent OID generation requests
        lock = Lock()
        self._oid_lock_acquire = lock.acquire
        self._oid_lock_release = lock.release
        lock = Lock()
        # _cache_lock is used for the client cache
        self._cache_lock_acquire = lock.acquire
        self._cache_lock_release = lock.release
        # _connecting_to_master_node is used to prevent simultaneous master
        # node connection attempts
        self._connecting_to_master_node = Lock()
        # same for storage nodes
        self._connecting_to_storage_node = Lock()
        self._node_failure_dict = {}
        self.compress = getCompress(compress)

    def __getattr__(self, attr):
        if attr in ('last_tid', 'pt'):
            self._getMasterConnection()
            # XXX: There's still a risk that we get disconnected from the
            #      master at this precise moment and for 'pt', we'd raise
            #      AttributeError. Should we catch it and loop until it
            #      succeeds?
        return self.__getattribute__(attr)

    def log(self):
        super(Application, self).log()
        logging.info("%r", self._cache)
        for txn_context in self._txn_container.itervalues():
            logging.info("%r", txn_context)

    @property
    def txn_contexts(self):
        # do not iter lazily to avoid race condition
        return self._txn_container.values

    def _waitAnyMessage(self, queue, block=True):
        """
          Handle all pending packets.
          block
            If True (default), will block until at least one packet was
            received.
        """
        pending = self.dispatcher.pending
        get = queue.get
        _handlePacket = self._handlePacket
        while pending(queue):
            try:
                conn, packet, kw = get(block)
            except Empty:
                break
            block = False
            try:
                _handlePacket(conn, packet, kw)
            except (ConnectionClosed, NEOStorageReadRetry):
                # We also catch NEOStorageReadRetry for ObjectUndoSerial.
                pass

    def _waitAnyTransactionMessage(self, txn_context, block=True):
        """
        Just like _waitAnyMessage, but for per-transaction exchanges, rather
        than per-thread.
        """
        queue = txn_context.queue
        self.setHandlerData(txn_context)
        try:
            self._waitAnyMessage(queue, block=block)
        finally:
            # Don't leave access to thread context, even if a raise happens.
            self.setHandlerData(None)
        if txn_context.conflict_dict:
            self._handleConflicts(txn_context)

    def _askStorage(self, conn, packet, **kw):
        """ Send a request to a storage node and process its answer """
        return self._ask(conn, packet, handler=self.storage_handler, **kw)

    def _askPrimary(self, packet, **kw):
        """ Send a request to the primary master and process its answer """
        return self._ask(self._getMasterConnection(), packet,
            handler=self.primary_handler, **kw)

    def _getMasterConnection(self):
        """ Connect to the primary master node on demand """
        # For performance reasons, get 'master_conn' without locking.
        result = self.master_conn
        if result is None:
            # If not connected, 'master_conn' must be tested again while we have
            # the lock, to avoid concurrent threads reconnecting.
            with self._connecting_to_master_node:
                result = self.master_conn
                if result is None:
                    self.new_oids = ()
                    result = self.master_conn = self._connectToPrimaryNode()
        return result

    def _connectToPrimaryNode(self):
        """
            Look up the current primary master node and connect to it.
        """
        logging.debug('connecting to primary master...')
        self.start()
        index = -1
        fail_count = 0
        ask = self._ask
        handler = self.primary_bootstrap_handler
        while 1:
            self.ignore_invalidations = True
            # Get network connection to primary master
            while fail_count < self.max_reconnection_to_master:
                self.nm.reset()
                if self.primary_master_node is not None:
                    # If I know a primary master node, pinpoint it.
                    node = self.primary_master_node
                    self.primary_master_node = None
                else:
                    # Otherwise, check one by one.
                    master_list = self.nm.getMasterList()
                    if not master_list:
                        # XXX: On shutdown, it already happened that this list
                        #      is empty, leading to ZeroDivisionError. This
                        #      looks a minor issue so let's wait to have more
                        #      information.
                        logging.error('%r', self.__dict__)
                    index = (index + 1) % len(master_list)
                    node = master_list[index]
                # Connect to master
                conn = MTClientConnection(self,
                        self.notifications_handler,
                        node=node,
                        dispatcher=self.dispatcher)
                p = Packets.RequestIdentification(NodeTypes.CLIENT,
                    self.uuid, None, self.name, None, (), ())
                try:
                    ask(conn, p, handler=handler)
                except ConnectionClosed:
                    fail_count += 1
                else:
                    self.primary_master_node = node
                    break
            else:
                raise NEOPrimaryMasterLost(
                    "Too many connection failures to the primary master")
            logging.info('Connected to %s', self.primary_master_node)
            try:
                # Request identification and the information required to be
                # operational. Might raise ConnectionClosed so that the new
                # primary can be looked-up again.
                logging.info('Initializing from master')
                ask(conn, Packets.AskLastTransaction(), handler=handler)
                if self.pt.operational():
                    break
            except ConnectionClosed:
                logging.error('Connection to %s lost', self.trying_master_node)
                self.primary_master_node = None
            fail_count += 1
        logging.info("Connected and ready")
        return conn

    def getStorageConnection(self, node):
        conn = node._connection # XXX
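        # Connect only if the node is running and there is no connection yet,
        # or serialize with an identification that is still in progress; the
        # lock prevents concurrent connection attempts to storage nodes.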
        if node.isRunning() if conn is None else not node._identified:
            with self._connecting_to_storage_node:
                conn = node._connection # XXX
                if conn is None:
                    return self._connectToStorageNode(node)
        return conn

    def _connectToStorageNode(self, node):
        if self.master_conn is None:
            raise NEOPrimaryMasterLost
        conn = MTClientConnection(self, self.storage_event_handler, node,
                                  dispatcher=self.dispatcher)
        p = Packets.RequestIdentification(NodeTypes.CLIENT,
            self.uuid, None, self.name, self.id_timestamp, (), ())
        try:
            self._ask(conn, p, handler=self.storage_bootstrap_handler)
        except ConnectionClosed:
            logging.error('Connection to %r failed', node)
        except NodeNotReady:
            logging.info('%r not ready', node)
        else:
            logging.info('Connected %r', node)
            # Make sure this node will be considered for the next reads
            # even if there was a previous recent failure.
            self._node_failure_dict.pop(node.getUUID(), None)
            return conn
        self._node_failure_dict[node.getUUID()] = time.time() + MAX_FAILURE_AGE

    def getCellSortKey(self, cell, random=random.random):
        # Prefer a node that didn't fail recently.
        failure = self._node_failure_dict.get(cell.getUUID())
        if failure:
            if time.time() < failure:
                # Or order by date of connection failure.
                return failure
            # Do not use 'del' statement: we didn't lock, so another
            # thread might have removed uuid from _node_failure_dict.
            self._node_failure_dict.pop(cell.getUUID(), None)
        # A random one, connected or not, is a trivial and quite efficient way
        # to distribute the load evenly. On write accesses, a client connects
        # to all nodes of touched cells, but before that, or if a client is
        # specialized to only do read-only accesses, it should not limit
        # itself to only use the first connected nodes.
        return random()

    def registerDB(self, db, limit):
        self._db = db

    def getDB(self):
        return self._db

    def new_oid(self):
        """Get a new OID."""
        self._oid_lock_acquire()
        try:
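            # Take the next OID from the local pool: the 'for ... break/else'
            # idiom returns the next item, or falls through to the else
            # clause when the pool is exhausted.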
            for oid in self.new_oids:
                break
            else:
                # Get a new range of OIDs from the master node: we keep a
                # local pool so that we do not ask the master for new OIDs
                # one by one.
                self._askPrimary(Packets.AskNewOIDs(100))
                for oid in self.new_oids:
                    break
                else:
                    raise NEOStorageError('new_oid failed')
            self.last_oid = oid
            return oid
        finally:
            self._oid_lock_release()

    def getObjectCount(self):
        # return the last OID used; this is inaccurate
        return int(u64(self.last_oid))

    def _askStorageForRead(self, object_id, packet, askStorage=None):
        pt = self.pt
        # BBB: On Py2, it can be a subclass of bytes (binary from zodbpickle).
        if isinstance(object_id, bytes):
            object_id = pt.getPartition(object_id)
        if askStorage is None:
            askStorage = self._askStorage
        # Failure condition with minimal overhead: most of the time, only the
        # following line is executed. In case of storage errors, we retry each
        # node at least once, without looping forever.
        failed = 0
        while 1:
            cell_list = pt.getCellList(object_id, True)
            cell_list.sort(key=self.getCellSortKey)
            for cell in cell_list:
                node = cell.getNode()
                conn = self.getStorageConnection(node)
                if conn is not None:
                    try:
                        return askStorage(conn, packet)
                    except ConnectionClosed:
                        pass
                    except NEOStorageReadRetry, e:
                        if e.args[0]:
                            continue
                failed += 1
            if not pt.filled():
                raise NEOPrimaryMasterLost
            if len(cell_list) < failed: # too many failures
                raise NEOStorageError('no storage available')
            # Do not retry too quickly, for example
            # when there's an incoming PT update.
            self.sync()

    def load(self, oid, tid=None, before_tid=None):
        """
        Internal method which manages load, loadSerial and loadBefore.
        OID and TID (serial) parameters are expected packed.
        oid
            OID of object to get.
        tid
            If given, the exact serial at which OID is desired.
            before_tid should be None.
        before_tid
            If given, the excluded upper bound serial at which OID is desired.
            tid should be None.

        Return value: (3-tuple)
        - Object data (None if object creation was undone).
        - Serial of given data.
        - Next serial at which object exists, or None. Only set when tid
          parameter is not None.

        Exceptions:
            NEOStorageError
                technical problem
            NEOStorageNotFoundError
                object exists but no data satisfies given parameters
            NEOStorageDoesNotExistError
                object doesn't exist
            NEOStorageCreationUndoneError
                object existed, but its creation was undone

        Note that loadSerial is used during conflict resolution to load
        object's current version, which is not visible to us normally (it was
        committed after our snapshot was taken).
        """
        # TODO:
        # - rename parameters (here? and in handlers & packet definitions)

        acquire = self._cache_lock_acquire
        release = self._cache_lock_release
        # XXX: Consider using a more fine-grained lock.
        self._load_lock_acquire()
        try:
            acquire()
            try:
                result = self._loadFromCache(oid, tid, before_tid)
                if result:
                    return result
                self._loading_oid = oid
                self._loading_invalidated = []
            finally:
                release()
            # While the cache lock is released, an arbitrary number of
            # invalidations may be processed, for this oid or not. And at this
            # precise moment, if both tid and before_tid are None (which is
            # unlikely to happen with recent ZODB), self.last_tid can be any
            # new tid. Since we can get any serial from storage, fixing
            # next_tid requires to keep a list of all possible serials.

            # When not bound to a ZODB Connection, load() may be the
            # first method called and last_tid may still be None.
            # This happens, for example, when opening the DB.
            if not (tid or before_tid) and self.last_tid:
                # Do not get something more recent than the last invalidation
                # we got from master.
                before_tid = p64(u64(self.last_tid) + 1)
            data, tid, next_tid, _ = self._loadFromStorage(oid, tid, before_tid)
            acquire()
            try:
                if self._loading_oid:
                    if not next_tid:
                        for t in self._loading_invalidated:
                            if tid < t:
                                next_tid = t
                                break
                    self._cache.store(oid, data, tid, next_tid)
                # Else, we just reconnected to the master.
            finally:
                release()
        finally:
            self._load_lock_release()
        return data, tid, next_tid

    def _loadFromStorage(self, oid, at_tid, before_tid):
        def askStorage(conn, packet):
            tid, next_tid, compression, checksum, data, data_tid \
                = self._askStorage(conn, packet)
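            # Empty data with a zero checksum means the object creation was
            # undone at the requested serial.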
            if data or checksum != ZERO_HASH:
                if checksum != makeChecksum(data):
                    logging.error('wrong checksum from %s for oid %s',
                              conn, dump(oid))
                    raise NEOStorageReadRetry(False)
                return (decompress_list[compression](data),
                        tid, next_tid, data_tid)
            raise NEOStorageCreationUndoneError(dump(oid))
        return self._askStorageForRead(oid,
            Packets.AskObject(oid, at_tid, before_tid),
            askStorage)

    def _loadFromCache(self, oid, at_tid=None, before_tid=None):
        """
        Load from local cache, return None if not found.
        """
        if at_tid:
            result = self._cache.load(oid, at_tid + '*')
            assert not result or result[1] == at_tid
            return result
        return self._cache.load(oid, before_tid)

    def tpc_begin(self, storage, transaction, tid=None, status=' '):
        """Begin a new transaction."""
        # First get a transaction, only one is allowed at a time
        txn_context = self._txn_container.new(transaction)
        # use the given TID or request a new one from the master
        answer_ttid = self._askPrimary(Packets.AskBeginTransaction(tid))
        if answer_ttid is None:
            raise NEOStorageError('tpc_begin failed')
        assert tid in (None, answer_ttid), (tid, answer_ttid)
        txn_context.Storage = storage
        txn_context.ttid = answer_ttid

    def store(self, oid, serial, data, version, transaction):
        """Store object."""
        logging.debug('storing oid %s serial %s', dump(oid), dump(serial))
        if not serial: # BBB
            serial = ZERO_TID
        self._store(self._txn_container.get(transaction), oid, serial, data)

    def _store(self, txn_context, oid, serial, data, data_serial=None):
        ttid = txn_context.ttid
        if data is None:
            # This is some undo: either a no-data object (undoing object
            # creation) or a back-pointer to an earlier revision (going back to
            # an older object revision).
            compressed_data = ''
            compression = 0
            checksum = ZERO_HASH
        else:
            assert data_serial is None
            size, compression, compressed_data = self.compress(data)
            checksum = makeChecksum(compressed_data)
            txn_context.data_size += size
        # Store object in tmp cache
        packet = Packets.AskStoreObject(oid, serial, compression,
            checksum, compressed_data, data_serial, ttid)
        txn_context.data_dict[oid] = data, serial, txn_context.write(
            self, packet, oid, oid=oid, serial=serial)

        while txn_context.data_size >= self._cache.max_size:
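            # The amount of data sent but not yet acknowledged exceeds the
            # cache size: process answers before queueing more stores
            # (simple write throttling).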
            self._waitAnyTransactionMessage(txn_context)
        self._waitAnyTransactionMessage(txn_context, False)

    def _handleConflicts(self, txn_context):
        data_dict = txn_context.data_dict
        pop_conflict = txn_context.conflict_dict.popitem
        resolved_dict = txn_context.resolved_dict
        tryToResolveConflict = txn_context.Storage.tryToResolveConflict
        while 1:
            # We iterate over conflict_dict, and clear it,
            # because new items may be added by calls to _store.
            # This is also done atomically, to avoid race conditions
            # with PrimaryNotificationsHandler.notifyDeadlock
            try:
                oid, serial = pop_conflict()
            except KeyError:
                return
            try:
                data, old_serial, _ = data_dict.pop(oid)
            except KeyError:
                assert oid is None, (oid, serial)
                # The storage refused to let us take the object lock, to
                # avoid a possible deadlock. TID is actually used for some
                # kind of "locking priority": when a higher value has the
                # lock, this means we stored objects "too late", and we would
                # otherwise cause a deadlock.
                # To recover, we must ask storages to release locks we
                # hold (to let possibly-competing transactions acquire
                # them), and requeue our already-sent store requests.
                ttid = txn_context.ttid
                logging.info('Deadlock avoidance triggered for TXN %s'
                  ' with new locking TID %s', dump(ttid), dump(serial))
                txn_context.locking_tid = serial
                packet = Packets.AskRebaseTransaction(ttid, serial)
                for uuid in txn_context.conn_dict:
                    self._askStorageForWrite(txn_context, uuid, packet)
            else:
                if data is CHECKED_SERIAL:
                    raise ReadConflictError(oid=oid,
                        serials=(serial, old_serial))
                # TODO: data can be None if a conflict happens during undo
                if data:
                    txn_context.data_size -= len(data)
                if self.last_tid < serial:
                    self.sync() # possible late invalidation (very rare)
                try:
                    data = tryToResolveConflict(oid, serial, old_serial, data)
                except ConflictError:
                    logging.info(
                        'Conflict resolution failed for %s@%s with %s',
                        dump(oid), dump(old_serial), dump(serial))
                    # With recent ZODB, get_pickle_metadata (from ZODB.utils)
                    # does not support empty values, so do not pass 'data'
                    # in this case.
                    raise ConflictError(oid=oid, serials=(serial, old_serial),
                                        data=data or None)
                else:
                    logging.info(
                        'Conflict resolution succeeded for %s@%s with %s',
                        dump(oid), dump(old_serial), dump(serial))
                    # Mark this conflict as resolved
                    resolved_dict[oid] = serial
                    # Try to store again
                    self._store(txn_context, oid, serial, data)

    def _askStorageForWrite(self, txn_context, uuid, packet):
        conn = txn_context.conn_dict[uuid]
        try:
            return conn.ask(packet, queue=txn_context.queue)
        except AttributeError:
            if conn is not None:
                raise
        except ConnectionClosed:
            txn_context.conn_dict[uuid] = None

    def waitResponses(self, queue):
        """Wait for all requests to be answered (or their connection to be
        detected as closed)"""
        pending = self.dispatcher.pending
        _waitAnyMessage = self._waitAnyMessage
        while pending(queue):
            _waitAnyMessage(queue)

    def waitStoreResponses(self, txn_context):
        queue = txn_context.queue
        pending = self.dispatcher.pending
        _waitAnyTransactionMessage = self._waitAnyTransactionMessage
        while pending(queue):
            _waitAnyTransactionMessage(txn_context)
        if txn_context.data_dict:
            raise NEOStorageError('could not store/check all oids')

    def tpc_vote(self, transaction):
        """Store current transaction."""
        txn_context = self._txn_container.get(transaction)
        self.waitStoreResponses(txn_context)
        ttid = txn_context.ttid
        ext = transaction._extension
        ext = dumps(ext, _protocol) if ext else ''
        # user and description are cast to str in case they're unicode.
        # BBB: This is not required anymore with recent ZODB.
        packet = Packets.AskStoreTransaction(ttid, str(transaction.user),
            str(transaction.description), ext, list(txn_context.cache_dict))
        queue = txn_context.queue
        conn_dict = txn_context.conn_dict
        # Ask in parallel all involved storage nodes to commit object metadata.
        # Nodes that store the transaction metadata get a special packet.
        trans_nodes = txn_context.write(self, packet, ttid)
        packet = Packets.AskVoteTransaction(ttid)
        for uuid in conn_dict:
            if uuid not in trans_nodes:
                self._askStorageForWrite(txn_context, uuid, packet)
        self.waitStoreResponses(txn_context)
        if None in conn_dict.itervalues(): # unlikely
            # If some writes failed, we must first check whether
            # all oids have been locked by at least one node.
            failed = {node.getUUID(): node.isRunning()
                for node in self.nm.getStorageList()
                if conn_dict.get(node.getUUID(), 0) is None}
            if txn_context.lockless_dict:
                getCellList = self.pt.getCellList
                for offset, uuid_set in txn_context.lockless_dict.iteritems():
                    for cell in getCellList(offset):
                        uuid = cell.getUUID()
                        if not (uuid in failed or uuid in uuid_set):
                            break
                    else:
                        # Very unlikely. Instead of raising, we could recover
                        # the transaction by doing something similar to
                        # deadlock avoidance; that would be done before voting.
                        # But it's not worth the complexity.
                        raise NEOStorageError(
                            'partition %s not fully write-locked' % offset)
            failed = [uuid for uuid, running in failed.iteritems() if running]
            # If there are running nodes for which some writes failed, ask the
            # master whether they can be disconnected while keeping the cluster
            # operational. If possible, this will happen during tpc_finish.
            if failed:
                try:
                    self._askPrimary(Packets.FailedVote(ttid, failed))
                except ConnectionClosed:
                    pass
        txn_context.voted = True
        # We must not go further if connection to master was lost since
        # tpc_begin, to lower the probability of failing during tpc_finish.
        # IDEA: We can improve in 2 opposite directions:
        #       - In the case of big transactions, it would be useful to
        #         also detect failures earlier.
        #       - If possible, recover from master failure.
        if txn_context.error:
            raise NEOStorageError(txn_context.error)
        if OLD_ZODB:
            return [(oid, ResolvedSerial)
                for oid in txn_context.resolved_dict]
        return txn_context.resolved_dict

    def tpc_abort(self, transaction):
        """Abort current transaction."""
        txn_context = self._txn_container.pop(transaction)
        if txn_context is None:
            return
        # We want the involved nodes to abort the transaction after any
        # other packet sent by the client for this transaction. IOW, if we
        # already have a connection with a storage node, potentially with
        # a pending write, aborting only via the master may lead to a race
        # condition. The consequence would be that storage nodes lock oids
        # forever.
        p = Packets.AbortTransaction(txn_context.ttid, ())
        for conn in txn_context.conn_dict.itervalues():
            if conn is not None:
                try:
                    conn.send(p)
                except ConnectionClosed:
                    pass
        # Because we want to be sure that the involved nodes are notified,
        # we still have to send the full list to the master. Most of the
        # time, the storage nodes get 2 AbortTransaction packets, and the
        # second one is rarely useful. Another option would be that the
        # storage nodes keep a list of aborted transactions, but the
        # difficult part would be to avoid a memory leak.
        try:
            notify = self.master_conn.send
        except AttributeError:
            pass
        else:
            try:
                notify(Packets.AbortTransaction(txn_context.ttid,
                    list(txn_context.conn_dict)))
            except ConnectionClosed:
                pass
        # We don't need to flush queue, as it won't be reused by future
        # transactions (deleted on next line & indexed by transaction object
        # instance).
        self.dispatcher.forget_queue(txn_context.queue, flush_queue=False)

    def tpc_finish(self, transaction, f=None):
        """Finish current transaction

        To avoid inconsistencies between several databases involved in the
        same transaction, an IStorage implementation must do its best not to
        fail in tpc_finish. In particular, making a transaction permanent
        should ideally be as simple as switching a bit permanently.

        In NEO, all the data (with the exception of the tid, simply because
        it is not known yet) is already flushed on disk at the end of the vote.
        During tpc_finish, all nodes storing the transaction metadata are asked
        to commit by saving the new tid and flushing again: for SQL backends,
        it's just an UPDATE of 1 cell. At last, the metadata is moved to
        a final place so that the new transaction is readable, but this is
        something that can always be replayed (during the verification phase)
        if any failure happens.
        """
        txn_container = self._txn_container
        if not txn_container.get(transaction).voted:
            self.tpc_vote(transaction)
        checked_list = []
        self._load_lock_acquire()
        try:
            # Call finish on master
            txn_context = txn_container.pop(transaction)
            cache_dict = txn_context.cache_dict
            checked_list = [oid for oid, data  in cache_dict.iteritems()
                                if data is CHECKED_SERIAL]
            for oid in checked_list:
                del cache_dict[oid]
            ttid = txn_context.ttid
            p = Packets.AskFinishTransaction(ttid, list(cache_dict),
                                             checked_list)
            try:
                tid = self._askPrimary(p, cache_dict=cache_dict, callback=f)
                assert tid
            except ConnectionClosed:
                tid = self._getFinalTID(ttid)
                if not tid:
                    raise
            return tid
        finally:
            self._load_lock_release()

    def _getFinalTID(self, ttid):
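        # Used by tpc_finish when the connection to the master is lost: ask
        # the master, and possibly the storage nodes, for the final tid of a
        # transaction that may or may not have been committed.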
        try:
            p = Packets.AskFinalTID(ttid)
            while 1:
                try:
                    tid = self._askPrimary(p)
                    break
                except ConnectionClosed:
                    pass
            if tid == MAX_TID:
                while 1:
                    try:
                        return self._askStorageForRead(ttid, p)
                    except NEOPrimaryMasterLost:
                        pass
            elif tid:
                return tid
        except Exception:
            logging.exception("Failed to get final tid for TXN %s",
                              dump(ttid))

    def undo(self, undone_tid, txn):
        txn_context = self._txn_container.get(txn)
        txn_info, txn_ext = self._getTransactionInformation(undone_tid)

        # Regroup objects per partition, to ask a minimum set of storage.
        partition_oid_dict = defaultdict(list)
        for oid in txn_info['oids']:
            partition_oid_dict[self.pt.getPartition(oid)].append(oid)

        # Ask the storage nodes for the undo serial, i.e. the serial at which
        # each object's previous data is.
        getCellList = self.pt.getCellList
        getCellSortKey = self.getCellSortKey
        getConnForNode = self.getStorageConnection
        queue = self._thread_container.queue
        ttid = txn_context.ttid
        undo_object_tid_dict = {}
        snapshot_tid = p64(u64(self.last_tid) + 1)
        kw = {
            'queue': queue,
            'partition_oid_dict': partition_oid_dict,
            'undo_object_tid_dict': undo_object_tid_dict,
        }
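        # Loop until every partition has been answered: the answer handler is
        # expected to remove a partition from partition_oid_dict once its undo
        # serials are known, so that remaining entries are retried.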
        while partition_oid_dict:
            for partition, oid_list in partition_oid_dict.iteritems():
                cell_list = [cell
                    for cell in getCellList(partition, readable=True)
                    # Exclude nodes that may have missed previous resolved
                    # conflicts. For example, if a network failure happened
                    # only between the client and the storage, the latter would
                    # still be readable until we commit.
                    if txn_context.conn_dict.get(cell.getUUID(), 0) is not None]
                storage_conn = getConnForNode(
                    min(cell_list, key=getCellSortKey).getNode())
                storage_conn.ask(Packets.AskObjectUndoSerial(ttid,
                    snapshot_tid, undone_tid, oid_list),
                    partition=partition, **kw)

            # Wait for all AnswerObjectUndoSerial. We might get
            # OidNotFoundError, meaning that objects in transaction's oid_list
            # do not exist any longer. This is the symptom of a pack, so forbid
            # undoing transaction when it happens.
            try:
                self.waitResponses(queue)
            except NEOStorageNotFoundError:
                self.dispatcher.forget_queue(queue)
                raise UndoError('non-undoable transaction')

        # Send undo data to all storage nodes.
        for oid, (current_serial, undo_serial, is_current) in \
                undo_object_tid_dict.iteritems():
            if is_current:
                data = None
            else:
                # Serial being undone is not the latest version for this
                # object. This is an undo conflict, try to resolve it.
                try:
                    # Load the latest version we are supposed to see
                    if current_serial == ttid:
                        # XXX: see TODO below
                        data = txn_context.cache_dict[oid]
                    else:
                        data = self.load(oid, current_serial)[0]
                    # Load the version we were undoing to
                    undo_data = self.load(oid, undo_serial)[0]
                except NEOStorageNotFoundError:
                    raise UndoError('Object not found while resolving undo '
                        'conflict')
                # Resolve conflict
                try:
                    data = txn_context.Storage.tryToResolveConflict(
                        oid, current_serial, undone_tid, undo_data, data)
                except ConflictError:
                    raise UndoError('Some data were modified by a later ' \
                        'transaction', oid)
                undo_serial = None
                # TODO: The situation is similar to deadlock avoidance.
                #       Reenable the cache size limit to avoid OOM when there's
                #       a huge amount of conflicting data, and get the data
                #       back from the storage when it's not in cache_dict
                #       anymore.
                txn_context.cache_size = - float('inf')
            self._store(txn_context, oid, current_serial, data, undo_serial)

        self.waitStoreResponses(txn_context)
        return None, list(undo_object_tid_dict)

    def _getTransactionInformation(self, tid):
        return self._askStorageForRead(tid,
            Packets.AskTransactionInformation(tid))

    def undoLog(self, first, last, filter=None, block=0):
        # XXX: undoLog is broken
        if last < 0:
            # See FileStorage.py for explanation
            last = first - last

        # First get a list of transactions from all storage nodes.
        # Each storage node will return TIDs only for UP_TO_DATE state and
        # FEEDING state cells
        queue = self._thread_container.queue
        packet = Packets.AskTIDs(first, last, INVALID_PARTITION)
        tid_set = set()
        for storage_node in self.pt.getNodeSet(True):
            conn = self.getStorageConnection(storage_node)
            if conn is None:
                continue
            conn.ask(packet, queue=queue, tid_set=tid_set)

        # Wait for answers from all storages.
        # TODO: Results are incomplete when readable cells move concurrently
        #       from one storage to another. We detect when this happens and
        #       retry.
        self.waitResponses(queue)

        # Reorder tids
        ordered_tids = sorted(tid_set, reverse=True)
        logging.debug("UndoLog tids %s", map(dump, ordered_tids))
        # For each transaction, get info
        undo_info = []
        append = undo_info.append
        for tid in ordered_tids:
            (txn_info, txn_ext) = self._getTransactionInformation(tid)
            if filter is None or filter(txn_info):
                txn_info.pop('packed')
                txn_info.pop("oids")
                if txn_ext:
                    txn_info.update(loads(txn_ext))
                append(txn_info)
                if len(undo_info) >= last - first:
                    break
        # Check that we return at least one element; otherwise call
        # again with an extended range.
        if len(undo_info) == 0 and not block:
            undo_info = self.undoLog(first=first, last=last*5, filter=filter,
                    block=1)
        return undo_info

    def transactionLog(self, start, stop, limit):
        tid_list = []
        # request a tid list for each partition
        for offset in xrange(self.pt.getPartitions()):
            r = self._askStorageForRead(offset,
                Packets.AskTIDsFrom(start, stop, limit, offset))
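            # Merge this partition's TIDs with those already collected, keep
            # at most 'limit' of them, and lower 'stop' so that the remaining
            # partitions are not asked for TIDs that would be dropped anyway.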
            if r:
                tid_list = list(heapq.merge(tid_list, r))
                if len(tid_list) >= limit:
                    del tid_list[limit:]
                    stop = tid_list[-1]
        # request transaction information
        txn_list = []
        append = txn_list.append
        tid = None
        for tid in tid_list:
            (txn_info, txn_ext) = self._getTransactionInformation(tid)
            txn_info['ext'] = loads(txn_ext) if txn_ext else {}
            append(txn_info)
        return (tid, txn_list)

    def history(self, oid, size=1, filter=None):
        packet = Packets.AskObjectHistory(oid, 0, size)
        result = []
        # history_list is already sorted descending (by the storage)
        for serial, size in self._askStorageForRead(oid, packet):
            txn_info, txn_ext = self._getTransactionInformation(serial)
            # create history dict
            del txn_info['id']
            del txn_info['oids']
            del txn_info['packed']
            txn_info['tid'] = serial
            txn_info['version'] = ''
            txn_info['size'] = size
            if filter is None or filter(txn_info):
                result.append(txn_info)
            if txn_ext:
                txn_info.update(loads(txn_ext))
        return result

    def importFrom(self, storage, source):
        # TODO: The main difference with the BaseStorage implementation is
        #       that preindex can't be filled with the result of 'store' (the
        #       tid is only known after 'tpc_finish'). This method could be
        #       dropped if we implemented IStorageRestoreable (a wrapper
        #       around source would still be required for partial import).
        preindex = {}
        for transaction in source.iterator():
            tid = transaction.tid
            self.tpc_begin(storage, transaction, tid, transaction.status)
            for r in transaction:
                oid = r.oid
                try:
                    pre = preindex[oid]
                except KeyError:
                    try:
                        pre = self.load(oid)[1]
                    except NEOStorageNotFoundError:
                        pre = ZERO_TID
                self.store(oid, pre, r.data, r.version, transaction)
                preindex[oid] = tid
            conflicted = self.tpc_vote(transaction)
            assert not conflicted, conflicted
            real_tid = self.tpc_finish(transaction)
            assert real_tid == tid, (real_tid, tid)

    from .iterator import iterator

    def sync(self):
        self._askPrimary(Packets.Ping())

    def pack(self, t):
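        # Convert the pack time (seconds since the epoch) into a TID.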
        tid = TimeStamp(*time.gmtime(t)[:5] + (t % 60, )).raw()
        if tid == ZERO_TID:
            raise NEOStorageError('Invalid pack time')
        self._askPrimary(Packets.AskPack(tid))
        # XXX: this is only needed to make ZODB unit tests pass.
        # It should not be otherwise required (clients should be free to load
        # old data as long as it is available in cache, even if it was pruned
        # by a pack), so don't bother invalidating on other clients.
        self._cache_lock_acquire()
        try:
            self._cache.clear()
        finally:
            self._cache_lock_release()

    def getLastTID(self, oid):
        return self.load(oid)[1]

    def checkCurrentSerialInTransaction(self, oid, serial, transaction):
        self._checkCurrentSerialInTransaction(
            self._txn_container.get(transaction), oid, serial)

    def _checkCurrentSerialInTransaction(self, txn_context, oid, serial):
        ttid = txn_context.ttid
        # ZODB.Connection calls 'checkCurrentSerialInTransaction' after
        # stores, and skips oids that have been successfully stored.
        assert oid not in txn_context.cache_dict, oid
        assert oid not in txn_context.data_dict, oid
        packet = Packets.AskCheckCurrentSerial(ttid, oid, serial)
        txn_context.data_dict[oid] = CHECKED_SERIAL, serial, txn_context.write(
            self, packet, oid, oid=oid, serial=serial)
        self._waitAnyTransactionMessage(txn_context, False)