#
# Copyright (C) 2006-2010  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

from thread import get_ident
from cPickle import dumps
from zlib import compress, decompress
from neo.locking import Queue, Empty
from random import shuffle
from time import sleep

from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
from ZODB.ConflictResolution import ResolvedSerial

from neo import setupLog
setupLog('CLIENT', verbose=True)

from neo import logging
from neo import protocol
from neo.protocol import NodeTypes, Packets
from neo.event import EventManager
from neo.util import makeChecksum, dump
from neo.locking import Lock
from neo.connection import MTClientConnection
from neo.node import NodeManager
from neo.connector import getConnectorHandler
from neo.client.exception import NEOStorageError
from neo.client.exception import NEOStorageNotFoundError, ConnectionClosed
from neo.exception import NeoException
from neo.client.handlers import storage, master
from neo.dispatcher import Dispatcher
from neo.client.poll import ThreadedPoll
from neo.client.iterator import Iterator
from neo.client.mq import MQ
from neo.client.pool import ConnectionPool
from neo.util import u64, parseMasterList


class ThreadContext(object):

    def __init__(self):
        super(ThreadContext, self).__setattr__('_threads_dict', {})

    def __getThreadData(self):
        thread_id = get_ident()
        try:
            result = self._threads_dict[thread_id]
        except KeyError:
            self.clear(thread_id)
            result = self._threads_dict[thread_id]
        return result

    def __getattr__(self, name):
        thread_data = self.__getThreadData()
        try:
            return thread_data[name]
        except KeyError:
            raise AttributeError, name

    def __setattr__(self, name, value):
        thread_data = self.__getThreadData()
        thread_data[name] = value

    def clear(self, thread_id=None):
        if thread_id is None:
            thread_id = get_ident()
        self._threads_dict[thread_id] = {
            'tid': None,
            'txn': None,
            'data_dict': {},
            'object_serial_dict': {},
            'object_stored_counter_dict': {},
            'conflict_serial_dict': {},
            'object_stored': 0,
            'txn_voted': False,
            'txn_finished': False,
            'queue': Queue(0),
            'txn_info': 0,
            'history': None,
            'node_tids': {},
            'node_ready': False,
            'asked_object': 0,
        }


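# Application (below) keeps a single ThreadContext instance in self.local_var:
# attribute reads and writes on it are redirected, via get_ident(), to a
# dictionary private to the calling thread, and clear() resets that dictionary
# to the defaults listed above (transaction state, answer queue, etc.).
# Illustrative sketch only, with hypothetical names:
#
#   local_var = ThreadContext()
#   local_var.tid = some_tid      # visible only to the current thread
#   local_var.clear()             # back to the per-thread defaults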
class Application(object):
    """The client node application."""

    def __init__(self, master_nodes, name, connector=None, **kw):
        # Start polling thread
        self.em = EventManager()
        self.poll_thread = ThreadedPoll(self.em)
        # Internal attributes common to all threads
        self._db = None
        self.name = name
        self.connector_handler = getConnectorHandler(connector)
        self.dispatcher = Dispatcher()
        self.nm = NodeManager()
        self.cp = ConnectionPool(self)
        self.pt = None
        self.master_conn = None
        self.primary_master_node = None
        self.trying_master_node = None

        # load master node list
        for address in parseMasterList(master_nodes):
            self.nm.createMaster(address=address)

        # no self-assigned UUID, primary master will supply us one
        self.uuid = None
        self.mq_cache = MQ()
        self.new_oid_list = []
        self.last_oid = '\0' * 8
        self.storage_event_handler = storage.StorageEventHandler(self)
        self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
        self.storage_handler = storage.StorageAnswersHandler(self)
        self.primary_handler = master.PrimaryAnswersHandler(self)
        self.primary_bootstrap_handler = master.PrimaryBootstrapHandler(self)
        self.notifications_handler = master.PrimaryNotificationsHandler(self)
        # Internal attributes, distinct for each thread
        self.local_var = ThreadContext()
        # Lock definitions:
        # _load_lock is used to make loading and storing atomic
        lock = Lock()
        self._load_lock_acquire = lock.acquire
        self._load_lock_release = lock.release
        # _oid_lock is used to prevent concurrent OID generation requests
        lock = Lock()
        self._oid_lock_acquire = lock.acquire
        self._oid_lock_release = lock.release
        lock = Lock()
        # _cache_lock is used for the client cache
        self._cache_lock_acquire = lock.acquire
        self._cache_lock_release = lock.release
        lock = Lock()
        # _connecting_to_master_node is used to prevent simultaneous master
        # node connection attempts
        self._connecting_to_master_node_acquire = lock.acquire
        self._connecting_to_master_node_release = lock.release
        # _nm ensures exclusive access to the node manager
        lock = Lock()
        self._nm_acquire = lock.acquire
        self._nm_release = lock.release

    def _handlePacket(self, conn, packet, handler=None):
        """
          conn
            The connection which received the packet (forwarded to handler).
          packet
            The packet to handle.
          handler
            The handler to use to handle the packet.
            If not given, it will be guessed from the connection's node type.
        """
        if handler is None:
            # Guess the handler to use based on the type of node on the
            # connection
            node = self.nm.getByAddress(conn.getAddress())
            if node is None:
                raise ValueError, 'Expecting an answer from a node ' \
                    'which type is not known... Is this right ?'
            if node.isStorage():
                handler = self.storage_handler
            elif node.isMaster():
                handler = self.primary_handler
            else:
                raise ValueError, 'Unknown node type: %r' % (node.__class__, )
        handler.dispatch(conn, packet)

    def _waitAnyMessage(self, block=True):
        """
          Handle all pending packets.
          block
            If True (default), will block until at least one packet was
            received.
        """
        get = self.local_var.queue.get
        _handlePacket = self._handlePacket
        while True:
            try:
                conn, packet = get(block)
            except Empty:
                break
            if packet is None:
                # connection was closed
                continue
            block = False
            try:
                _handlePacket(conn, packet)
            except ConnectionClosed:
                pass

    def _waitMessage(self, target_conn, msg_id, handler=None):
        """Wait for a message returned by the dispatcher in queues."""
        get = self.local_var.queue.get
        _handlePacket = self._handlePacket
        while True:
            conn, packet = get(True)
            if target_conn is conn:
                # check fake packet
                if packet is None:
                    raise ConnectionClosed
                if msg_id == packet.getId():
                    self._handlePacket(conn, packet, handler=handler)
                    break
            self._handlePacket(conn, packet)

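    # Request/answer plumbing: requests are sent with conn.ask(queue, ...),
    # where 'queue' is the calling thread's local_var.queue.  The dispatcher
    # routes each answer back to that queue (a None packet means the
    # connection was closed), and _waitMessage/_waitAnyMessage above pop the
    # entries and feed them to the appropriate handler.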
    def _askStorage(self, conn, packet, timeout=5, additional_timeout=30):
        """ Send a request to a storage node and process its answer """
        try:
            msg_id = conn.ask(self.local_var.queue, packet, timeout,
                              additional_timeout)
        finally:
            # assume that the connection was already locked
            conn.unlock()
        self._waitMessage(conn, msg_id, self.storage_handler)

    def _askPrimary(self, packet, timeout=5, additional_timeout=30):
        """ Send a request to the primary master and process its answer """
        conn = self._getMasterConnection()
        conn.lock()
        try:
            msg_id = conn.ask(self.local_var.queue, packet, timeout,
                              additional_timeout)
        finally:
            conn.unlock()
        self._waitMessage(conn, msg_id, self.primary_handler)

    def _getMasterConnection(self):
        """ Connect to the primary master node on demand """
        # acquire the lock to allow only one thread to connect to the primary
        result = self.master_conn
        if result is None:
            self._connecting_to_master_node_acquire()
            try:
                self.new_oid_list = []
                result = self._connectToPrimaryNode()
                self.master_conn = result
            finally:
                self._connecting_to_master_node_release()
        return result

    def _getPartitionTable(self):
        """ Return the partition table manager, reconnect the PMN if needed """
        # this ensures the master connection is established and the partition
        # table is up to date.
        self._getMasterConnection()
        return self.pt

    def _getCellListForOID(self, oid, readable=False, writable=False):
        """ Return the cells available for the specified OID """
        pt = self._getPartitionTable()
        return pt.getCellListForOID(oid, readable, writable)

    def _getCellListForTID(self, tid, readable=False, writable=False):
        """ Return the cells available for the specified TID """
        pt = self._getPartitionTable()
        return pt.getCellListForTID(tid, readable, writable)

    def _connectToPrimaryNode(self):
        logging.debug('connecting to primary master...')
        ready = False
        nm = self.nm
        while not ready:
            # Get network connection to primary master
            index = 0
            connected = False
            while not connected:
                if self.primary_master_node is not None:
                    # If I know a primary master node, pinpoint it.
                    self.trying_master_node = self.primary_master_node
                    self.primary_master_node = None
                else:
                    # Otherwise, check one by one.
                    master_list = nm.getMasterList()
                    try:
                        self.trying_master_node = master_list[index]
                    except IndexError:
                        sleep(1)
                        index = 0
                        self.trying_master_node = master_list[0]
                    index += 1
                # Connect to master
                conn = MTClientConnection(self.em, self.notifications_handler,
                        addr=self.trying_master_node.getAddress(),
                        connector_handler=self.connector_handler,
                        dispatcher=self.dispatcher)
                # Query for primary master node
                conn.lock()
                try:
                    if conn.getConnector() is None:
                        # This happens if a connection could not be established.
                        logging.error('Connection to master node %s failed',
                                      self.trying_master_node)
                        continue
                    msg_id = conn.ask(self.local_var.queue,
                            Packets.AskPrimary())
                finally:
                    conn.unlock()
                try:
                    self._waitMessage(conn, msg_id,
                            handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    continue
                # If we reached the primary master node, mark as connected
                connected = self.primary_master_node is not None and \
                        self.primary_master_node is self.trying_master_node

            logging.info('connected to a primary master node')
            # Identify to primary master and request initial data
            while conn.getUUID() is None:
                conn.lock()
                try:
                    if conn.getConnector() is None:
                        logging.error('Connection to master node %s lost',
                                      self.trying_master_node)
                        self.primary_master_node = None
                        break
                    p = Packets.RequestIdentification(NodeTypes.CLIENT,
                            self.uuid, None, self.name)
                    msg_id = conn.ask(self.local_var.queue, p)
                finally:
                    conn.unlock()
                try:
                    self._waitMessage(conn, msg_id,
                            handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    self.primary_master_node = None
                    break
                if conn.getUUID() is None:
                    # Node identification was refused by master.
                    sleep(1)
            if self.uuid is not None:
                conn.lock()
                try:
                    msg_id = conn.ask(self.local_var.queue,
                                      Packets.AskNodeInformation())
                finally:
                    conn.unlock()
                self._waitMessage(conn, msg_id,
                        handler=self.primary_bootstrap_handler)
                conn.lock()
                try:
                    msg_id = conn.ask(self.local_var.queue,
                                      Packets.AskPartitionTable([]))
                finally:
                    conn.unlock()
                self._waitMessage(conn, msg_id,
                        handler=self.primary_bootstrap_handler)
            ready = self.uuid is not None and self.pt is not None \
                                 and self.pt.operational()
        logging.info("connected to primary master node %s" %
                self.primary_master_node)
        return conn

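    # Bootstrap sequence implemented by _connectToPrimaryNode() above: ask the
    # known masters who the primary is (AskPrimary), identify to the primary
    # as a CLIENT node (RequestIdentification, which assigns our UUID), then
    # fetch the node list (AskNodeInformation) and the partition table
    # (AskPartitionTable) until the table is operational.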
    def registerDB(self, db, limit):
        self._db = db

    def getDB(self):
        return self._db

    def new_oid(self):
        """Get a new OID."""
        self._oid_lock_acquire()
        try:
            if len(self.new_oid_list) == 0:
                # Get a new list of OIDs from the master node.
                # OIDs are requested in batches to avoid asking the master
                # for new OIDs one by one.
                self._askPrimary(Packets.AskNewOIDs(100))
                if len(self.new_oid_list) <= 0:
                    raise NEOStorageError('new_oid failed')
            self.last_oid = self.new_oid_list.pop()
            return self.last_oid
        finally:
            self._oid_lock_release()

    def getStorageSize(self):
        # return the last OID used; this is inaccurate
        return int(u64(self.last_oid))

    def getSerial(self, oid):
        # Try in cache first
        self._cache_lock_acquire()
        try:
            if oid in self.mq_cache:
                return self.mq_cache[oid][0]
        finally:
            self._cache_lock_release()
        # history returns the serial, so use it
        hist = self.history(oid, size=1, object_only=1)
        if len(hist) == 0:
            raise NEOStorageNotFoundError()
        if hist[0] != oid:
            raise NEOStorageError('getSerial failed')
        return hist[1][0][0]


    def _load(self, oid, serial=None, tid=None, cache=0):
        """Internal method which manages load, loadSerial and loadBefore."""
        cell_list = self._getCellListForOID(oid, readable=True)
        if len(cell_list) == 0:
            # No cells available, so why are we running ?
            logging.error('oid %s not found because no storage is ' \
                    'available for it', dump(oid))
            raise NEOStorageNotFoundError()

        shuffle(cell_list)
        self.local_var.asked_object = 0
        for cell in cell_list:
            logging.debug('trying to load %s from %s',
                          dump(oid), dump(cell.getUUID()))
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            try:
                self._askStorage(conn, Packets.AskObject(oid, serial, tid))
            except ConnectionClosed:
                continue

            if self.local_var.asked_object == -1:
                # OID not found
                break

            # Check data
            noid, start_serial, end_serial, compression, checksum, data \
                = self.local_var.asked_object
            if noid != oid:
                # Oops, try with next node
                logging.error('got wrong oid %s instead of %s from node %s',
                              noid, dump(oid), cell.getAddress())
                self.local_var.asked_object = -1
                continue
            elif checksum != makeChecksum(data):
                # Check checksum.
                logging.error('wrong checksum from node %s for oid %s',
                              cell.getAddress(), dump(oid))
                self.local_var.asked_object = -1
                continue
            else:
                # Everything looks alright.
                break

        if self.local_var.asked_object == 0:
            # We could not get any object from any storage node because of
            # connection errors
            logging.warning('oid %s not found because of connection failure',
                    dump(oid))
            raise NEOStorageNotFoundError()

        if self.local_var.asked_object == -1:
            # No storage node had the object
            logging.info('oid %s not found', dump(oid))
            raise NEOStorageNotFoundError()

        # Uncompress data
        if compression:
            data = decompress(data)

        # Put in cache only when using load
        if cache:
            self._cache_lock_acquire()
            try:
                self.mq_cache[oid] = start_serial, data
            finally:
                self._cache_lock_release()
        if data == '':
            data = None
        return data, start_serial, end_serial


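    # Read path shared by load/loadSerial/loadBefore: _load() above shuffles
    # the readable cells for the OID, asks each storage node in turn
    # (AskObject) and keeps the first answer whose OID and checksum are valid;
    # the result is stored in the MQ cache only for plain load() calls
    # (cache=1).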
    def load(self, oid, version=None):
        """Load an object for a given oid."""
        # First try from cache
        self._load_lock_acquire()
        try:
            self._cache_lock_acquire()
            try:
                if oid in self.mq_cache:
                    logging.debug('load oid %s is cached', dump(oid))
                    serial, data = self.mq_cache[oid]
                    return data, serial
            finally:
                self._cache_lock_release()
            # Otherwise get it from storage node
            return self._load(oid, cache=1)[:2]
        finally:
            self._load_lock_release()


    def loadSerial(self, oid, serial):
        """Load an object for a given oid and serial."""
        # Do not look in the cache, as it only holds up-to-date objects
        logging.debug('loading %s at %s', dump(oid), dump(serial))
        return self._load(oid, serial=serial)[0]


    def loadBefore(self, oid, tid):
        """Load an object for a given oid before tid committed."""
        # Do not look in the cache, as it only holds up-to-date objects
        logging.debug('loading %s before %s', dump(oid), dump(tid))
        data, start, end = self._load(oid, tid=tid)
        if end is None:
            # No previous version
            return None
        else:
            return data, start, end


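    # The methods below implement the ZODB two-phase commit protocol on top
    # of NEO packets.  A transaction driven by ZODB goes roughly through the
    # following calls (sketch only; txn, oid, data and resolve are
    # hypothetical names):
    #
    #   app.tpc_begin(txn)                               # AskBeginTransaction
    #   app.store(oid, serial, data, None, txn, resolve) # AskStoreObject
    #   app.tpc_vote(txn, resolve)                       # AskStoreTransaction
    #   app.tpc_finish(txn)                              # AskFinishTransaction
    #   # or app.tpc_abort(txn)                          # AbortTransaction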
    def tpc_begin(self, transaction, tid=None, status=' '):
        """Begin a new transaction."""
        # First get a transaction, only one is allowed at a time
        if self.local_var.txn is transaction:
            # We have already begun the same transaction
            return
        # ask the primary master to start a transaction, if no tid is supplied,
        # the master will supply us one. Otherwise the requested tid will be
        # used if possible.
        self.local_var.tid = None
        self._askPrimary(Packets.AskBeginTransaction(tid))
        if self.local_var.tid is None:
            raise NEOStorageError('tpc_begin failed')
        self.local_var.txn = transaction


    def store(self, oid, serial, data, version, transaction,
        tryToResolveConflict):
        """Store object."""
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        logging.debug('storing oid %s serial %s',
                     dump(oid), dump(serial))
        # Find which storage node to use
        cell_list = self._getCellListForOID(oid, writable=True)
        if len(cell_list) == 0:
            raise NEOStorageError
        if data is None:
            # this is a George Bailey object, stored as an empty string
            data = ''
        compressed_data = compress(data)
        checksum = makeChecksum(compressed_data)
        p = Packets.AskStoreObject(oid, serial, 1,
                 checksum, compressed_data, self.local_var.tid)
        # Store object in tmp cache
        self.local_var.data_dict[oid] = data
        # Store data on each node
        self.local_var.object_stored_counter_dict[oid] = 0
        self.local_var.object_serial_dict[oid] = (serial, version)
        local_queue = self.local_var.queue
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue
            try:
                try:
                    conn.ask(local_queue, p)
                finally:
                    conn.unlock()
            except ConnectionClosed:
                continue

        self._waitAnyMessage(False)
        return None

    def _handleConflicts(self, tryToResolveConflict):
        result = []
        append = result.append
        local_var = self.local_var
        # Check for conflicts
        data_dict = local_var.data_dict
        object_serial_dict = local_var.object_serial_dict
        for oid, conflict_serial in local_var.conflict_serial_dict.items():
            serial, version = object_serial_dict[oid]
            data = data_dict[oid]
            tid = local_var.tid
            resolved = False
            if conflict_serial <= tid:
                new_data = tryToResolveConflict(oid, conflict_serial, serial,
                    data)
                if new_data is not None:
                    # Forget this conflict
                    del local_var.conflict_serial_dict[oid]
                    # Try to store again
                    self.store(oid, conflict_serial, new_data, version,
                        local_var.txn, tryToResolveConflict)
                    append(oid)
                    resolved = True
            if not resolved:
                # XXX: Is it really required to remove from data_dict ?
                del data_dict[oid]
                raise ConflictError(oid=oid,
                    serials=(tid, serial), data=data)
        return result

    def waitStoreResponses(self, tryToResolveConflict):
        result = []
        append = result.append
        resolved_oid_set = set()
        update = resolved_oid_set.update
        local_var = self.local_var
        queue = self.local_var.queue
        tid = local_var.tid
        _waitAnyMessage = self._waitAnyMessage
        _handleConflicts = self._handleConflicts
        pending = self.dispatcher.pending
        while True:
            # Wait for all requests to be answered (or their connection to be
            # detected as closed)
            while pending(queue):
                _waitAnyMessage()
            conflicts = _handleConflicts(tryToResolveConflict)
            if conflicts:
                update(conflicts)
            else:
                # No more conflict resolutions to do, no more pending store
                # requests
                break

        # Check for never-stored objects, and update result for all others
        for oid, store_count in \
            local_var.object_stored_counter_dict.iteritems():
            if store_count == 0:
                raise NEOStorageError('tpc_store failed')
            elif oid in resolved_oid_set:
                append((oid, ResolvedSerial))
            else:
                append((oid, tid))
        return result

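    # Store path summary: store() queues one AskStoreObject per writable cell
    # and returns immediately; waitStoreResponses() above drains the
    # per-thread queue, re-stores objects whose conflict tryToResolveConflict
    # managed to resolve (see _handleConflicts), and otherwise raises
    # ConflictError or NEOStorageError('tpc_store failed').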
    def tpc_vote(self, transaction, tryToResolveConflict):
        """Store current transaction."""
        local_var = self.local_var
        if transaction is not local_var.txn:
            raise StorageTransactionError(self, transaction)

        result = self.waitStoreResponses(tryToResolveConflict)

        tid = local_var.tid
        # Store the transaction metadata on each writable node
        voted_counter = 0
        p = Packets.AskStoreTransaction(tid, transaction.user,
            transaction.description, dumps(transaction._extension),
            local_var.data_dict.keys())
        for cell in self._getCellListForTID(tid, writable=True):
            logging.debug("voting object %s %s", cell.getAddress(),
                cell.getState())
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            local_var.txn_voted = False
            try:
                self._askStorage(conn, p)
            except ConnectionClosed:
                continue

            if not self.isTransactionVoted():
                raise NEOStorageError('tpc_vote failed')
            voted_counter += 1

        # check that at least one storage node accepted
        if voted_counter == 0:
            raise NEOStorageError('tpc_vote failed')
        # Check if master connection is still alive.
        # This is just here to lower the probability of detecting a problem
        # in tpc_finish, as we should do our best to detect problems before
        # tpc_finish.
        self._getMasterConnection()

        return result

    def tpc_abort(self, transaction):
        """Abort current transaction."""
        if transaction is not self.local_var.txn:
            return

        cell_set = set()
        # select nodes where objects were stored
        for oid in self.local_var.data_dict.iterkeys():
            cell_set |= set(self._getCellListForOID(oid, writable=True))
        # select nodes where transaction was stored
        cell_set |= set(self._getCellListForTID(self.local_var.tid,
            writable=True))

        # cancel the transaction on all those nodes
        for cell in cell_set:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue
            try:
                conn.notify(Packets.AbortTransaction(self.local_var.tid))
            finally:
                conn.unlock()

        # Abort the transaction on the primary master node.
        conn = self._getMasterConnection()
        conn.lock()
        try:
            conn.notify(Packets.AbortTransaction(self.local_var.tid))
        finally:
            conn.unlock()
        self.local_var.clear()

    def tpc_finish(self, transaction, f=None):
        """Finish current transaction."""
        if self.local_var.txn is not transaction:
            return
        self._load_lock_acquire()
        try:
            # Call function given by ZODB
            if f is not None:
                f(self.local_var.tid)

            # Call finish on master
            oid_list = self.local_var.data_dict.keys()
            p = Packets.AskFinishTransaction(oid_list, self.local_var.tid)
            self._askPrimary(p)

            if not self.isTransactionFinished():
                raise NEOStorageError('tpc_finish failed')

            # Update cache
            self._cache_lock_acquire()
            try:
                for oid in self.local_var.data_dict.iterkeys():
                    data = self.local_var.data_dict[oid]
                    # Now the serial is the same as the tid
                    self.mq_cache[oid] = self.local_var.tid, data
            finally:
                self._cache_lock_release()
            tid = self.local_var.tid
            # clear() resets local_var.tid, so save the value to return it
            self.local_var.clear()
            return tid
        finally:
            self._load_lock_release()

    def undo(self, transaction_id, txn, tryToResolveConflict):
        if txn is not self.local_var.txn:
            raise StorageTransactionError(self, transaction_id)

        # First get transaction information from a storage node.
        cell_list = self._getCellListForTID(transaction_id, writable=True)
        shuffle(cell_list)
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.txn_info = 0
            try:
                self._askStorage(conn, Packets.AskTransactionInformation(
                    transaction_id))
            except ConnectionClosed:
                continue

            if self.local_var.txn_info == -1:
                # Tid not found, try with next node
                continue
            elif isinstance(self.local_var.txn_info, dict):
                break
            else:
                raise NEOStorageError('undo failed')

        if self.local_var.txn_info in (-1, 0):
            raise NEOStorageError('undo failed')

        oid_list = self.local_var.txn_info['oids']
        # Second get object data from storage node using loadBefore
        data_dict = {}
        for oid in oid_list:
            try:
                result = self.loadBefore(oid, transaction_id)
            except NEOStorageNotFoundError:
                # no previous revision, can't undo (as in filestorage)
                raise UndoError("no previous record", oid)
            data, start, end = result
            # end must be the TID we are about to undo, otherwise it means
            # a later transaction modified the object
            if end != transaction_id:
                raise UndoError("non-undoable transaction", oid)
            data_dict[oid] = data

        # Third, store the old data again within the current transaction
        oid_list = data_dict.keys()
        for oid in oid_list:
            self.store(oid, transaction_id, data_dict[oid], None, txn,
                tryToResolveConflict)
        self.waitStoreResponses(tryToResolveConflict)
        return self.local_var.tid, oid_list

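    # undo() above does not rewrite history: it re-stores, within the
    # currently open transaction, the object revisions that preceded the
    # undone transaction (fetched with loadBefore), then returns the current
    # tid together with the list of affected OIDs.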
    def __undoLog(self, first, last, filter=None, block=0, with_oids=False):
        if last < 0:
            # See FileStorage.py for explanation
            last = first - last

        # First get a list of transactions from all storage nodes.
        # Each storage node will return TIDs only for UP_TO_DATE state and
        # FEEDING state cells
        pt = self._getPartitionTable()
        storage_node_list = pt.getNodeList()

        self.local_var.node_tids = {}
        for storage_node in storage_node_list:
            conn = self.cp.getConnForNode(storage_node)
            if conn is None:
                continue

            try:
                conn.ask(self.local_var.queue, Packets.AskTIDs(first, last,
                    protocol.INVALID_PARTITION))
            finally:
                conn.unlock()

        # Wait for answers from all storages.
        while len(self.local_var.node_tids) != len(storage_node_list):
            self._waitAnyMessage()

        # Reorder tids
        ordered_tids = set()
        update = ordered_tids.update
        for tid_list in self.local_var.node_tids.itervalues():
            update(tid_list)
        ordered_tids = list(ordered_tids)
        ordered_tids.sort(reverse=True)
        logging.debug("UndoLog, tids %s", ordered_tids)
        # For each transaction, get info
        undo_info = []
        append = undo_info.append
        for tid in ordered_tids:
            cell_list = self._getCellListForTID(tid, readable=True)
            shuffle(cell_list)
            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is not None:
                    self.local_var.txn_info = 0
                    try:
                        self._askStorage(conn,
                                Packets.AskTransactionInformation(tid))
                    except ConnectionClosed:
                        continue
                    if isinstance(self.local_var.txn_info, dict):
                        break

            if self.local_var.txn_info in (-1, 0):
                # TID not found at all
                raise NeoException, 'Data inconsistency detected: ' \
                                    'transaction info for TID %r could not ' \
                                    'be found' % (tid, )

            if filter is None or filter(self.local_var.txn_info):
                if not with_oids:
                    self.local_var.txn_info.pop("oids")
                append(self.local_var.txn_info)
                if len(undo_info) >= last - first:
                    break
        # Check that we return at least one element; otherwise call
        # again with an extended offset
        if len(undo_info) == 0 and not block:
            undo_info = self.undoLog(first=first, last=last*5, filter=filter,
                    block=1)
        return undo_info

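    # __undoLog() above gathers candidate TIDs with one AskTIDs request per
    # storage node, merges and sorts them newest first, then asks a readable
    # cell for AskTransactionInformation on each TID; undoLog() and
    # transactionLog() below are thin wrappers (the latter keeps the 'oids'
    # entry of each transaction description).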
    def undoLog(self, first, last, filter=None, block=0):
        return self.__undoLog(first, last, filter, block)

    def transactionLog(self, first, last):
        return self.__undoLog(first, last, with_oids=True)

    def history(self, oid, version=None, size=1, filter=None, object_only=0):
        # Get history information for the object first
        cell_list = self._getCellListForOID(oid, readable=True)
        shuffle(cell_list)

        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.history = None
            try:
                self._askStorage(conn, Packets.AskObjectHistory(oid, 0, size))
            except ConnectionClosed:
                continue

            if self.local_var.history == -1:
                # Not found, go on with next node
                continue
            if self.local_var.history[0] != oid:
                # Got history for wrong oid
                raise NEOStorageError('inconsistency in storage: asked oid ' \
                                      '%r, got %r' % (
                                      oid, self.local_var.history[0]))

        if not isinstance(self.local_var.history, tuple):
            raise NEOStorageError('history failed')

        if self.local_var.history[1] == []:
            # KeyError expected if no history was found
            # XXX: this may require an error from the storages
            raise KeyError

        if object_only:
            # Used by getSerial
            return self.local_var.history

        # Now that we have the object information, get the transaction information
        history_list = []
        for serial, size in self.local_var.history[1]:
            cell_list = self._getCellListForTID(serial, readable=True)
            shuffle(cell_list)

            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is None:
                    continue

                # ask transaction information
                self.local_var.txn_info = None
                try:
                    self._askStorage(conn,
                            Packets.AskTransactionInformation(serial))
                except ConnectionClosed:
                    continue

                if self.local_var.txn_info == -1:
                    # TID not found
                    continue
                if isinstance(self.local_var.txn_info, dict):
                    break

            # create history dict
            self.local_var.txn_info.pop('id')
            self.local_var.txn_info.pop('oids')
            self.local_var.txn_info['tid'] = serial
            self.local_var.txn_info['version'] = ''
            self.local_var.txn_info['size'] = size
            if filter is None or filter(self.local_var.txn_info):
                history_list.append(self.local_var.txn_info)

        return history_list

    def iterator(self, start=None, stop=None):
        return Iterator(self, start, stop)

    def lastTransaction(self):
        # XXX: this doesn't consider transactions created by other clients,
        #  should ask the primary master
        return self.local_var.tid

    def abortVersion(self, src, transaction):
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        return '', []

    def commitVersion(self, src, dest, transaction):
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        return '', []

    def __del__(self):
        """Clear all connections."""
        # Due to a bug in ZODB, close is not always called when shutting
        # down zope, so use __del__ to close connections
        for conn in self.em.getConnectionList():
            conn.close()
        # Stop polling thread
        self.poll_thread.stop()
    close = __del__

    def sync(self):
        self._waitAnyMessage(False)

    def setNodeReady(self):
        self.local_var.node_ready = True

    def setNodeNotReady(self):
        self.local_var.node_ready = False

    def isNodeReady(self):
        return self.local_var.node_ready

    def setTID(self, value):
        self.local_var.tid = value

    def getTID(self):
        return self.local_var.tid

    def setTransactionFinished(self):
        self.local_var.txn_finished = True

    def isTransactionFinished(self):
        return self.local_var.txn_finished

    def setTransactionVoted(self):
        self.local_var.txn_voted = True

    def isTransactionVoted(self):
        return self.local_var.txn_voted