app.py 41.3 KB
Newer Older
1
#
Grégory Wisniewski's avatar
Grégory Wisniewski committed
2
# Copyright (C) 2006-2010  Nexedi SA
3
#
Aurel's avatar
Aurel committed
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
Aurel's avatar
Aurel committed
9 10 11 12 13 14 15
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
Aurel's avatar
Aurel committed
17

18
from thread import get_ident
19
from cPickle import dumps, loads
20
from zlib import compress as real_compress, decompress
21
from neo.locking import Queue, Empty
22
from random import shuffle
23
import time
24

25
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
26
from ZODB.ConflictResolution import ResolvedSerial
27

28 29 30 31
from neo import setupLog
setupLog('CLIENT', verbose=True)

from neo import logging
32
from neo.protocol import NodeTypes, Packets, INVALID_PARTITION
33
from neo.event import EventManager
34
from neo.util import makeChecksum as real_makeChecksum, dump
35
from neo.locking import Lock
36
from neo.connection import MTClientConnection
37
from neo.node import NodeManager
38
from neo.connector import getConnectorHandler
39
from neo.client.exception import NEOStorageError
40
from neo.client.exception import NEOStorageNotFoundError, ConnectionClosed
41 42
from neo.exception import NeoException
from neo.client.handlers import storage, master
43
from neo.dispatcher import Dispatcher
44
from neo.client.poll import ThreadedPoll
45 46
from neo.client.iterator import Iterator
from neo.client.mq import MQ
47
from neo.client.pool import ConnectionPool
48
from neo.util import u64, parseMasterList
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
from neo.profiling import profiler_decorator, PROFILING_ENABLED

if not PROFILING_ENABLED:
    # Profiling is off: bind the original callables directly so the hot
    # paths pay no wrapper overhead.
    compress = real_compress
    makeChecksum = real_makeChecksum
else:
    # profiler_decorator needs genuine Python function objects to wrap, so
    # route both calls through thin Python-level trampolines.
    @profiler_decorator
    def compress(data):
        return real_compress(data)

    @profiler_decorator
    def makeChecksum(data):
        return real_makeChecksum(data)
Aurel's avatar
Aurel committed
65

66 67
class ThreadContext(object):
    """Attribute namespace whose content is private to each thread.

    Reading or writing an attribute on an instance goes to a dict selected
    by the calling thread's identifier (get_ident()). A thread's dict is
    created on its first access, via clear().
    """

    def __init__(self):
        # Go through object.__setattr__: our own __setattr__ would store the
        # value in a per-thread dict instead of on the instance.
        super(ThreadContext, self).__setattr__('_threads_dict', {})

    def _getThreadData(self):
        """Return the calling thread's state dict, creating it if needed."""
        ident = get_ident()
        data = self._threads_dict.get(ident)
        if data is None:
            self.clear(ident)
            data = self._threads_dict[ident]
        return data

    def __getattr__(self, name):
        # Only reached when regular lookup fails, i.e. for per-thread state.
        data = self._getThreadData()
        try:
            return data[name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        self._getThreadData()[name] = value

    def clear(self, thread_id=None):
        """Reset the state of a thread (the calling thread by default).

        The thread's queue is kept across resets; a new Queue is only
        created the first time. Every other entry gets its initial value.
        """
        if thread_id is None:
            thread_id = get_ident()
        previous = self._threads_dict.get(thread_id)
        if previous is not None:
            queue = previous['queue']
        else:
            queue = Queue(0)
        self._threads_dict[thread_id] = dict(
            tid=None,
            txn=None,
            data_dict={},
            object_serial_dict={},
            object_stored_counter_dict={},
            conflict_serial_dict={},
            resolved_conflict_serial_dict={},
            object_stored=0,
            txn_voted=False,
            txn_finished=False,
            queue=queue,
            txn_info=0,
            history=None,
            node_tids={},
            node_ready=False,
            asked_object=0,
            undo_conflict_oid_list=[],
            undo_error_oid_list=[],
        )


Aurel's avatar
Aurel committed
121
class Application(object):
122 123
    """The client node application."""

124
    def __init__(self, master_nodes, name, connector=None, **kw):
        """Set up the client node; master_nodes is parsed with
        parseMasterList and each address is registered as a master node."""
        # Event manager and the polling thread watching it.
        self.em = EventManager()
        self.poll_thread = ThreadedPoll(self.em)
        # Internal attributes shared by all threads.
        self._db = None
        self.name = name
        self.connector_handler = getConnectorHandler(connector)
        self.dispatcher = Dispatcher()
        self.nm = NodeManager()
        self.cp = ConnectionPool(self)
        self.pt = None
        self.master_conn = None
        self.primary_master_node = None
        self.trying_master_node = None

        # Register every configured master node.
        create_master = self.nm.createMaster
        for master_address in parseMasterList(master_nodes):
            create_master(address=master_address)

        # No self-assigned UUID: the primary master supplies one.
        self.uuid = None
        self.mq_cache = MQ()
        self.new_oid_list = []
        self.last_oid = '\0' * 8
        # Packet handlers for storage and master connections.
        self.storage_event_handler = storage.StorageEventHandler(self)
        self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
        self.storage_handler = storage.StorageAnswersHandler(self)
        self.primary_handler = master.PrimaryAnswersHandler(self)
        self.primary_bootstrap_handler = master.PrimaryBootstrapHandler(self)
        self.notifications_handler = master.PrimaryNotificationsHandler(self)
        # Internal attributes distinct for each thread.
        self.local_var = ThreadContext()

        def new_lock_pair():
            # Each call creates an independent Lock and returns its bound
            # acquire/release methods.
            new_lock = Lock()
            return new_lock.acquire, new_lock.release

        # Makes loading and storing atomic.
        self._load_lock_acquire, self._load_lock_release = new_lock_pair()
        # Prevents multiple concurrent OID generations.
        self._oid_lock_acquire, self._oid_lock_release = new_lock_pair()
        # Protects the client cache.
        self._cache_lock_acquire, self._cache_lock_release = new_lock_pair()
        # Prevents simultaneous master node connection attempts.
        (self._connecting_to_master_node_acquire,
         self._connecting_to_master_node_release) = new_lock_pair()
        # Ensures exclusive access to the node manager.
        self._nm_acquire, self._nm_release = new_lock_pair()
180

181
    @profiler_decorator
    def _handlePacket(self, conn, packet, handler=None):
        """
          Dispatch a received packet to a handler.

          conn
            The connection which received the packet (forwarded to handler).
          packet
            The packet to handle.
          handler
            The handler to use to handle packet.
            If not given, it is chosen from the type of the node at the
            other end of the connection (storage or master).
        """
        if handler is not None:
            handler.dispatch(conn, packet)
            return
        # Guess the handler from the type of node on the connection.
        node = self.nm.getByAddress(conn.getAddress())
        if node is None:
            raise ValueError('Expecting an answer from a node '
                'which type is not known... Is this right ?')
        if node.isStorage():
            chosen = self.storage_handler
        elif node.isMaster():
            chosen = self.primary_handler
        else:
            raise ValueError('Unknown node type: %r' % (node.__class__, ))
        chosen.dispatch(conn, packet)

207
    @profiler_decorator
    def _waitAnyMessage(self, block=True):
        """
          Handle the packets pending for the calling thread.

          block
            If True (default), the first read waits for a packet; after one
            packet has been handled, later reads no longer block.
        """
        queue = self.local_var.queue
        has_pending = self.dispatcher.pending
        handle = self._handlePacket
        while has_pending(queue):
            try:
                conn, packet = queue.get(block)
            except Empty:
                break
            if packet is None:
                # Fake packet queued when a connection was closed.
                continue
            block = False
            try:
                handle(conn, packet)
            except ConnectionClosed:
                pass

233
    @profiler_decorator
    def _waitMessage(self, target_conn, msg_id, handler=None):
        """Wait for a message returned by the dispatcher in queues.

        Blocks until the packet with id msg_id arrives on target_conn and
        handles it with handler. Packets from other connections are handled
        with the default handler meanwhile. Raises ConnectionClosed if
        target_conn is closed first.
        """
        get = self.local_var.queue.get
        handle = self._handlePacket
        while True:
            conn, packet = get(True)
            if conn is not target_conn:
                if packet is not None:
                    handle(conn, packet)
                continue
            # A None packet on target_conn is the fake "closed" packet.
            if packet is None:
                raise ConnectionClosed
            if packet.getId() == msg_id:
                handle(conn, packet, handler=handler)
                return
            # NOTE(review): other packets on target_conn are dropped
            # unhandled here - confirm this is intended.
249

250
    @profiler_decorator
    def _askStorage(self, conn, packet):
        """Send a request to a storage node and process its answer."""
        self._waitMessage(conn, conn.ask(packet), self.storage_handler)

256
    @profiler_decorator
    def _askPrimary(self, packet):
        """Send a request to the primary master and process its answer.

        Connects to the primary master first if needed."""
        master_conn = self._getMasterConnection()
        self._waitMessage(master_conn, master_conn.ask(packet),
            self.primary_handler)

263
    @profiler_decorator
    def _getMasterConnection(self):
        """Connect to the primary master node on demand.

        Returns the cached master connection, establishing it first when
        there is none. Only one thread connects at a time. Threads that were
        waiting on the lock reuse the connection established meanwhile,
        instead of connecting (and dropping the OID pool) a second time.
        """
        result = self.master_conn
        if result is None:
            # Acquire the lock to allow only one thread to connect to the
            # primary master.
            self._connecting_to_master_node_acquire()
            try:
                # Re-check under the lock: another thread may have connected
                # while we were waiting for it.
                result = self.master_conn
                if result is None:
                    # Forget OIDs obtained through a previous connection.
                    self.new_oid_list = []
                    result = self._connectToPrimaryNode()
                    self.master_conn = result
            finally:
                self._connecting_to_master_node_release()
        return result
277

278
    def _getPartitionTable(self):
279
        """ Return the partition table manager, reconnect the PMN if needed """
280 281 282 283 284
        # this ensure the master connection is established and the partition
        # table is up to date.
        self._getMasterConnection()
        return self.pt

285
    @profiler_decorator
    def _getCellListForOID(self, oid, readable=False, writable=False):
        """Return the cells available for the specified OID, optionally
        restricted to readable and/or writable ones."""
        return self._getPartitionTable().getCellListForOID(oid, readable,
            writable)

    def _getCellListForTID(self, tid, readable=False, writable=False):
        """ Return the cells available for the specified TID """
293
        pt = self._getPartitionTable()
294
        return pt.getCellListForTID(tid, readable, writable)
295

296
    @profiler_decorator
    def _connectToPrimaryNode(self):
        """Connect and identify to the primary master node.

        Loops until the client has a UUID and an operational partition
        table. Until a node confirms it is the primary, it tries the known
        primary if any, otherwise each master node in turn, sleeping one
        second after each full pass over the list. Then it identifies itself
        and, once it has a UUID, asks for node information and the
        partition table. If the connection is lost during identification,
        everything starts over.

        Returns the connection to the primary master.
        """
        logging.debug('connecting to primary master...')
        ready = False
        nm = self.nm
        while not ready:
            # Get network connection to primary master
            index = 0
            connected = False
            while not connected:
                if self.primary_master_node is not None:
                    # If I know a primary master node, pinpoint it.
                    self.trying_master_node = self.primary_master_node
                    self.primary_master_node = None
                else:
                    # Otherwise, check one by one.
                    master_list = nm.getMasterList()
                    try:
                        self.trying_master_node = master_list[index]
                    except IndexError:
                        # Whole list tried: wait, then start over.
                        time.sleep(1)
                        index = 0
                        self.trying_master_node = master_list[0]
                    index += 1
                # Connect to master
                conn = MTClientConnection(self.local_var, self.em,
                        self.notifications_handler,
                        addr=self.trying_master_node.getAddress(),
                        connector=self.connector_handler(),
                        dispatcher=self.dispatcher)
                # Query for primary master node
                if conn.getConnector() is None:
                    # This happens if a connection could not be established.
                    logging.error('Connection to master node %s failed',
                                  self.trying_master_node)
                    continue
                msg_id = conn.ask(Packets.AskPrimary())
                try:
                    self._waitMessage(conn, msg_id,
                            handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    continue
                # If we reached the primary master node, mark as connected
                connected = self.primary_master_node is not None and \
                        self.primary_master_node is self.trying_master_node

            logging.info('connected to a primary master node')
            # Identify to primary master and request initial data
            while conn.getUUID() is None:
                if conn.getConnector() is None:
                    logging.error('Connection to master node %s lost',
                                  self.trying_master_node)
                    self.primary_master_node = None
                    break
                p = Packets.RequestIdentification(NodeTypes.CLIENT,
                        self.uuid, None, self.name)
                msg_id = conn.ask(p)
                try:
                    self._waitMessage(conn, msg_id,
                            handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    self.primary_master_node = None
                    break
                if conn.getUUID() is None:
                    # Node identification was refused by master.
                    time.sleep(1)
            if conn.getUUID() is None:
                # The identification loop was left because the connection
                # was lost. Start over rather than sending requests on a
                # dead connection: self.uuid may still be set from an
                # earlier session.
                continue
            if self.uuid is not None:
                msg_id = conn.ask(Packets.AskNodeInformation())
                self._waitMessage(conn, msg_id,
                        handler=self.primary_bootstrap_handler)
                msg_id = conn.ask(Packets.AskPartitionTable([]))
                self._waitMessage(conn, msg_id,
                        handler=self.primary_bootstrap_handler)
            ready = self.uuid is not None and self.pt is not None \
                                 and self.pt.operational()
        logging.info('connected to primary master node %s',
                self.primary_master_node)
        return conn
374

375 376 377
    def registerDB(self, db, limit):
        self._db = db

378 379 380
    def getDB(self):
        return self._db

381
    @profiler_decorator
    def new_oid(self):
        """Get a new OID.

        OIDs are requested from the primary master 100 at a time and handed
        out one by one from new_oid_list, to avoid asking the master for
        each new OID. Raises NEOStorageError if the master provided none.
        """
        self._oid_lock_acquire()
        try:
            # new_oid_list is re-read after _askPrimary because asking may
            # replace the list object.
            if not self.new_oid_list:
                self._askPrimary(Packets.AskNewOIDs(100))
                if not self.new_oid_list:
                    raise NEOStorageError('new_oid failed')
            oid = self.new_oid_list.pop()
            self.last_oid = oid
            return oid
        finally:
            self._oid_lock_release()

399 400 401
    def getStorageSize(self):
        """Return an approximate storage size.

        This is the integer value of the last OID handed out by new_oid,
        so it is inaccurate.
        """
        last = self.last_oid
        return int(u64(last))
402

403
    @profiler_decorator
    def getSerial(self, oid):
        """Return the serial of the current revision of oid.

        The client cache is consulted first; otherwise the serial is taken
        from self.history(). Raises NEOStorageNotFoundError when the
        history is empty and NEOStorageError when it is for another OID.
        """
        self._cache_lock_acquire()
        try:
            if oid in self.mq_cache:
                cached_serial = self.mq_cache[oid][0]
                return cached_serial
        finally:
            self._cache_lock_release()
        # Ask for the single most recent history entry only.
        hist = self.history(oid, size=1, object_only=1)
        if len(hist) == 0:
            raise NEOStorageNotFoundError()
        if hist[0] != oid:
            raise NEOStorageError('getSerial failed')
        # presumably hist is (oid, [(serial, ...), ...]) - confirm
        revisions = hist[1]
        return revisions[0][0]

Aurel's avatar
Aurel committed
420

421
    @profiler_decorator
    def _load(self, oid, serial=None, tid=None, cache=0):
        """Internal method which manages load, loadSerial and loadBefore.

        Asks the readable storage nodes for oid (at serial, or before tid)
        in random order, until one answers with data whose oid and checksum
        match.

        Returns (data, start_serial, end_serial), data being None for an
        object stored as an empty string. When cache is true, the
        (start_serial, data) pair, with data as received (possibly ''),
        is put in the client cache.
        Raises NEOStorageNotFoundError if the object was not found or no
        storage node could be reached.
        """
        cell_list = self._getCellListForOID(oid, readable=True)
        if len(cell_list) == 0:
            # No cells available, so why are we running ?
            logging.error('oid %s not found because no storage is '
                    'available for it', dump(oid))
            raise NEOStorageNotFoundError()

        shuffle(cell_list)
        local_var = self.local_var
        # asked_object: 0 while no node answered, -1 for "not found" or a
        # rejected answer, otherwise the answer tuple (presumably filled
        # by storage_handler).
        local_var.asked_object = 0
        for cell in cell_list:
            logging.debug('trying to load %s from %s',
                          dump(oid), dump(cell.getUUID()))
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            try:
                self._askStorage(conn, Packets.AskObject(oid, serial, tid))
            except ConnectionClosed:
                continue

            if local_var.asked_object == -1:
                # OID not found
                break

            # Check data
            noid, start_serial, end_serial, compression, checksum, data \
                = local_var.asked_object
            if noid != oid:
                # Oops, try with next node
                logging.error('got wrong oid %s instead of %s from node %s',
                              dump(noid), dump(oid), cell.getAddress())
                local_var.asked_object = -1
                continue
            elif checksum != makeChecksum(data):
                # Check checksum.
                logging.error('wrong checksum from node %s for oid %s',
                              cell.getAddress(), dump(oid))
                local_var.asked_object = -1
                continue
            else:
                # Everything looks alright.
                break

        if local_var.asked_object == 0:
            # No storage node could be queried because of connection
            # errors.
            logging.warning('oid %s not found because of connection failure',
                    dump(oid))
            raise NEOStorageNotFoundError()

        if local_var.asked_object == -1:
            # No storage node returned a valid object.
            logging.info('oid %s not found', dump(oid))
            raise NEOStorageNotFoundError()

        # Uncompress data
        if compression:
            data = decompress(data)

        # Put in cache only when using load
        if cache:
            self._cache_lock_acquire()
            try:
                self.mq_cache[oid] = start_serial, data
            finally:
                self._cache_lock_release()
        if data == '':
            data = None
        return data, start_serial, end_serial
494

495

496
    @profiler_decorator
    def load(self, oid, version=None):
        """Load an object for a given oid.

        Returns (data, serial). The object comes from the client cache if
        present there, otherwise from a storage node (and is then cached).
        version is ignored. Runs under the load lock.
        """
        self._load_lock_acquire()
        try:
            cached = None
            self._cache_lock_acquire()
            try:
                if oid in self.mq_cache:
                    logging.debug('load oid %s is cached', dump(oid))
                    cached = self.mq_cache[oid]
            finally:
                self._cache_lock_release()
            if cached is not None:
                serial, data = cached
                return data, serial
            # Cache miss: get it from a storage node and cache it.
            return self._load(oid, cache=1)[:2]
        finally:
            self._load_lock_release()
Aurel's avatar
Aurel committed
514

515

516
    @profiler_decorator
    def loadSerial(self, oid, serial):
        """Load an object for a given oid and serial; return its data.

        The client cache is bypassed as it only holds up-to-date objects.
        """
        logging.debug('loading %s at %s', dump(oid), dump(serial))
        data, _start, _end = self._load(oid, serial=serial)
        return data
Aurel's avatar
Aurel committed
522

523

524
    @profiler_decorator
    def loadBefore(self, oid, tid):
        """Load an object for a given oid before tid committed.

        Returns (data, start, end), or None when the loaded revision has no
        end serial. The client cache is bypassed as it only holds
        up-to-date objects.
        """
        logging.debug('loading %s before %s', dump(oid), dump(tid))
        data, start, end = self._load(oid, tid=tid)
        if end is not None:
            return data, start, end
        # No previous version.
        # NOTE(review): ZODB's IStorage.loadBefore uses end=None for the
        # current revision; confirm returning None here is intended.
        return None
Aurel's avatar
Aurel committed
535

536

537
    @profiler_decorator
    def tpc_begin(self, transaction, tid=None, status=' '):
        """Begin a new transaction.

        Does nothing if transaction is already the current one of this
        thread. Otherwise asks the primary master for a transaction id
        (requesting tid if given; the master may supply another one) and
        makes transaction current. Raises NEOStorageError if no tid was
        obtained.
        """
        local_var = self.local_var
        if local_var.txn is transaction:
            return
        local_var.tid = None
        self._askPrimary(Packets.AskBeginTransaction(tid))
        if local_var.tid is None:
            raise NEOStorageError('tpc_begin failed')
        local_var.txn = transaction
552

553

554
    @profiler_decorator
    def store(self, oid, serial, data, version, transaction):
        """Store object.

        Sends the object to every writable storage node for oid without
        waiting for the answers, then handles already queued packets
        without blocking. Returns None.
        Raises StorageTransactionError for a foreign transaction and
        NEOStorageError if no writable cell exists.
        """
        local_var = self.local_var
        if transaction is not local_var.txn:
            raise StorageTransactionError(self, transaction)
        logging.debug('storing oid %s serial %s',
                     dump(oid), dump(serial))
        cell_list = self._getCellListForOID(oid, writable=True)
        if len(cell_list) == 0:
            raise NEOStorageError
        if data is None:
            # this is a George Bailey object, stored as an empty string
            data = ''
        # Send compressed data only if compression does not make it bigger.
        packed = compress(data)
        if len(packed) > len(data):
            packed = data
            compression = 0
        else:
            compression = 1
        p = Packets.AskStoreObject(oid, serial, compression,
                 makeChecksum(packed), packed, local_var.tid)
        # Keep the uncompressed data: it is used for conflict resolution
        # and to update the cache in tpc_finish.
        local_var.data_dict[oid] = data
        local_var.object_stored_counter_dict[oid] = 0
        local_var.object_serial_dict[oid] = (serial, version)
        get_conn = self.cp.getConnForCell
        for cell in cell_list:
            conn = get_conn(cell)
            if conn is not None:
                try:
                    conn.ask(p)
                except ConnectionClosed:
                    pass
        self._waitAnyMessage(False)
        return None
594

595
    @profiler_decorator
    def _handleConflicts(self, tryToResolveConflict):
        """Try to resolve the conflicts reported for the current transaction.

        For each entry of conflict_serial_dict whose conflicting serial is
        not newer than our tid, call tryToResolveConflict. On success, the
        entry moves to resolved_conflict_serial_dict and the resolved data
        is stored again.

        Returns the list of oids resolved in this pass.
        Raises ConflictError(serials=(conflicting serial, our base serial))
        for the first conflict that cannot be resolved.
        """
        result = []
        append = result.append
        local_var = self.local_var
        data_dict = local_var.data_dict
        object_serial_dict = local_var.object_serial_dict
        conflict_serial_dict = local_var.conflict_serial_dict
        resolved_conflict_serial_dict = local_var.resolved_conflict_serial_dict
        tid = local_var.tid
        # Iterate over a snapshot: resolved entries are popped in the loop.
        for oid, conflict_serial in list(conflict_serial_dict.items()):
            serial, version = object_serial_dict[oid]
            data = data_dict[oid]
            if conflict_serial <= tid:
                new_data = tryToResolveConflict(oid, conflict_serial, serial,
                    data)
                if new_data is not None:
                    # Mark this conflict as resolved
                    resolved_conflict_serial_dict[oid] = \
                        conflict_serial_dict.pop(oid)
                    # Try to store again
                    self.store(oid, conflict_serial, new_data, version,
                        local_var.txn)
                    append(oid)
                    continue
            # Unresolvable conflict.
            # XXX: Is it really required to remove from data_dict ?
            del data_dict[oid]
            # ZODB expects serials=(currently committed serial, serial our
            # change was based on); our own tid is neither.
            raise ConflictError(oid=oid,
                serials=(conflict_serial, serial), data=data)
        return result
628

629 630 631 632 633 634 635 636 637 638
    @profiler_decorator
    def waitResponses(self):
        """Wait for all requests to be answered (or their connection to be
        detected as closed)."""
        queue = self.local_var.queue
        while self.dispatcher.pending(queue):
            self._waitAnyMessage()

639
    @profiler_decorator
    def waitStoreResponses(self, tryToResolveConflict):
        """Wait for all store answers, resolving conflicts as they appear.

        With tryToResolveConflict None, only waits and returns []. Otherwise
        alternates waiting and conflict resolution until a pass resolves
        nothing. It then returns (oid, serial) pairs for every stored
        object: ResolvedSerial for resolved ones, the transaction tid for
        the others. Raises NEOStorageError if an object's store counter is
        still 0 (presumably: no node acknowledged the store).
        """
        local_var = self.local_var
        tid = local_var.tid
        if tryToResolveConflict is None:
            self.waitResponses()
            return []
        resolved_oid_set = set()
        while True:
            self.waitResponses()
            conflicts = self._handleConflicts(tryToResolveConflict)
            if not conflicts:
                # No more conflict resolutions to do, no more pending store
                # requests
                break
            resolved_oid_set.update(conflicts)
        result = []
        for oid, store_count in \
                local_var.object_stored_counter_dict.iteritems():
            if store_count == 0:
                raise NEOStorageError('tpc_store failed')
            if oid in resolved_oid_set:
                result.append((oid, ResolvedSerial))
            else:
                result.append((oid, tid))
        return result
Aurel's avatar
Aurel committed
671

672
    @profiler_decorator
    def tpc_vote(self, transaction, tryToResolveConflict):
        """Store current transaction.

        Waits for pending stores (resolving conflicts), then asks each
        writable storage node for the tid to store the transaction
        metadata. Returns the result of waitStoreResponses. Raises
        NEOStorageError if a node rejects the vote or no node could be
        reached.
        """
        local_var = self.local_var
        if transaction is not local_var.txn:
            raise StorageTransactionError(self, transaction)

        result = self.waitStoreResponses(tryToResolveConflict)

        tid = local_var.tid
        packet = Packets.AskStoreTransaction(tid, transaction.user,
            transaction.description, dumps(transaction._extension),
            local_var.data_dict.keys())
        voted_counter = 0
        for cell in self._getCellListForTID(tid, writable=True):
            logging.debug("voting object %s %s", cell.getAddress(),
                cell.getState())
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue
            local_var.txn_voted = False
            try:
                self._askStorage(conn, packet)
            except ConnectionClosed:
                continue
            if not self.isTransactionVoted():
                raise NEOStorageError('tpc_vote failed')
            voted_counter += 1

        # At least one storage node must have accepted.
        if not voted_counter:
            raise NEOStorageError('tpc_vote failed')
        # Check the master connection is still alive, so that problems are
        # more likely to be detected here than in tpc_finish.
        self._getMasterConnection()
        return result

715
    @profiler_decorator
    def tpc_abort(self, transaction):
        """Abort current transaction.

        Does nothing if transaction is not the current one of this thread.
        Otherwise waits for pending store answers and notifies the abort to
        the storage nodes involved and to the primary master. Storage nodes
        whose connection is closed are skipped. The thread's transaction
        state is always reset, even if notifying fails, so no stale data is
        left for the next transaction.
        """
        local_var = self.local_var
        if transaction is not local_var.txn:
            return
        try:
            # Just wait for responses to arrive, don't handle any conflict,
            # and ignore the outcome: we are going to abort anyway.
            self.waitStoreResponses(None)

            tid = local_var.tid
            cell_set = set()
            # select nodes where objects were stored
            for oid in local_var.data_dict.iterkeys():
                cell_set |= set(self._getCellListForOID(oid, writable=True))
            # select nodes where transaction was stored
            cell_set |= set(self._getCellListForTID(tid, writable=True))

            # cancel transaction on all those nodes
            for cell in cell_set:
                conn = self.cp.getConnForCell(cell)
                if conn is None:
                    continue
                try:
                    conn.notify(Packets.AbortTransaction(tid))
                except ConnectionClosed:
                    # Best effort: skip this node, keep notifying others.
                    continue

            # Abort the transaction in the primary master node.
            conn = self._getMasterConnection()
            conn.notify(Packets.AbortTransaction(tid))
        finally:
            local_var.clear()
744

745
    @profiler_decorator
    def tpc_finish(self, transaction, f=None):
        """Finish the current transaction and return its TID.

        Returns None without doing anything when ``transaction`` is not the
        one stored in ``local_var.txn``.  While holding the load lock, this:
          - calls ``f(tid)`` when ``f`` is given (callback supplied by ZODB);
          - asks the primary master to finish the transaction, raising
            NEOStorageError('tpc_finish failed') if it is not reported as
            finished;
          - updates ``mq_cache`` under the cache lock: oids stored with ''
            are evicted, the others are cached as ``(tid, data)``;
          - clears ``local_var``.
        """
        local_var = self.local_var
        if transaction is not local_var.txn:
            return
        self._load_lock_acquire()
        try:
            tid = local_var.tid
            if f is not None:
                # Callback given by ZODB.
                f(tid)

            finish_packet = Packets.AskFinishTransaction(
                local_var.data_dict.keys(), tid)
            self._askPrimary(finish_packet)
            if not self.isTransactionFinished():
                raise NEOStorageError('tpc_finish failed')

            self._cache_lock_acquire()
            try:
                cache = self.mq_cache
                for oid, data in local_var.data_dict.iteritems():
                    if data != '':
                        # Committed serial of the object is now the tid.
                        cache[oid] = tid, data
                    elif oid in cache:
                        del cache[oid]
            finally:
                self._cache_lock_release()
            local_var.clear()
            return tid
        finally:
            self._load_lock_release()
782

783
    def undo(self, undone_tid, txn, tryToResolveConflict):
        """Undo transaction ``undone_tid`` as part of transaction ``txn``.

        ``txn`` must be the transaction in progress, otherwise
        StorageTransactionError is raised.  Transaction information is first
        fetched from a readable storage node holding ``undone_tid``.  Every
        storage node is then asked to undo it.  Oids collected in
        ``local_var.undo_error_oid_list`` while waiting for the answers are
        resolved with ``tryToResolveConflict`` and stored again in the
        current transaction.

        Returns ``(tid, oid_list)``: the current transaction's tid and the
        oids listed in the undone transaction's information.

        Raises:
          UndoError: ``undone_tid`` is packed, an object needed for conflict
            resolution is missing, or a conflict cannot be resolved.
          ConflictError: a storage reported a "live" conflict.
          NEOStorageError: transaction information could not be fetched.
        """
        if txn is not self.local_var.txn:
            raise StorageTransactionError(self, undone_tid)

        # First get transaction information from a storage node.
        cell_list = self._getCellListForTID(undone_tid, readable=True)
        assert len(cell_list), 'No cell found for transaction %s' % (
            dump(undone_tid), )
        shuffle(cell_list)
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.txn_info = 0
            self.local_var.txn_ext = 0
            try:
                self._askStorage(conn, Packets.AskTransactionInformation(
                    undone_tid))
            except ConnectionClosed:
                continue

            if self.local_var.txn_info == -1:
                # Tid not found, try with next node
                logging.warning('Transaction %s was not found on node %s',
                    dump(undone_tid), self.nm.getByAddress(conn.getAddress()))
                continue
            elif isinstance(self.local_var.txn_info, dict):
                break
            else:
                raise NEOStorageError('undo failed')
        else:
            # No reachable node returned the transaction information.
            raise NEOStorageError('undo failed')

        if self.local_var.txn_info['packed']:
            # Packed transactions cannot be undone: refuse explicitly.
            raise UndoError('non-undoable transaction')

        tid = self.local_var.tid

        undo_conflict_oid_list = self.local_var.undo_conflict_oid_list = []
        undo_error_oid_list = self.local_var.undo_error_oid_list = []
        ask_undo_transaction = Packets.AskUndoTransaction(tid, undone_tid)
        getConnForNode = self.cp.getConnForNode
        for storage_node in self.nm.getStorageList():
            # NOTE(review): other callers check getConnForNode() for None;
            # an unreachable node would fail here with AttributeError.
            storage_conn = getConnForNode(storage_node)
            storage_conn.ask(ask_undo_transaction)
        # Wait for all AnswerUndoTransaction.
        self.waitResponses()

        # Don't do any handling for "live" conflicts, raise
        if undo_conflict_oid_list:
            raise ConflictError(oid=undo_conflict_oid_list[0], serials=(tid,
                undone_tid), data=None)

        def loadBefore(oid, tid):
            # First two items of _load(oid, tid=tid), unpacked by callers as
            # (data, serial); a missing object makes the undo impossible.
            try:
                result = self._load(oid, tid=tid)
            except NEOStorageNotFoundError:
                raise UndoError('Object not found while resolving undo '
                    'conflict')
            return result[:2]

        # Try to resolve undo conflicts
        for oid in undo_error_oid_list:
            # Load the latest version we are supposed to see
            data, data_tid = loadBefore(oid, tid)
            # Load the version we were undoing to
            undo_data, _ = loadBefore(oid, undone_tid)
            # Resolve conflict
            new_data = tryToResolveConflict(oid, data_tid, undone_tid,
                undo_data, data)
            if new_data is None:
                raise UndoError('Some data were modified by a later '
                    'transaction', oid)
            self.store(oid, data_tid, new_data, '', self.local_var.txn)

        oid_list = self.local_var.txn_info['oids']
        # Consistency checking: all oids of the transaction must have been
        # reported as undone
        data_dict = self.local_var.data_dict
        for oid in oid_list:
            assert oid in data_dict, repr(oid)
        return self.local_var.tid, oid_list
866

867 868 869 870
    def _insertMetadata(self, txn_info, extension):
        """Merge the unpickled ``extension`` mapping into ``txn_info``.

        ``txn_info`` is modified in place; existing keys are overwritten.
        NOTE(review): ``extension`` is unpickled, so it must only come from
        trusted storage nodes.
        """
        metadata = loads(extension)
        txn_info.update(metadata)

871
    def __undoLog(self, first, last, filter=None, block=0, with_oids=False):
        """Return a list of transaction description dicts, newest first.

        ``first``/``last`` bound the requested range; a negative ``last`` is
        converted to ``first - last`` (see FileStorage.py).  ``filter``, when
        given, is called with each transaction-info dict and selects which
        ones are returned.  The 'packed' key is removed from returned dicts,
        and 'oids' as well unless ``with_oids`` is true; the unpickled
        extension metadata is merged into them.  When nothing is found and
        ``block`` is false, the search is retried once with ``last * 5``.

        Raises NeoException when the information of a listed TID cannot be
        fetched from any storage node.
        """
        if last < 0:
            # See FileStorage.py for explanation
            last = first - last

        # First get a list of transactions from all storage nodes.
        # Each storage node will return TIDs only for UP_TO_DATE state and
        # FEEDING state cells
        pt = self._getPartitionTable()
        storage_node_list = pt.getNodeList()

        self.local_var.node_tids = {}
        asked_count = 0
        for storage_node in storage_node_list:
            conn = self.cp.getConnForNode(storage_node)
            if conn is None:
                continue
            conn.ask(Packets.AskTIDs(first, last, INVALID_PARTITION))
            asked_count += 1

        # Wait for answers from the storages actually queried: waiting for
        # every node of the partition table would block forever as soon as
        # one of them is unreachable.
        while len(self.local_var.node_tids) < asked_count:
            self._waitAnyMessage()

        # Merge and reorder tids, newest first
        tid_set = set()
        for tid_list in self.local_var.node_tids.values():
            tid_set.update(tid_list)
        ordered_tids = sorted(tid_set, reverse=True)
        logging.debug("UndoLog, tids %s", ordered_tids)

        # For each transaction, get info
        undo_info = []
        for tid in ordered_tids:
            cell_list = self._getCellListForTID(tid, readable=True)
            shuffle(cell_list)
            # Reset before querying so that a previous tid's answer is never
            # reused when no cell of this tid is reachable.
            self.local_var.txn_info = 0
            self.local_var.txn_ext = 0
            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is None:
                    continue
                self.local_var.txn_info = 0
                self.local_var.txn_ext = 0
                try:
                    self._askStorage(conn,
                            Packets.AskTransactionInformation(tid))
                except ConnectionClosed:
                    continue
                if isinstance(self.local_var.txn_info, dict):
                    break

            txn_info = self.local_var.txn_info
            if not isinstance(txn_info, dict):
                # TID not found at all
                raise NeoException('Data inconsistency detected: '
                                   'transaction info for TID %r could not '
                                   'be found' % (tid, ))

            if filter is None or filter(txn_info):
                txn_info.pop('packed')
                if not with_oids:
                    txn_info.pop("oids")
                undo_info.append(txn_info)
                self._insertMetadata(txn_info, self.local_var.txn_ext)
                if len(undo_info) >= last - first:
                    break

        # Check we return at least one element, otherwise call
        # again but extend offset
        if not undo_info and not block:
            undo_info = self.__undoLog(first=first, last=last*5, filter=filter,
                    block=1, with_oids=with_oids)
        return undo_info

942
    def undoLog(self, first, last, filter=None, block=0):
        """Return transaction descriptions for the undo log (without oids)."""
        undo_entries = self.__undoLog(first, last, filter=filter, block=block)
        return undo_entries
944 945

    def transactionLog(self, first, last):
        """Return transaction descriptions, each including its 'oids' list."""
        log_entries = self.__undoLog(first=first, last=last, with_oids=True)
        return log_entries
947

Grégory Wisniewski's avatar
Grégory Wisniewski committed
948
    def history(self, oid, version=None, size=1, filter=None, object_only=0):
        """Return the revision history of ``oid``.

        The object history (at most ``size`` entries requested) is fetched
        from one readable storage node.  With ``object_only`` true, the raw
        ``(oid, [(serial, size), ...])`` answer is returned as is.
        Otherwise the information of each serial's transaction is fetched
        and returned as a list of dicts with 'tid', 'version' ('') and
        'size' keys plus the transaction metadata.  ``filter``, when given,
        selects which dicts are returned.  ``version`` is unused.

        Raises:
          KeyError: no history was found.
          NEOStorageError: storage answers are missing or inconsistent.
        """
        # Get history informations for object first
        cell_list = self._getCellListForOID(oid, readable=True)
        shuffle(cell_list)

        # Reset so that a previous call's answer is never reused when no
        # cell is reachable.
        self.local_var.history = None
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.history = None
            try:
                self._askStorage(conn, Packets.AskObjectHistory(oid, 0, size))
            except ConnectionClosed:
                continue

            history = self.local_var.history
            if not isinstance(history, tuple):
                # Not found (-1) or no usable answer, go on with next node
                continue
            if history[0] != oid:
                # Got history for wrong oid
                raise NEOStorageError('inconsistency in storage: asked oid '
                                      '%r, got %r' % (oid, history[0]))
            # Valid answer: the remaining nodes need not be asked.
            break

        history = self.local_var.history
        if not isinstance(history, tuple):
            raise NEOStorageError('history failed')

        if history[1] == [] or history[1][0][1] == 0:
            # KeyError expected if no history was found
            # XXX: this may requires an error from the storages
            raise KeyError

        if object_only:
            # Use by getSerial
            return history

        # Now that we have object informations, get txn informations
        history_list = []
        for serial, obj_size in history[1]:
            # Transaction information lives in the cells of the TID, not in
            # those of the object.
            cell_list = self._getCellListForTID(serial, readable=True)
            shuffle(cell_list)

            self.local_var.txn_info = None
            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is None:
                    continue

                # ask transaction information
                self.local_var.txn_info = None
                try:
                    self._askStorage(conn,
                            Packets.AskTransactionInformation(serial))
                except ConnectionClosed:
                    continue

                if self.local_var.txn_info == -1:
                    # TID not found
                    continue
                if isinstance(self.local_var.txn_info, dict):
                    break

            txn_info = self.local_var.txn_info
            if not isinstance(txn_info, dict):
                raise NEOStorageError('history failed')

            # create history dict
            txn_info.pop('id')
            txn_info.pop('oids')
            txn_info.pop('packed')
            txn_info['tid'] = serial
            txn_info['version'] = ''
            txn_info['size'] = obj_size
            if filter is None or filter(txn_info):
                history_list.append(txn_info)
            self._insertMetadata(txn_info, self.local_var.txn_ext)

        return history_list
Aurel's avatar
Aurel committed
1024

1025
    @profiler_decorator
    def importFrom(self, source, start, stop, tryToResolveConflict):
        """Replay the transactions of ``source`` between ``start`` and ``stop``.

        Each transaction yielded by ``source.iterator(start, stop)`` is
        committed here through tpc_begin / store / tpc_vote / tpc_finish,
        keeping its original tid and status.  The source iterator is closed
        even if an error interrupts the import.
        """
        # Last serial known for each oid, passed as the previous serial of
        # the next store() of that oid.
        serials = {}

        def updateLastSerial(oid, result):
            # store()/tpc_vote() results handled here: a false value (ignored),
            # a serial string for ``oid``, or a sequence of (oid, serial).
            if not result:
                return
            if isinstance(result, str):
                assert oid is not None
                serials[oid] = result
            else:
                for result_oid, serial in result:
                    assert isinstance(serial, str), serial
                    serials[result_oid] = serial

        transaction_iter = source.iterator(start, stop)
        try:
            for transaction in transaction_iter:
                self.tpc_begin(transaction, transaction.tid,
                    transaction.status)
                for r in transaction:
                    pre = serials.get(r.oid, None)
                    # TODO: bypass conflict resolution, locks...
                    result = self.store(r.oid, pre, r.data, r.version,
                        transaction)
                    updateLastSerial(r.oid, result)
                updateLastSerial(None, self.tpc_vote(transaction,
                            tryToResolveConflict))
                self.tpc_finish(transaction)
        finally:
            # Release the source iterator's resources even on failure.
            transaction_iter.close()

1050 1051 1052
    def iterator(self, start=None, stop=None):
        """Return an ``Iterator`` over this storage from ``start`` to ``stop``."""
        transaction_iterator = Iterator(self, start, stop)
        return transaction_iterator

1053 1054 1055 1056 1057
    def lastTransaction(self):
        """Return ``local_var.tid``, the last TID known locally.

        XXX: transactions committed by other clients are not taken into
        account; the primary master should be asked instead.
        """
        local_var = self.local_var
        return local_var.tid

1058 1059 1060 1061 1062 1063 1064 1065 1066 1067
    def abortVersion(self, src, transaction):
        """Versions are not supported: return ``('', [])`` for the current txn.

        Raises StorageTransactionError if ``transaction`` is not the one in
        progress.
        """
        if transaction is self.local_var.txn:
            return '', []
        raise StorageTransactionError(self, transaction)

    def commitVersion(self, src, dest, transaction):
        """Versions are not supported: return ``('', [])`` for the current txn.

        Raises StorageTransactionError if ``transaction`` is not the one in
        progress.
        """
        if transaction is self.local_var.txn:
            return '', []
        raise StorageTransactionError(self, transaction)

Grégory Wisniewski's avatar
Grégory Wisniewski committed
1068 1069 1070 1071
    def loadEx(self, oid, version):
        """Return ``(data, serial, '')`` for ``oid``; ``version`` is ignored."""
        loaded_data, loaded_serial = self.load(oid=oid)
        return (loaded_data, loaded_serial, '')

1072 1073 1074 1075 1076
    def __del__(self):
        """Close every connection and stop the polling thread.

        ZODB does not always call close() when Zope shuts down, so the same
        cleanup is also bound to __del__.
        """
        connection_list = self.em.getConnectionList()
        for connection in connection_list:
            connection.close()
        # Stop polling thread
        poll_thread = self.poll_thread
        poll_thread.stop()
    close = __del__
1081 1082

    def sync(self):
        """Process pending incoming messages via ``_waitAnyMessage(False)``.

        NOTE(review): the False argument presumably means "do not block";
        confirm against _waitAnyMessage's definition.
        """
        wait_any_message = self._waitAnyMessage
        wait_any_message(False)
1084

1085 1086 1087 1088 1089 1090 1091 1092 1093
    def setNodeReady(self):
        """Set the ``local_var.node_ready`` flag to True."""
        setattr(self.local_var, 'node_ready', True)

    def setNodeNotReady(self):
        """Set the ``local_var.node_ready`` flag to False."""
        setattr(self.local_var, 'node_ready', False)

    def isNodeReady(self):
        """Return the ``local_var.node_ready`` flag."""
        local_var = self.local_var
        return local_var.node_ready

1094
    def setTID(self, value):
        """Store ``value`` as ``local_var.tid``."""
        local_var = self.local_var
        local_var.tid = value
1096 1097

    def getTID(self):
        """Return ``local_var.tid``."""
        local_var = self.local_var
        return local_var.tid
1099 1100

    def setTransactionFinished(self):
        """Set the ``local_var.txn_finished`` flag to True."""
        setattr(self.local_var, 'txn_finished', True)
1102 1103

    def isTransactionFinished(self):
        """Return the ``local_var.txn_finished`` flag."""
        local_var = self.local_var
        return local_var.txn_finished
1105 1106

    def setTransactionVoted(self):
        """Set the ``local_var.txn_voted`` flag to True."""
        setattr(self.local_var, 'txn_voted', True)
1108 1109

    def isTransactionVoted(self):
        """Return the ``local_var.txn_voted`` flag."""
        local_var = self.local_var
        return local_var.txn_voted
1111