#
# Copyright (C) 2006-2010  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

from thread import get_ident
from cPickle import dumps
from zlib import compress, decompress
from neo.locking import Queue, Empty
from random import shuffle
from time import sleep

from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
from ZODB.ConflictResolution import ResolvedSerial

from neo import setupLog
setupLog('CLIENT', verbose=True)

from neo import logging
from neo import protocol
from neo.protocol import NodeTypes, Packets
from neo.event import EventManager
from neo.util import makeChecksum, dump
from neo.locking import Lock
from neo.connection import MTClientConnection
from neo.node import NodeManager
from neo.connector import getConnectorHandler
from neo.client.exception import NEOStorageError
from neo.client.exception import NEOStorageNotFoundError, ConnectionClosed
from neo.exception import NeoException
from neo.client.handlers import storage, master
from neo.dispatcher import Dispatcher
from neo.client.poll import ThreadedPoll
from neo.client.iterator import Iterator
from neo.client.mq import MQ
from neo.client.pool import ConnectionPool
from neo.util import u64, parseMasterList

class ThreadContext(object):
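    """Hold the per-thread state of the client application.

    Attribute reads and writes are redirected to a dictionary keyed by the
    calling thread's identifier, so each thread using the storage sees its
    own transaction context (tid, txn, answer queue, ...).
    """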

    def __init__(self):
        super(ThreadContext, self).__setattr__('_threads_dict', {})

    def __getThreadData(self):
        thread_id = get_ident()
        try:
            result = self._threads_dict[thread_id]
        except KeyError:
            self.clear(thread_id)
            result = self._threads_dict[thread_id]
        return result

    def __getattr__(self, name):
        thread_data = self.__getThreadData()
        try:
            return thread_data[name]
        except KeyError:
            raise AttributeError, name

    def __setattr__(self, name, value):
        thread_data = self.__getThreadData()
        thread_data[name] = value

    def clear(self, thread_id=None):
        if thread_id is None:
            thread_id = get_ident()
        thread_dict = self._threads_dict.get(thread_id)
        if thread_dict is None:
            queue = Queue(0)
        else:
            queue = thread_dict['queue']
        self._threads_dict[thread_id] = {
            'tid': None,
            'txn': None,
            'data_dict': {},
            'object_serial_dict': {},
            'object_stored_counter_dict': {},
            'conflict_serial_dict': {},
            'object_stored': 0,
            'txn_voted': False,
            'txn_finished': False,
            'queue': queue,
            'txn_info': 0,
            'history': None,
            'node_tids': {},
            'node_ready': False,
            'asked_object': 0,
        }


class Application(object):
    """The client node application."""

    def __init__(self, master_nodes, name, connector=None, **kw):
        # Start the polling thread
        self.em = EventManager()
        self.poll_thread = ThreadedPoll(self.em)
        # Internal attributes common to all threads
        self._db = None
        self.name = name
        self.connector_handler = getConnectorHandler(connector)
        self.dispatcher = Dispatcher()
        self.nm = NodeManager()
        self.cp = ConnectionPool(self)
        self.pt = None
        self.master_conn = None
        self.primary_master_node = None
        self.trying_master_node = None

        # Load the master node list
        for address in parseMasterList(master_nodes):
            self.nm.createMaster(address=address)

        # No self-assigned UUID; the primary master will supply one.
        self.uuid = None
        self.mq_cache = MQ()
        self.new_oid_list = []
        self.last_oid = '\0' * 8
        self.storage_event_handler = storage.StorageEventHandler(self)
        self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
        self.storage_handler = storage.StorageAnswersHandler(self)
        self.primary_handler = master.PrimaryAnswersHandler(self)
        self.primary_bootstrap_handler = master.PrimaryBootstrapHandler(self)
        self.notifications_handler = master.PrimaryNotificationsHandler(self)
        # Internal attributes specific to each thread
        self.local_var = ThreadContext()
        # Lock definitions:
        # _load_lock is used to make loading and storing atomic
        lock = Lock()
        self._load_lock_acquire = lock.acquire
        self._load_lock_release = lock.release
        # _oid_lock is used to prevent several threads from asking for
        # new OIDs at the same time
        lock = Lock()
        self._oid_lock_acquire = lock.acquire
        self._oid_lock_release = lock.release
        lock = Lock()
        # _cache_lock is used for the client cache
        self._cache_lock_acquire = lock.acquire
        self._cache_lock_release = lock.release
        lock = Lock()
        # _connecting_to_master_node is used to prevent simultaneous master
        # node connection attempts
        self._connecting_to_master_node_acquire = lock.acquire
        self._connecting_to_master_node_release = lock.release
        # _nm ensures exclusive access to the node manager
        lock = Lock()
        self._nm_acquire = lock.acquire
        self._nm_release = lock.release

    def _handlePacket(self, conn, packet, handler=None):
        """
          conn
            The connection which received the packet (forwarded to handler).
          packet
            The packet to handle.
          handler
            The handler to use to handle the packet.
            If not given, it will be guessed from the connection's node type.
        """
        if handler is None:
            # Guess the handler to use based on the type of node on the
            # connection
            node = self.nm.getByAddress(conn.getAddress())
            if node is None:
                raise ValueError, 'Expecting an answer from a node ' \
                    'whose type is not known... Is this right ?'
            if node.isStorage():
                handler = self.storage_handler
            elif node.isMaster():
                handler = self.primary_handler
            else:
                raise ValueError, 'Unknown node type: %r' % (node.__class__, )
        handler.dispatch(conn, packet)

    def _waitAnyMessage(self, block=True):
        """
          Handle all pending packets.
          block
            If True (default), block until at least one packet has been
            received.
        """
        get = self.local_var.queue.get
        _handlePacket = self._handlePacket
        while True:
            try:
                conn, packet = get(block)
            except Empty:
                break
            if packet is None:
                # connection was closed
                continue
            block = False
            try:
                _handlePacket(conn, packet)
            except ConnectionClosed:
                pass

    def _waitMessage(self, target_conn, msg_id, handler=None):
        """Wait for a message returned by the dispatcher in queues."""
        get = self.local_var.queue.get
        _handlePacket = self._handlePacket
        while True:
            conn, packet = get(True)
            if target_conn is conn:
                # check fake packet
                if packet is None:
                    raise ConnectionClosed
                if msg_id == packet.getId():
                    self._handlePacket(conn, packet, handler=handler)
                    break
            self._handlePacket(conn, packet)

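    # Both helpers below follow the same request pattern: the packet is sent
    # on a connection registered with this thread's queue, the connection
    # lock is released, and _waitMessage() blocks until the dispatcher
    # forwards the matching answer back to this thread.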
    def _askStorage(self, conn, packet, timeout=5, additional_timeout=30):
        """ Send a request to a storage node and process its answer """
        try:
            msg_id = conn.ask(self.local_var.queue, packet, timeout,
                              additional_timeout)
        finally:
            # assume that the connection was already locked
            conn.unlock()
        self._waitMessage(conn, msg_id, self.storage_handler)

    def _askPrimary(self, packet, timeout=5, additional_timeout=30):
        """ Send a request to the primary master and process its answer """
        conn = self._getMasterConnection()
        conn.lock()
        try:
            msg_id = conn.ask(self.local_var.queue, packet, timeout,
                              additional_timeout)
        finally:
            conn.unlock()
        self._waitMessage(conn, msg_id, self.primary_handler)

    def _getMasterConnection(self):
        """ Connect to the primary master node on demand """
        # acquire the lock to allow only one thread to connect to the primary
        result = self.master_conn
        if result is None:
            self._connecting_to_master_node_acquire()
            try:
                self.new_oid_list = []
                result = self._connectToPrimaryNode()
                self.master_conn = result
            finally:
                self._connecting_to_master_node_release()
        return result

    def _getPartitionTable(self):
        """ Return the partition table manager, reconnecting to the primary
        master node if needed """
        # this ensures the master connection is established and the partition
        # table is up to date.
        self._getMasterConnection()
        return self.pt

    def _getCellListForOID(self, oid, readable=False, writable=False):
        """ Return the cells available for the specified OID """
        pt = self._getPartitionTable()
        return pt.getCellListForOID(oid, readable, writable)

    def _getCellListForTID(self, tid, readable=False, writable=False):
        """ Return the cells available for the specified TID """
        pt = self._getPartitionTable()
        return pt.getCellListForTID(tid, readable, writable)

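    # Bootstrap sequence: find a master node that answers AskPrimary, make
    # sure it is the primary one, get identified (RequestIdentification) to
    # obtain a UUID, then fetch the node list and the partition table. The
    # loop only ends once the partition table is operational.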
    def _connectToPrimaryNode(self):
        logging.debug('connecting to primary master...')
        ready = False
        nm = self.nm
        while not ready:
            # Get network connection to primary master
            index = 0
            connected = False
            while not connected:
                if self.primary_master_node is not None:
                    # If I know a primary master node, pinpoint it.
                    self.trying_master_node = self.primary_master_node
                    self.primary_master_node = None
                else:
                    # Otherwise, check one by one.
                    master_list = nm.getMasterList()
                    try:
                        self.trying_master_node = master_list[index]
                    except IndexError:
                        sleep(1)
                        index = 0
                        self.trying_master_node = master_list[0]
                    index += 1
                # Connect to master
                conn = MTClientConnection(self.em, self.notifications_handler,
                        addr=self.trying_master_node.getAddress(),
                        connector_handler=self.connector_handler,
                        dispatcher=self.dispatcher)
                # Query for primary master node
                conn.lock()
                try:
                    if conn.getConnector() is None:
                        # This happens if a connection could not be established.
                        logging.error('Connection to master node %s failed',
                                      self.trying_master_node)
                        continue
                    msg_id = conn.ask(self.local_var.queue,
                            Packets.AskPrimary())
                finally:
                    conn.unlock()
                try:
                    self._waitMessage(conn, msg_id,
                            handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    continue
                # If we reached the primary master node, mark as connected
                connected = self.primary_master_node is not None and \
                        self.primary_master_node is self.trying_master_node

            logging.info('connected to a primary master node')
            # Identify to primary master and request initial data
            while conn.getUUID() is None:
                conn.lock()
                try:
                    if conn.getConnector() is None:
                        logging.error('Connection to master node %s lost',
                                      self.trying_master_node)
                        self.primary_master_node = None
                        break
                    p = Packets.RequestIdentification(NodeTypes.CLIENT,
                            self.uuid, None, self.name)
                    msg_id = conn.ask(self.local_var.queue, p)
                finally:
                    conn.unlock()
                try:
                    self._waitMessage(conn, msg_id,
                            handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    self.primary_master_node = None
                    break
                if conn.getUUID() is None:
                    # Node identification was refused by master.
                    sleep(1)
            if self.uuid is not None:
                conn.lock()
                try:
                    msg_id = conn.ask(self.local_var.queue,
                                      Packets.AskNodeInformation())
                finally:
                    conn.unlock()
                self._waitMessage(conn, msg_id,
                        handler=self.primary_bootstrap_handler)
                conn.lock()
                try:
                    msg_id = conn.ask(self.local_var.queue,
                                      Packets.AskPartitionTable([]))
                finally:
                    conn.unlock()
                self._waitMessage(conn, msg_id,
                        handler=self.primary_bootstrap_handler)
            ready = self.uuid is not None and self.pt is not None \
                                 and self.pt.operational()
        logging.info("connected to primary master node %s" %
                self.primary_master_node)
        return conn

    def registerDB(self, db, limit):
        self._db = db

    def getDB(self):
        return self._db

    def new_oid(self):
        """Get a new OID."""
        self._oid_lock_acquire()
        try:
            if len(self.new_oid_list) == 0:
                # Get a new list of OIDs from the master node.
                # A whole list is requested at once to avoid asking the
                # master for new OIDs one by one.
                self._askPrimary(Packets.AskNewOIDs(100))
                if len(self.new_oid_list) <= 0:
                    raise NEOStorageError('new_oid failed')
            self.last_oid = self.new_oid_list.pop()
            return self.last_oid
        finally:
            self._oid_lock_release()

    def getStorageSize(self):
        # return the last OID used, this is inaccurate
        return int(u64(self.last_oid))

    def getSerial(self, oid):
        # Try in cache first
        self._cache_lock_acquire()
        try:
            if oid in self.mq_cache:
                return self.mq_cache[oid][0]
        finally:
            self._cache_lock_release()
        # history returns the serial, so use it
        hist = self.history(oid, size=1, object_only=1)
        if len(hist) == 0:
            raise NEOStorageNotFoundError()
        if hist[0] != oid:
            raise NEOStorageError('getSerial failed')
        return hist[1][0][0]


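    # Object loading strategy: pick the cells which can serve the OID, try
    # the corresponding storage nodes in random order, and keep the first
    # answer whose OID and checksum are consistent. Results are cached only
    # when called through load().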
    def _load(self, oid, serial=None, tid=None, cache=0):
        """Internal method which manages load, loadSerial and loadBefore."""
        cell_list = self._getCellListForOID(oid, readable=True)
        if len(cell_list) == 0:
            # No cells available, so why are we running ?
            logging.error('oid %s not found because no storage is ' \
                    'available for it', dump(oid))
            raise NEOStorageNotFoundError()

        shuffle(cell_list)
        self.local_var.asked_object = 0
        for cell in cell_list:
            logging.debug('trying to load %s from %s',
                          dump(oid), dump(cell.getUUID()))
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            try:
                self._askStorage(conn, Packets.AskObject(oid, serial, tid))
            except ConnectionClosed:
                continue

            if self.local_var.asked_object == -1:
                # OID not found
                break

            # Check data
            noid, start_serial, end_serial, compression, checksum, data \
                = self.local_var.asked_object
            if noid != oid:
                # Oops, try with next node
                logging.error('got wrong oid %s instead of %s from node %s',
                              noid, dump(oid), cell.getAddress())
                self.local_var.asked_object = -1
                continue
            elif checksum != makeChecksum(data):
                # Check checksum.
                logging.error('wrong checksum from node %s for oid %s',
                              cell.getAddress(), dump(oid))
                self.local_var.asked_object = -1
                continue
            else:
                # Everything looks alright.
                break

        if self.local_var.asked_object == 0:
            # We could not get the object from any storage node because of
            # connection failures
            logging.warning('oid %s not found because of connection failure',
                    dump(oid))
            raise NEOStorageNotFoundError()

        if self.local_var.asked_object == -1:
            # The object was not found on any storage node
            logging.info('oid %s not found', dump(oid))
            raise NEOStorageNotFoundError()

        # Uncompress data
        if compression:
            data = decompress(data)

        # Put in cache only when using load
        if cache:
            self._cache_lock_acquire()
            try:
                self.mq_cache[oid] = start_serial, data
            finally:
                self._cache_lock_release()
        if data == '':
            data = None
        return data, start_serial, end_serial


    def load(self, oid, version=None):
        """Load an object for a given oid."""
        # First try from cache
        self._load_lock_acquire()
        try:
            self._cache_lock_acquire()
            try:
                if oid in self.mq_cache:
                    logging.debug('load oid %s is cached', dump(oid))
                    serial, data = self.mq_cache[oid]
                    return data, serial
            finally:
                self._cache_lock_release()
            # Otherwise get it from storage node
            return self._load(oid, cache=1)[:2]
        finally:
            self._load_lock_release()

    def loadSerial(self, oid, serial):
        """Load an object for a given oid and serial."""
        # Do not look in the cache, as it only keeps up-to-date objects
        logging.debug('loading %s at %s', dump(oid), dump(serial))
        return self._load(oid, serial=serial)[0]


    def loadBefore(self, oid, tid):
        """Load an object for a given oid before tid committed."""
        # Do not look in the cache, as it only keeps up-to-date objects
        logging.debug('loading %s before %s', dump(oid), dump(tid))
        data, start, end = self._load(oid, tid=tid)
        if end is None:
            # No previous version
            return None
        else:
            return data, start, end

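    # The ZODB two-phase commit is mapped onto NEO as follows: tpc_begin
    # asks the primary master for a TID, store() sends each object to every
    # writable cell, tpc_vote waits for the store answers and stores the
    # transaction metadata, and tpc_finish asks the master to finish the
    # transaction before updating the local cache.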
    def tpc_begin(self, transaction, tid=None, status=' '):
        """Begin a new transaction."""
        # First get a transaction, only one is allowed at a time
        if self.local_var.txn is transaction:
            # We have already begun the same transaction
            return
        # Ask the primary master to start a transaction. If no tid is
        # supplied, the master will supply one; otherwise the requested tid
        # will be used if possible.
        self.local_var.tid = None
        self._askPrimary(Packets.AskBeginTransaction(tid))
        if self.local_var.tid is None:
            raise NEOStorageError('tpc_begin failed')
        self.local_var.txn = transaction


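    # store() is asynchronous: requests are pushed to every writable cell
    # without waiting for the answers, which are collected later by
    # waitStoreResponses() (called from tpc_vote or undo).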
    def store(self, oid, serial, data, version, transaction,
        tryToResolveConflict):
        """Store object."""
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        logging.debug('storing oid %s serial %s',
                     dump(oid), dump(serial))
        # Find which storage node to use
        cell_list = self._getCellListForOID(oid, writable=True)
        if len(cell_list) == 0:
            raise NEOStorageError
        if data is None:
            # this is a George Bailey object, stored as an empty string
            data = ''
        compressed_data = compress(data)
        if len(compressed_data) > len(data):
            compressed_data = data
            compression = 0
        else:
            compression = 1
        checksum = makeChecksum(compressed_data)
        p = Packets.AskStoreObject(oid, serial, compression,
                 checksum, compressed_data, self.local_var.tid)
        # Store object in tmp cache
        self.local_var.data_dict[oid] = data
        # Store data on each node
        self.local_var.object_stored_counter_dict[oid] = 0
        self.local_var.object_serial_dict[oid] = (serial, version)
        local_queue = self.local_var.queue
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue
            try:
                try:
                    conn.ask(local_queue, p)
                finally:
                    conn.unlock()
            except ConnectionClosed:
                continue

        self._waitAnyMessage(False)
        return None

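    # Conflict handling: for every conflicting OID reported by the storage
    # nodes, attempt resolution with the callback provided by ZODB and store
    # the resolved data again; unresolved conflicts are raised as
    # ConflictError so ZODB can retry or abort the transaction.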
    def _handleConflicts(self, tryToResolveConflict):
        result = []
        append = result.append
        local_var = self.local_var
        # Check for conflicts
        data_dict = local_var.data_dict
        object_serial_dict = local_var.object_serial_dict
        for oid, conflict_serial in local_var.conflict_serial_dict.items():
            serial, version = object_serial_dict[oid]
            data = data_dict[oid]
            tid = local_var.tid
            resolved = False
            if conflict_serial <= tid:
                new_data = tryToResolveConflict(oid, conflict_serial, serial,
                    data)
                if new_data is not None:
                    # Forget this conflict
                    del local_var.conflict_serial_dict[oid]
                    # Try to store again
                    self.store(oid, conflict_serial, new_data, version,
                        local_var.txn, tryToResolveConflict)
                    append(oid)
                    resolved = True
            if not resolved:
                # XXX: Is it really required to remove from data_dict ?
                del data_dict[oid]
                raise ConflictError(oid=oid,
                    serials=(tid, serial), data=data)
        return result

    def waitStoreResponses(self, tryToResolveConflict):
        result = []
        append = result.append
        resolved_oid_set = set()
        update = resolved_oid_set.update
        local_var = self.local_var
        queue = self.local_var.queue
        tid = local_var.tid
        _waitAnyMessage = self._waitAnyMessage
        _handleConflicts = self._handleConflicts
        pending = self.dispatcher.pending
        while True:
            # Wait for all requests to be answered (or their connection to be
            # detected as closed)
            while pending(queue):
                _waitAnyMessage()
            conflicts = _handleConflicts(tryToResolveConflict)
            if conflicts:
                update(conflicts)
            else:
                # No more conflict resolutions to do, no more pending store
                # requests
                break

        # Check for never-stored objects, and update result for all others
        for oid, store_count in \
            local_var.object_stored_counter_dict.iteritems():
            if store_count == 0:
                raise NEOStorageError('tpc_store failed')
            elif oid in resolved_oid_set:
                append((oid, ResolvedSerial))
            else:
                append((oid, tid))
        return result

    def tpc_vote(self, transaction, tryToResolveConflict):
        """Store current transaction."""
        local_var = self.local_var
        if transaction is not local_var.txn:
            raise StorageTransactionError(self, transaction)

        result = self.waitStoreResponses(tryToResolveConflict)

        tid = local_var.tid
        # Ask each writable storage node to store the transaction metadata
        voted_counter = 0
        p = Packets.AskStoreTransaction(tid, transaction.user,
            transaction.description, dumps(transaction._extension),
            local_var.data_dict.keys())
        for cell in self._getCellListForTID(tid, writable=True):
            logging.debug("voting object %s %s", cell.getAddress(),
                cell.getState())
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            local_var.txn_voted = False
            try:
                self._askStorage(conn, p)
            except ConnectionClosed:
                continue

            if not self.isTransactionVoted():
                raise NEOStorageError('tpc_vote failed')
            voted_counter += 1

        # Check that at least one storage node accepted
        if voted_counter == 0:
            raise NEOStorageError('tpc_vote failed')
        # Check if master connection is still alive.
        # This is just here to lower the probability of detecting a problem
        # in tpc_finish, as we should do our best to detect problems before
        # tpc_finish.
        self._getMasterConnection()

        return result

    def tpc_abort(self, transaction):
        """Abort current transaction."""
        if transaction is not self.local_var.txn:
            return

        cell_set = set()
        # select nodes where objects were stored
        for oid in self.local_var.data_dict.iterkeys():
            cell_set |= set(self._getCellListForOID(oid, writable=True))
        # select nodes where transaction was stored
        cell_set |= set(self._getCellListForTID(self.local_var.tid,
            writable=True))

        # cancel the transaction on all those nodes
        for cell in cell_set:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue
            try:
                conn.notify(Packets.AbortTransaction(self.local_var.tid))
            finally:
                conn.unlock()

        # Abort the transaction in the primary master node.
        conn = self._getMasterConnection()
        conn.lock()
        try:
            conn.notify(Packets.AbortTransaction(self.local_var.tid))
        finally:
            conn.unlock()
        self.local_var.clear()

    def tpc_finish(self, transaction, f=None):
        """Finish current transaction."""
        if self.local_var.txn is not transaction:
            return
        self._load_lock_acquire()
        try:
            # Call function given by ZODB
            if f is not None:
                f(self.local_var.tid)

            # Call finish on master
            oid_list = self.local_var.data_dict.keys()
            p = Packets.AskFinishTransaction(oid_list, self.local_var.tid)
            self._askPrimary(p)

            if not self.isTransactionFinished():
                raise NEOStorageError('tpc_finish failed')

            # Update cache
            self._cache_lock_acquire()
            try:
                for oid in self.local_var.data_dict.iterkeys():
                    data = self.local_var.data_dict[oid]
                    # Now serial is same as tid
                    self.mq_cache[oid] = self.local_var.tid, data
            finally:
                self._cache_lock_release()
            # Keep the tid before clearing the per-thread context, which
            # resets it to None.
            tid = self.local_var.tid
            self.local_var.clear()
            return tid
        finally:
            self._load_lock_release()

    def undo(self, transaction_id, txn, tryToResolveConflict):
        if txn is not self.local_var.txn:
            raise StorageTransactionError(self, transaction_id)

        # First get transaction information from a storage node.
        cell_list = self._getCellListForTID(transaction_id, readable=True)
        shuffle(cell_list)
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.txn_info = 0
            try:
                self._askStorage(conn, Packets.AskTransactionInformation(
                    transaction_id))
            except ConnectionClosed:
                continue

            if self.local_var.txn_info == -1:
                # Tid not found, try with next node
                continue
            elif isinstance(self.local_var.txn_info, dict):
                break
            else:
                raise NEOStorageError('undo failed')

        if self.local_var.txn_info in (-1, 0):
            raise NEOStorageError('undo failed')

        oid_list = self.local_var.txn_info['oids']
        # Second get object data from storage node using loadBefore
        data_dict = {}
        for oid in oid_list:
            try:
                result = self.loadBefore(oid, transaction_id)
            except NEOStorageNotFoundError:
                # no previous revision, can't undo (as in filestorage)
                raise UndoError("no previous record", oid)
            data, start, end = result
            # end must be the TID we are going to undo, otherwise it means
            # a later transaction modified the object
            if end != transaction_id:
                raise UndoError("non-undoable transaction", oid)
            data_dict[oid] = data

        # Third do the transaction with the old data
        oid_list = data_dict.keys()
        for oid in oid_list:
            self.store(oid, transaction_id, data_dict[oid], None, txn,
                tryToResolveConflict)
        self.waitStoreResponses(tryToResolveConflict)
        return self.local_var.tid, oid_list

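    # Undo log strategy: ask every storage node for the TIDs it holds for
    # up-to-date and feeding cells, merge and sort them in reverse order,
    # then fetch the transaction metadata of each TID from one of its
    # readable cells.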
    def __undoLog(self, first, last, filter=None, block=0, with_oids=False):
        if last < 0:
            # See FileStorage.py for explanation
            last = first - last

        # First get a list of transactions from all storage nodes.
        # Each storage node will return TIDs only for UP_TO_DATE state and
        # FEEDING state cells
        pt = self._getPartitionTable()
        storage_node_list = pt.getNodeList()

        self.local_var.node_tids = {}
        for storage_node in storage_node_list:
            conn = self.cp.getConnForNode(storage_node)
            if conn is None:
                continue

            try:
                conn.ask(self.local_var.queue, Packets.AskTIDs(first, last,
                    protocol.INVALID_PARTITION))
            finally:
                conn.unlock()

        # Wait for answers from all storages.
        while len(self.local_var.node_tids) != len(storage_node_list):
            self._waitAnyMessage()

        # Reorder tids
        ordered_tids = set()
        update = ordered_tids.update
        for tid_list in self.local_var.node_tids.itervalues():
            update(tid_list)
        ordered_tids = list(ordered_tids)
        ordered_tids.sort(reverse=True)
        logging.debug("UndoLog, tids %s", ordered_tids)
        # For each transaction, get info
        undo_info = []
        append = undo_info.append
        for tid in ordered_tids:
            cell_list = self._getCellListForTID(tid, readable=True)
            shuffle(cell_list)
            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is not None:
                    self.local_var.txn_info = 0
                    try:
                        self._askStorage(conn,
                                Packets.AskTransactionInformation(tid))
                    except ConnectionClosed:
                        continue
                    if isinstance(self.local_var.txn_info, dict):
                        break

            if self.local_var.txn_info in (-1, 0):
                # TID not found at all
                raise NeoException, 'Data inconsistency detected: ' \
                                    'transaction info for TID %r could not ' \
                                    'be found' % (tid, )

            if filter is None or filter(self.local_var.txn_info):
                if not with_oids:
                    self.local_var.txn_info.pop("oids")
                append(self.local_var.txn_info)
                if len(undo_info) >= last - first:
                    break
        # Check that we return at least one element, otherwise call
        # again with an extended offset
        if len(undo_info) == 0 and not block:
            undo_info = self.undoLog(first=first, last=last*5, filter=filter,
                    block=1)
        return undo_info

    def undoLog(self, first, last, filter=None, block=0):
        return self.__undoLog(first, last, filter, block)

    def transactionLog(self, first, last):
        return self.__undoLog(first, last, with_oids=True)

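    # history() works in two passes: first fetch the (serial, size) list of
    # the object from a storage node holding it, then query the transaction
    # metadata of every serial to build the dictionaries expected by ZODB.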
    def history(self, oid, version=None, size=1, filter=None, object_only=0):
        # Get history information for the object first
        cell_list = self._getCellListForOID(oid, readable=True)
        shuffle(cell_list)

        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.history = None
            try:
                self._askStorage(conn, Packets.AskObjectHistory(oid, 0, size))
            except ConnectionClosed:
                continue

            if self.local_var.history == -1:
                # Not found, go on with next node
                continue
            if self.local_var.history[0] != oid:
                # Got history for wrong oid
                raise NEOStorageError('inconsistency in storage: asked oid ' \
                                      '%r, got %r' % (
                                      oid, self.local_var.history[0]))
            # A usable answer was received, stop asking other nodes.
            break

        if not isinstance(self.local_var.history, tuple):
            raise NEOStorageError('history failed')

        if self.local_var.history[1] == []:
            # KeyError expected if no history was found
            # XXX: this may require an error from the storages
            raise KeyError

        if object_only:
            # Used by getSerial
            return self.local_var.history

        # Now that we have object information, get txn information
        history_list = []
        for serial, size in self.local_var.history[1]:
            cell_list = self._getCellListForTID(serial, readable=True)
            shuffle(cell_list)

            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is None:
                    continue

                # ask transaction information
                self.local_var.txn_info = None
                try:
                    self._askStorage(conn,
                            Packets.AskTransactionInformation(serial))
                except ConnectionClosed:
                    continue

                if self.local_var.txn_info == -1:
                    # TID not found
                    continue
                if isinstance(self.local_var.txn_info, dict):
                    break

            # create history dict
            self.local_var.txn_info.pop('id')
            self.local_var.txn_info.pop('oids')
            self.local_var.txn_info['tid'] = serial
            self.local_var.txn_info['version'] = ''
            self.local_var.txn_info['size'] = size
            if filter is None or filter(self.local_var.txn_info):
                history_list.append(self.local_var.txn_info)

        return history_list

    def iterator(self, start=None, stop=None):
        return Iterator(self, start, stop)

    def lastTransaction(self):
        # XXX: this doesn't consider transactions created by other clients,
        #  should ask the primary master
        return self.local_var.tid

    def abortVersion(self, src, transaction):
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        return '', []

    def commitVersion(self, src, dest, transaction):
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        return '', []

    def loadEx(self, oid, version):
        data, serial = self.load(oid=oid)
        return data, serial, ''

    def __del__(self):
        """Close all connections."""
        # Due to a bug in ZODB, close is not always called when shutting
        # down Zope, so use __del__ to close connections
        for conn in self.em.getConnectionList():
            conn.close()
        # Stop polling thread
        self.poll_thread.stop()
    close = __del__

    def sync(self):
        self._waitAnyMessage(False)

    def setNodeReady(self):
        self.local_var.node_ready = True

    def setNodeNotReady(self):
        self.local_var.node_ready = False

    def isNodeReady(self):
        return self.local_var.node_ready

    def setTID(self, value):
        self.local_var.tid = value

    def getTID(self):
        return self.local_var.tid

    def setTransactionFinished(self):
        self.local_var.txn_finished = True

    def isTransactionFinished(self):
        return self.local_var.txn_finished

    def setTransactionVoted(self):
        self.local_var.txn_voted = True

    def isTransactionVoted(self):
        return self.local_var.txn_voted