app.py 38.6 KB
Newer Older
1
#
Grégory Wisniewski's avatar
Grégory Wisniewski committed
2
# Copyright (C) 2006-2010  Nexedi SA
3
#
Aurel's avatar
Aurel committed
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
Aurel's avatar
Aurel committed
9 10 11 12 13 14 15
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
Aurel's avatar
Aurel committed
17

18
from thread import get_ident
19
from cPickle import dumps
20
from zlib import compress, decompress
21
from neo.locking import Queue, Empty
22
from random import shuffle
Yoshinori Okuji's avatar
Yoshinori Okuji committed
23
from time import sleep
24

25
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
26
from ZODB.ConflictResolution import ResolvedSerial
27

28 29 30 31
from neo import setupLog
setupLog('CLIENT', verbose=True)

from neo import logging
32
from neo import protocol
33
from neo.protocol import NodeTypes, Packets
34
from neo.event import EventManager
35
from neo.util import makeChecksum, dump
36
from neo.locking import Lock
37
from neo.connection import MTClientConnection
38
from neo.node import NodeManager
39
from neo.connector import getConnectorHandler
40
from neo.client.exception import NEOStorageError
41
from neo.client.exception import NEOStorageNotFoundError, ConnectionClosed
42 43
from neo.exception import NeoException
from neo.client.handlers import storage, master
44
from neo.dispatcher import Dispatcher
45
from neo.client.poll import ThreadedPoll
46 47
from neo.client.iterator import Iterator
from neo.client.mq import MQ
48
from neo.client.pool import ConnectionPool
49
from neo.util import u64, parseMasterList
50

Aurel's avatar
Aurel committed
51

52 53
class ThreadContext(object):
    """Attribute namespace whose content is private to each thread.

    Attribute reads and writes are redirected to a dictionary selected by
    the identifier of the calling thread. That dictionary is created with
    default values on first access and reset to them by clear().
    """

    def __init__(self):
        # Go through object.__setattr__: our own __setattr__ would store the
        # value in the calling thread's dictionary instead of the instance.
        super(ThreadContext, self).__setattr__('_threads_dict', {})

    def __getThreadData(self):
        """Return the calling thread's dictionary, creating it if needed."""
        thread_id = get_ident()
        try:
            result = self._threads_dict[thread_id]
        except KeyError:
            self.clear(thread_id)
            result = self._threads_dict[thread_id]
        return result

    def __getattr__(self, name):
        """Return the calling thread's value for name.

        Raises AttributeError when the thread dictionary has no such key.
        """
        thread_data = self.__getThreadData()
        try:
            return thread_data[name]
        except KeyError:
            # Call form of raise: valid on both Python 2 and Python 3.
            raise AttributeError(name)

    def __setattr__(self, name, value):
        """Store value under name for the calling thread only."""
        thread_data = self.__getThreadData()
        thread_data[name] = value

    def clear(self, thread_id=None):
        """Reset the data of a thread to its default values.

        thread_id
            Identifier of the thread to reset. Defaults to the calling
            thread.

        The queue of an already known thread is kept; a new one is created
        only for a thread seen for the first time.
        """
        if thread_id is None:
            thread_id = get_ident()
        thread_dict = self._threads_dict.get(thread_id)
        if thread_dict is None:
            queue = Queue(0)
        else:
            queue = thread_dict['queue']
        self._threads_dict[thread_id] = {
            'tid': None,
            'txn': None,
            'data_dict': {},
            'object_serial_dict': {},
            'object_stored_counter_dict': {},
            'conflict_serial_dict': {},
            'object_stored': 0,
            'txn_voted': False,
            'txn_finished': False,
            'queue': queue,
            'txn_info': 0,
            'history': None,
            'node_tids': {},
            'node_ready': False,
            'asked_object': 0,
        }


Aurel's avatar
Aurel committed
104
class Application(object):
105 106
    """The client node application."""

107
    def __init__(self, master_nodes, name, connector=None, **kw):
        """Build the client application state.

        master_nodes
            Description of the master nodes, as accepted by parseMasterList.
        name
            Kept as self.name; it is sent in the identification request.
        connector
            Given to getConnectorHandler to choose the connector handler.
        kw
            Accepted and ignored by this constructor.
        """
        # Start polling thread
        self.em = EventManager()
        self.poll_thread = ThreadedPoll(self.em)

        # Internal attributes shared by all threads
        self._db = None
        self.name = name
        self.connector_handler = getConnectorHandler(connector)
        self.dispatcher = Dispatcher()
        self.nm = NodeManager()
        self.cp = ConnectionPool(self)
        self.pt = None
        self.master_conn = None
        self.primary_master_node = None
        self.trying_master_node = None

        # Register every configured master node
        for master_address in parseMasterList(master_nodes):
            self.nm.createMaster(address=master_address)

        # no self-assigned UUID, primary master will supply us one
        self.uuid = None
        self.mq_cache = MQ()
        self.new_oid_list = []
        self.last_oid = '\0' * 8

        # Packet handlers
        self.storage_event_handler = storage.StorageEventHandler(self)
        self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
        self.storage_handler = storage.StorageAnswersHandler(self)
        self.primary_handler = master.PrimaryAnswersHandler(self)
        self.primary_bootstrap_handler = master.PrimaryBootstrapHandler(self)
        self.notifications_handler = master.PrimaryNotificationsHandler(self)

        # Internal attributes kept separately for each thread
        self.local_var = ThreadContext()

        # Locks: only the bound acquire/release methods are kept.
        # _load_lock is used to make loading and storing atomic
        load_lock = Lock()
        self._load_lock_acquire = load_lock.acquire
        self._load_lock_release = load_lock.release
        # _oid_lock is used in order to not call multiple oid
        # generation at the same time
        oid_lock = Lock()
        self._oid_lock_acquire = oid_lock.acquire
        self._oid_lock_release = oid_lock.release
        # _cache_lock is used for the client cache
        cache_lock = Lock()
        self._cache_lock_acquire = cache_lock.acquire
        self._cache_lock_release = cache_lock.release
        # _connecting_to_master_node is used to prevent simultaneous master
        # node connection attempts
        master_lock = Lock()
        self._connecting_to_master_node_acquire = master_lock.acquire
        self._connecting_to_master_node_release = master_lock.release
        # _nm ensure exclusive access to the node manager
        nm_lock = Lock()
        self._nm_acquire = nm_lock.acquire
        self._nm_release = nm_lock.release
163

164 165 166 167 168 169 170 171 172 173 174
    def _handlePacket(self, conn, packet, handler=None):
        """
          conn
            The connection which received the packet (forwarded to handler).
          packet
            The packet to handle.
          handler
            The handler to use to handle packet.
            If not given, it will be guessed from connection's not type.
        """
        if handler is None:
175 176
            # Guess the handler to use based on the type of node on the
            # connection
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
            node = self.nm.getByAddress(conn.getAddress())
            if node is None:
                raise ValueError, 'Expecting an answer from a node ' \
                    'which type is not known... Is this right ?'
            if node.isStorage():
                handler = self.storage_handler
            elif node.isMaster():
                handler = self.primary_handler
            else:
                raise ValueError, 'Unknown node type: %r' % (node.__class__, )
        handler.dispatch(conn, packet)

    def _waitAnyMessage(self, block=True):
        """
          Process every packet currently queued for the calling thread.
          block
            When true (default), wait until one packet has been received
            before draining the remaining ones without blocking.
        """
        thread_queue = self.local_var.queue
        while True:
            try:
                conn, packet = thread_queue.get(block)
            except Empty:
                # Nothing left to process.
                return
            if packet is None:
                # Marker queued when a connection was closed: skip it and
                # keep the current blocking mode.
                continue
            # One real packet was received: never block again.
            block = False
            try:
                self._handlePacket(conn, packet)
            except ConnectionClosed:
                pass

    def _waitMessage(self, target_conn, msg_id, handler=None):
        """Block until the answer to msg_id arrives on target_conn.

        Packets received meanwhile from other connections are handled with
        their default handler. Packets from target_conn carrying another
        id are dropped. Raises ConnectionClosed when the closed-connection
        marker (a None packet) is received for target_conn.
        """
        fetch = self.local_var.queue.get
        while True:
            conn, packet = fetch(True)
            if conn is not target_conn:
                # Unrelated traffic: process real packets, ignore markers.
                if packet is not None:
                    self._handlePacket(conn, packet)
                continue
            # check fake packet
            if packet is None:
                raise ConnectionClosed
            if msg_id == packet.getId():
                self._handlePacket(conn, packet, handler=handler)
                return
227

228 229 230
    def _askStorage(self, conn, packet, timeout=5, additional_timeout=30):
        """Send a request to a storage node and process its answer.

        conn is expected to be locked by the caller; it is unlocked here
        once the request is sent, even when sending fails. The answer is
        handled with the storage answers handler.
        """
        try:
            request_id = conn.ask(self.local_var.queue, packet, timeout,
                                  additional_timeout)
        finally:
            # assume that the connection was already locked
            conn.unlock()
        self._waitMessage(conn, request_id, self.storage_handler)

    def _askPrimary(self, packet, timeout=5, additional_timeout=30):
        """Send a request to the primary master and process its answer.

        The master connection is obtained (and established if needed),
        locked while the request is sent, then the answer is handled with
        the primary answers handler.
        """
        master_conn = self._getMasterConnection()
        master_conn.lock()
        try:
            request_id = master_conn.ask(self.local_var.queue, packet,
                                         timeout, additional_timeout)
        finally:
            master_conn.unlock()
        self._waitMessage(master_conn, request_id, self.primary_handler)

249 250
    def _getMasterConnection(self):
        """ Connect to the primary master node on demand """
251
        # acquire the lock to allow only one thread to connect to the primary
252 253 254 255
        result = self.master_conn
        if result is None:
            self._connecting_to_master_node_acquire()
            try:
256
                self.new_oid_list = []
257 258 259 260 261
                result = self._connectToPrimaryNode()
                self.master_conn = result
            finally:
                self._connecting_to_master_node_release()
        return result
262

263
    def _getPartitionTable(self):
264
        """ Return the partition table manager, reconnect the PMN if needed """
265 266 267 268 269
        # this ensure the master connection is established and the partition
        # table is up to date.
        self._getMasterConnection()
        return self.pt

270 271
    def _getCellListForOID(self, oid, readable=False, writable=False):
        """ Return the cells available for the specified OID """
272
        pt = self._getPartitionTable()
273 274 275 276
        return pt.getCellListForOID(oid, readable, writable)

    def _getCellListForTID(self, tid, readable=False, writable=False):
        """ Return the cells available for the specified TID """
277
        pt = self._getPartitionTable()
278
        return pt.getCellListForTID(tid, readable, writable)
279

280
    def _connectToPrimaryNode(self):
        """Connect and identify to the primary master, then return the
        connection.

        Loops until self.uuid is set and self.pt exists and reports itself
        operational. Each round:
          1. tries master nodes until the node just tried is also the one
             recorded as primary;
          2. requests identification until the connection has a UUID;
          3. asks for node information and the partition table.
        NOTE(review): primary_master_node, uuid and pt are not assigned
        here on success; presumably the bootstrap handler sets them while
        answers are processed - confirm in neo.client.handlers.master.
        """
        logging.debug('connecting to primary master...')
        ready = False
        nm = self.nm
        while not ready:
            # Get network connection to primary master
            index = 0
            connected = False
            while not connected:
                if self.primary_master_node is not None:
                    # If I know a primary master node, pinpoint it.
                    self.trying_master_node = self.primary_master_node
                    self.primary_master_node = None
                else:
                    # Otherwise, check one by one.
                    master_list = nm.getMasterList()
                    try:
                        self.trying_master_node = master_list[index]
                    except IndexError:
                        # Every known master was tried: pause, then start
                        # over from the first one.
                        sleep(1)
                        index = 0
                        self.trying_master_node = master_list[0]
                    index += 1
                # Connect to master
                conn = MTClientConnection(self.em, self.notifications_handler,
                        addr=self.trying_master_node.getAddress(),
                        connector_handler=self.connector_handler,
                        dispatcher=self.dispatcher)
                # Query for primary master node
                conn.lock()
                try:
                    if conn.getConnector() is None:
                        # This happens if a connection could not be established.
                        logging.error('Connection to master node %s failed',
                                      self.trying_master_node)
                        continue
                    msg_id = conn.ask(self.local_var.queue,
                            Packets.AskPrimary())
                finally:
                    conn.unlock()
                try:
                    self._waitMessage(conn, msg_id,
                            handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    continue
                # If we reached the primary master node, mark as connected
                connected = self.primary_master_node is not None and \
                        self.primary_master_node is self.trying_master_node

            logging.info('connected to a primary master node')
            # Identify to primary master and request initial data
            while conn.getUUID() is None:
                conn.lock()
                try:
                    if conn.getConnector() is None:
                        logging.error('Connection to master node %s lost',
                                      self.trying_master_node)
                        # Forget the primary so the next round searches
                        # for it again.
                        self.primary_master_node = None
                        break
                    p = Packets.RequestIdentification(NodeTypes.CLIENT,
                            self.uuid, None, self.name)
                    msg_id = conn.ask(self.local_var.queue, p)
                finally:
                    conn.unlock()
                try:
                    self._waitMessage(conn, msg_id,
                            handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    self.primary_master_node = None
                    break
                if conn.getUUID() is None:
                    # Node identification was refused by master.
                    sleep(1)
            if self.uuid is not None:
                conn.lock()
                try:
                    msg_id = conn.ask(self.local_var.queue,
                                      Packets.AskNodeInformation())
                finally:
                    conn.unlock()
                self._waitMessage(conn, msg_id,
                        handler=self.primary_bootstrap_handler)
                conn.lock()
                try:
                    msg_id = conn.ask(self.local_var.queue,
                                      Packets.AskPartitionTable([]))
                finally:
                    conn.unlock()
                self._waitMessage(conn, msg_id,
                        handler=self.primary_bootstrap_handler)
            ready = self.uuid is not None and self.pt is not None \
                                 and self.pt.operational()
        # Lazy formatting, as in the other logging calls of this module.
        logging.info("connected to primary master node %s",
                self.primary_master_node)
        return conn
375

376 377 378
    def registerDB(self, db, limit):
        self._db = db

379 380 381
    def getDB(self):
        return self._db

382 383 384 385 386
    def new_oid(self):
        """Return a new OID, refilling the local reserve when it is empty.

        Raises NEOStorageError when the reserve is still empty after
        asking the master for more OIDs.
        """
        self._oid_lock_acquire()
        try:
            if not self.new_oid_list:
                # OIDs are requested from the master node by batches and
                # kept in a list, so that the master is not asked for
                # each single OID.
                self._askPrimary(Packets.AskNewOIDs(100))
                if not self.new_oid_list:
                    raise NEOStorageError('new_oid failed')
            oid = self.new_oid_list.pop()
            self.last_oid = oid
            return self.last_oid
        finally:
            self._oid_lock_release()

399 400 401
    def getStorageSize(self):
        """Return the last OID used, as an integer.

        This is only an inaccurate approximation of the storage size.
        """
        last_oid_value = u64(self.last_oid)
        return int(last_oid_value)
402

Aurel's avatar
Aurel committed
403 404 405 406
    def getSerial(self, oid):
        """Return the serial of the given oid.

        The client cache is consulted first; otherwise the serial is taken
        from the newest history entry of the object.
        Raises NEOStorageNotFoundError when the history is empty and
        NEOStorageError when the history is about another oid.
        """
        # Try in cache first
        self._cache_lock_acquire()
        try:
            if oid in self.mq_cache:
                cached_entry = self.mq_cache[oid]
                return cached_entry[0]
        finally:
            self._cache_lock_release()
        # history return serial, so use it
        hist = self.history(oid, size=1, object_only=1)
        if len(hist) == 0:
            raise NEOStorageNotFoundError()
        if hist[0] != oid:
            raise NEOStorageError('getSerial failed')
        newest_entry = hist[1][0]
        return newest_entry[0]

Aurel's avatar
Aurel committed
419

420
    def _load(self, oid, serial=None, tid=None, cache=0):
        """Internal method which manages load, loadSerial and loadBefore.

        oid
            Identifier of the object to load.
        serial, tid
            Forwarded as is in the AskObject request.
        cache
            When true, the loaded data is stored in the client cache.

        Returns a (data, start_serial, end_serial) tuple, data being None
        when the stored data is an empty string.
        Raises NEOStorageNotFoundError when no readable cell exists, when
        no storage node could be reached, or when the object is not found.

        local_var.asked_object carries the outcome of each request:
        0 means no answer was received, -1 means not found (or an invalid
        answer), anything else is the answer tuple.
        NOTE(review): the answer is presumably stored there by the storage
        answers handler - confirm in neo.client.handlers.storage.
        """
        cell_list = self._getCellListForOID(oid, readable=True)
        if not cell_list:
            # No cells available, so why are we running ?
            logging.error('oid %s not found because no storage is ' \
                    'available for it', dump(oid))
            raise NEOStorageNotFoundError()

        # Try the cells in random order.
        shuffle(cell_list)
        self.local_var.asked_object = 0
        for cell in cell_list:
            logging.debug('trying to load %s from %s',
                          dump(oid), dump(cell.getUUID()))
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            try:
                self._askStorage(conn, Packets.AskObject(oid, serial, tid))
            except ConnectionClosed:
                continue

            if self.local_var.asked_object == -1:
                # OID not found
                break

            # Check data
            noid, start_serial, end_serial, compression, checksum, data \
                = self.local_var.asked_object
            if noid != oid:
                # Oops, try with next node
                # The received oid goes through dump() like every other
                # oid logged in this module.
                logging.error('got wrong oid %s instead of %s from node %s',
                              dump(noid), dump(oid), cell.getAddress())
                self.local_var.asked_object = -1
                continue
            elif checksum != makeChecksum(data):
                # Check checksum.
                logging.error('wrong checksum from node %s for oid %s',
                              cell.getAddress(), dump(oid))
                self.local_var.asked_object = -1
                continue
            else:
                # Everything looks alright.
                break

        if self.local_var.asked_object == 0:
            # We didn't get any object from any storage node because of
            # connection error
            logging.warning('oid %s not found because of connection failure',
                    dump(oid))
            raise NEOStorageNotFoundError()

        if self.local_var.asked_object == -1:
            # We didn't get any object from any storage node
            logging.info('oid %s not found', dump(oid))
            raise NEOStorageNotFoundError()

        # Uncompress data
        if compression:
            data = decompress(data)

        # Put in cache only when using load
        if cache:
            self._cache_lock_acquire()
            try:
                self.mq_cache[oid] = start_serial, data
            finally:
                self._cache_lock_release()
        if data == '':
            data = None
        return data, start_serial, end_serial
492

493

494 495 496
    def load(self, oid, version=None):
        """Return (data, serial) for the current revision of oid.

        The client cache is consulted first; on a miss the object is
        fetched from a storage node and cached. version is not used.
        """
        self._load_lock_acquire()
        try:
            # First try from cache
            self._cache_lock_acquire()
            try:
                cached = oid in self.mq_cache
                if cached:
                    logging.debug('load oid %s is cached', dump(oid))
                    serial, data = self.mq_cache[oid]
            finally:
                self._cache_lock_release()
            if cached:
                return data, serial
            # Otherwise get it from storage node
            loaded = self._load(oid, cache=1)
            return loaded[:2]
        finally:
            self._load_lock_release()
Aurel's avatar
Aurel committed
511

512

513
    def loadSerial(self, oid, serial):
        """Return the data of oid at the given serial."""
        # Do not try in cache as it manages only up-to-date object
        logging.debug('loading %s at %s', dump(oid), dump(serial))
        loaded = self._load(oid, serial=serial)
        return loaded[0]
Aurel's avatar
Aurel committed
518

519

520
    def loadBefore(self, oid, tid):
        """Load the revision of oid committed before tid.

        Returns (data, start, end), or None when the loaded revision has
        no end serial.
        """
        # Do not try in cache as it manages only up-to-date object
        logging.debug('loading %s before %s', dump(oid), dump(tid))
        data, start, end = self._load(oid, tid=tid)
        if end is None:
            # No previous version
            return None
        return data, start, end
Aurel's avatar
Aurel committed
530

531

532 533 534
    def tpc_begin(self, transaction, tid=None, status=' '):
        """Begin a new transaction for the calling thread.

        Does nothing when the given transaction is already the current
        one. Raises NEOStorageError when no tid is known once the master
        has answered. status is not used.
        """
        local_var = self.local_var
        # First get a transaction, only one is allowed at a time
        if local_var.txn is transaction:
            # We already begin the same transaction
            return
        # Ask the primary master to start a transaction. Without a
        # supplied tid the master provides one, otherwise the requested
        # tid is used if possible.
        local_var.tid = None
        self._askPrimary(Packets.AskBeginTransaction(tid))
        if local_var.tid is None:
            raise NEOStorageError('tpc_begin failed')
        local_var.txn = transaction
546

547

Vincent Pelletier's avatar
Vincent Pelletier committed
548
    def store(self, oid, serial, data, version, transaction):
        """Send the object to every writable cell of its partition.

        The store requests are sent without waiting for their answers;
        only answers already received are processed before returning.
        Raises StorageTransactionError when transaction is not the current
        one and NEOStorageError when no writable cell exists.
        """
        local_var = self.local_var
        if transaction is not local_var.txn:
            raise StorageTransactionError(self, transaction)
        logging.debug('storing oid %s serial %s',
                     dump(oid), dump(serial))
        # Find which storage node to use
        cell_list = self._getCellListForOID(oid, writable=True)
        if len(cell_list) == 0:
            raise NEOStorageError
        if data is None:
            # this is a George Bailey object, stored as an empty string
            data = ''
        # Send the compressed form unless it is larger than the raw data.
        payload = compress(data)
        compression = 1
        if len(payload) > len(data):
            payload = data
            compression = 0
        checksum = makeChecksum(payload)
        store_packet = Packets.AskStoreObject(oid, serial, compression,
                 checksum, payload, local_var.tid)
        # Store object in tmp cache
        local_var.data_dict[oid] = data
        # Store data on each node
        local_var.object_stored_counter_dict[oid] = 0
        local_var.object_serial_dict[oid] = (serial, version)
        thread_queue = local_var.queue
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue
            try:
                try:
                    conn.ask(thread_queue, store_packet)
                finally:
                    conn.unlock()
            except ConnectionClosed:
                # This cell is unreachable, go on with the next one.
                pass

        # Process the answers already available, without blocking.
        self._waitAnyMessage(False)
        return None
590

591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610
    def _handleConflicts(self, tryToResolveConflict):
        """Try to resolve each recorded conflict and store the result.

        tryToResolveConflict
            Callable given (oid, conflict_serial, serial, data); it
            returns the resolved data, or None when it cannot resolve.

        Returns the list of oids stored again with resolved data.
        Raises ConflictError for the first conflict which is not resolved.
        """
        resolved_oids = []
        local_var = self.local_var
        data_dict = local_var.data_dict
        object_serial_dict = local_var.object_serial_dict
        # Work on a snapshot: entries are deleted while looping.
        pending = list(local_var.conflict_serial_dict.items())
        for oid, conflict_serial in pending:
            serial, version = object_serial_dict[oid]
            data = data_dict[oid]
            tid = local_var.tid
            new_data = None
            if conflict_serial <= tid:
                new_data = tryToResolveConflict(oid, conflict_serial, serial,
                    data)
            if new_data is None:
                # XXX: Is it really required to remove from data_dict ?
                del data_dict[oid]
                raise ConflictError(oid=oid,
                    serials=(tid, serial), data=data)
            # Forget this conflict
            del local_var.conflict_serial_dict[oid]
            # Try to store again
            self.store(oid, conflict_serial, new_data, version,
                local_var.txn)
            resolved_oids.append(oid)
        return resolved_oids
620

621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636
    def waitStoreResponses(self, tryToResolveConflict):
        """Wait for the answers to all store requests of the transaction.

        tryToResolveConflict
            Conflict resolution callable, or None to only wait for the
            answers without handling conflicts.

        Returns a list of (oid, serial) pairs, serial being ResolvedSerial
        for objects stored after a conflict resolution and the transaction
        tid otherwise. The list is empty when tryToResolveConflict is
        None. Raises NEOStorageError when an object has a store counter of
        zero.
        """
        local_var = self.local_var
        queue = local_var.queue
        tid = local_var.tid
        has_pending = self.dispatcher.pending
        resolved_oid_set = set()
        while True:
            # Wait for all requests to be answered (or their connection to
            # be detected as closed)
            while has_pending(queue):
                self._waitAnyMessage()
            if tryToResolveConflict is None:
                break
            conflicts = self._handleConflicts(tryToResolveConflict)
            if not conflicts:
                # No more conflict resolutions to do, no more pending store
                # requests
                break
            # Resolved objects were stored again: wait for those answers.
            resolved_oid_set.update(conflicts)

        result = []
        if tryToResolveConflict is not None:
            # Check for never-stored objects, and update result for all others
            counters = local_var.object_stored_counter_dict
            for oid, store_count in counters.iteritems():
                if store_count == 0:
                    raise NEOStorageError('tpc_store failed')
                if oid in resolved_oid_set:
                    result.append((oid, ResolvedSerial))
                else:
                    result.append((oid, tid))
        return result
Aurel's avatar
Aurel committed
658

659
    def tpc_vote(self, transaction, tryToResolveConflict):
        """Vote the current transaction.

        Waits for the pending store answers, then sends the transaction
        metadata to every writable cell of the transaction.
        Returns the result of waitStoreResponses.
        Raises StorageTransactionError when transaction is not the current
        one, and NEOStorageError when a reached storage node did not vote
        the transaction or when no storage node could be reached.
        """
        local_var = self.local_var
        if transaction is not local_var.txn:
            raise StorageTransactionError(self, transaction)

        result = self.waitStoreResponses(tryToResolveConflict)

        tid = local_var.tid
        # Store data on each node
        vote_packet = Packets.AskStoreTransaction(tid, transaction.user,
            transaction.description, dumps(transaction._extension),
            local_var.data_dict.keys())
        accepted_count = 0
        for cell in self._getCellListForTID(tid, writable=True):
            logging.debug("voting object %s %s", cell.getAddress(),
                cell.getState())
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            # Reset the flag before each request, it is checked below.
            local_var.txn_voted = False
            try:
                self._askStorage(conn, vote_packet)
            except ConnectionClosed:
                continue

            if not self.isTransactionVoted():
                raise NEOStorageError('tpc_vote failed')
            accepted_count += 1

        # check at least one storage node accepted
        if accepted_count == 0:
            raise NEOStorageError('tpc_vote failed')
        # Check if master connection is still alive.
        # This is just here to lower the probability of detecting a problem
        # in tpc_finish, as we should do our best to detect problem before
        # tpc_finish.
        self._getMasterConnection()

        return result

701 702
    def tpc_abort(self, transaction):
        """Abort the current transaction.

        Does nothing when transaction is not the current one. Otherwise
        the storage nodes involved and the primary master are notified,
        and the data of the calling thread is reset.
        """
        local_var = self.local_var
        if transaction is not local_var.txn:
            return

        # Just wait for response to arrive, don't handle any conflict, and
        # ignore the outcome: we are going to abort anyway.
        self.waitStoreResponses(None)

        involved_cells = set()
        # select nodes where objects were stored
        for oid in local_var.data_dict.iterkeys():
            involved_cells.update(self._getCellListForOID(oid, writable=True))
        # select nodes where transaction was stored
        involved_cells.update(self._getCellListForTID(local_var.tid,
            writable=True))

        # cancel transaction on all those nodes
        for cell in involved_cells:
            storage_conn = self.cp.getConnForCell(cell)
            if storage_conn is None:
                continue
            try:
                storage_conn.notify(Packets.AbortTransaction(local_var.tid))
            finally:
                storage_conn.unlock()

        # Abort the transaction in the primary master node.
        master_conn = self._getMasterConnection()
        master_conn.lock()
        try:
            master_conn.notify(Packets.AbortTransaction(local_var.tid))
        finally:
            master_conn.unlock()
        local_var.clear()
736

737 738
    def tpc_finish(self, transaction, f=None):
        """Finish current transaction.

        Nothing is done (and None is returned) when ``transaction`` is not
        the transaction recorded in ``self.local_var.txn``.

        ``f``, when given, is called with the transaction TID before the
        primary master is asked to finish the transaction. Once the master
        answered, the cache is updated with the stored data and
        ``self.local_var`` is cleared.

        Return the TID of the finished transaction.
        Raise NEOStorageError if the transaction is not flagged as finished
        after the master was asked.
        """
        local_var = self.local_var
        if local_var.txn is not transaction:
            return
        self._load_lock_acquire()
        try:
            # Call function given by ZODB
            if f is not None:
                f(local_var.tid)

            # Call finish on master
            oid_list = local_var.data_dict.keys()
            p = Packets.AskFinishTransaction(oid_list, local_var.tid)
            self._askPrimary(p)

            if not self.isTransactionFinished():
                raise NEOStorageError('tpc_finish failed')

            # Remember the TID now: it must still be returned after
            # local_var.clear() has wiped the transaction state.
            tid = local_var.tid

            # Update cache
            self._cache_lock_acquire()
            try:
                mq_cache = self.mq_cache
                for oid, data in local_var.data_dict.items():
                    if data == '':
                        # Empty data marks a removed object. It may never
                        # have been cached, which is not an error.
                        try:
                            del mq_cache[oid]
                        except KeyError:
                            pass
                    else:
                        # Now serial is same as tid
                        mq_cache[oid] = tid, data
            finally:
                self._cache_lock_release()
            local_var.clear()
            return tid
        finally:
            self._load_lock_release()
771

772
    def undo(self, transaction_id, txn, tryToResolveConflict):
        """Undo transaction ``transaction_id`` within transaction ``txn``.

        The transaction information is first fetched from a storage node,
        then the data each object had before ``transaction_id`` is loaded,
        and finally that data is stored again as part of ``txn``.
        ``tryToResolveConflict`` is handed to ``waitStoreResponses``.

        Return a ``(tid, oid_list)`` tuple: the TID found in
        ``self.local_var`` and the OIDs touched by the undone transaction.

        Raise StorageTransactionError if ``txn`` is not the transaction
        recorded in ``self.local_var.txn``, NEOStorageError if the
        transaction information could not be obtained, and UndoError if the
        transaction cannot be undone.
        """
        local_var = self.local_var
        if txn is not local_var.txn:
            raise StorageTransactionError(self, transaction_id)

        # First get transaction information from a storage node.
        cell_list = self._getCellListForTID(transaction_id, readable=True)
        assert len(cell_list), 'No cell found for transaction %s' % (
            dump(transaction_id), )
        shuffle(cell_list)
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            # 0 means "no answer yet": the answer is read back from
            # local_var.txn_info once _askStorage returns.
            local_var.txn_info = 0
            try:
                self._askStorage(conn, Packets.AskTransactionInformation(
                    transaction_id))
            except ConnectionClosed:
                continue

            if local_var.txn_info == -1:
                # Tid not found, try with next node
                logging.warning('Transaction %s was not found on node %s',
                    dump(transaction_id),
                    self.nm.getByAddress(conn.getAddress()))
                continue
            elif isinstance(local_var.txn_info, dict):
                break
            else:
                raise NEOStorageError('undo failed')
        else:
            # No node could provide the transaction information.
            raise NEOStorageError('undo failed')

        oid_list = local_var.txn_info['oids']
        # Second get object data from storage node using loadBefore
        data_dict = {}
        for oid in oid_list:
            try:
                result = self.loadBefore(oid, transaction_id)
            except NEOStorageNotFoundError:
                if oid == '\x00' * 8:
                    # Refuse undoing root object creation.
                    raise UndoError("no previous record", oid)
                else:
                    # Undo object creation
                    result = ('', None, transaction_id)
            data, start, end = result
            # end must be the TID we are going to undo, otherwise it means
            # a later transaction modified the object
            if end != transaction_id:
                raise UndoError("non-undoable transaction", oid)
            data_dict[oid] = data

        # Third do transaction with old data
        for oid, data in data_dict.items():
            self.store(oid, transaction_id, data, None, txn)
        self.waitStoreResponses(tryToResolveConflict)
        return local_var.tid, oid_list
830

831
    def __undoLog(self, first, last, filter=None, block=0, with_oids=False):
        """Return a list of transaction information dicts.

        TIDs are first requested from the storage nodes of the partition
        table, merged and sorted in descending order; transaction
        information is then fetched for each of them until ``last - first``
        entries are collected.

        first, last -- range handed to AskTIDs; a negative ``last`` is
            turned into ``first - last`` (see FileStorage.py).
        filter -- optional callable receiving a transaction information
            dict; only transactions for which it returns true are kept.
        block -- when false and nothing was found, the lookup is retried
            once with ``last * 5``.
        with_oids -- when false, the "oids" key is removed from each
            returned dict.

        Raise NeoException if the information of a listed TID cannot be
        obtained from any node.
        """
        if last < 0:
            # See FileStorage.py for explanation
            last = first - last

        local_var = self.local_var

        # First get a list of transactions from all storage nodes.
        # Each storage node will return TIDs only for UP_TO_DATE state and
        # FEEDING state cells
        pt = self._getPartitionTable()
        storage_node_list = pt.getNodeList()

        local_var.node_tids = {}
        asked_count = 0
        for storage_node in storage_node_list:
            conn = self.cp.getConnForNode(storage_node)
            if conn is None:
                continue

            try:
                conn.ask(local_var.queue, Packets.AskTIDs(first, last,
                    protocol.INVALID_PARTITION))
                asked_count += 1
            finally:
                conn.unlock()

        # Wait for answers from the storages which were actually asked:
        # nodes skipped above for lack of a connection will never answer.
        while len(local_var.node_tids) < asked_count:
            self._waitAnyMessage()

        # Reorder tids
        tid_set = set()
        for tid_list in local_var.node_tids.values():
            tid_set.update(tid_list)
        ordered_tids = sorted(tid_set, reverse=True)
        logging.debug("UndoLog, tids %s", ordered_tids)

        # For each transaction, get info
        undo_info = []
        for tid in ordered_tids:
            cell_list = self._getCellListForTID(tid, readable=True)
            shuffle(cell_list)
            # Reset the answer slot so that the answer of a previous TID is
            # never mistaken for this one when no cell can be asked.
            local_var.txn_info = 0
            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is None:
                    continue
                local_var.txn_info = 0
                try:
                    self._askStorage(conn,
                            Packets.AskTransactionInformation(tid))
                except ConnectionClosed:
                    continue
                if isinstance(local_var.txn_info, dict):
                    break

            if local_var.txn_info in (-1, 0):
                # TID not found at all
                raise NeoException('Data inconsistency detected: '
                                   'transaction info for TID %r could not '
                                   'be found' % (tid, ))

            if filter is None or filter(local_var.txn_info):
                if not with_oids:
                    local_var.txn_info.pop("oids")
                undo_info.append(local_var.txn_info)
                if len(undo_info) >= last - first:
                    break

        # Check we return at least one element, otherwise call
        # again but extend offset. with_oids is forwarded so that the retry
        # returns the same kind of entries as the first attempt.
        if len(undo_info) == 0 and not block:
            undo_info = self.__undoLog(first, last * 5, filter, 1, with_oids)
        return undo_info

903
    def undoLog(self, first, last, filter=None, block=0):
        """Return the undo log for the given range.

        All arguments are forwarded unchanged to the private ``__undoLog``
        helper, leaving its ``with_oids`` option at its default.
        """
        undo_info = self.__undoLog(first, last, filter, block)
        return undo_info
905 906

    def transactionLog(self, first, last):
        """Return the transaction log for the given range, OIDs included.

        Delegates to the private ``__undoLog`` helper with
        ``with_oids=True``.
        """
        txn_log = self.__undoLog(first, last, with_oids=True)
        return txn_log
908

Grégory Wisniewski's avatar
Grégory Wisniewski committed
909
    def history(self, oid, version=None, size=1, filter=None, object_only=0):
        """Return the history of object ``oid``.

        oid -- identifier of the object.
        version -- unused.
        size -- number of history entries requested from the storage node.
        filter -- optional callable receiving each history dict; only
            entries for which it returns true are kept.
        object_only -- when true, return the raw object history answer
            (a tuple whose first item is the OID and second item the list
            of ``(serial, size)`` pairs) without transaction details.

        Otherwise return a list of transaction information dicts, each
        completed with the 'tid', 'version' and 'size' keys.

        Raise NEOStorageError when no node provides a usable answer or when
        a node answers for another OID, and KeyError when the object has no
        history.
        """
        local_var = self.local_var

        # Get history information for the object first
        cell_list = self._getCellListForOID(oid, readable=True)
        shuffle(cell_list)

        # Forget any answer left by a previous call, in case no node can be
        # asked at all.
        local_var.history = None
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            local_var.history = None
            try:
                self._askStorage(conn, Packets.AskObjectHistory(oid, 0, size))
            except ConnectionClosed:
                continue

            if local_var.history == -1:
                # Not found, go on with next node
                continue
            if local_var.history[0] != oid:
                # Got history for wrong oid
                raise NEOStorageError('inconsistency in storage: asked oid ' \
                                      '%r, got %r' % (
                                      oid, local_var.history[0]))
            # Valid answer: stop here so that the remaining nodes cannot
            # overwrite it.
            break

        if not isinstance(local_var.history, tuple):
            raise NEOStorageError('history failed')

        if local_var.history[1] == [] or \
            local_var.history[1][0][1] == 0:
            # KeyError expected if no history was found
            # XXX: this may requires an error from the storages
            raise KeyError

        if object_only:
            # Use by getSerial
            return local_var.history

        # Now that we have object information, get txn information
        history_list = []
        for serial, data_size in local_var.history[1]:
            # Transaction information lives on the cells of the TID, which
            # are not necessarily those of the object.
            cell_list = self._getCellListForTID(serial, readable=True)
            shuffle(cell_list)

            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is None:
                    continue

                # ask transaction information
                local_var.txn_info = None
                try:
                    self._askStorage(conn,
                            Packets.AskTransactionInformation(serial))
                except ConnectionClosed:
                    continue

                if local_var.txn_info == -1:
                    # TID not found
                    continue
                if isinstance(local_var.txn_info, dict):
                    break
            else:
                # No node returned the transaction information; never fall
                # back on whatever was left in local_var.txn_info.
                raise NEOStorageError('history failed')

            # create history dict
            txn_info = local_var.txn_info
            txn_info.pop('id')
            txn_info.pop('oids')
            txn_info['tid'] = serial
            txn_info['version'] = ''
            txn_info['size'] = data_size
            if filter is None or filter(txn_info):
                history_list.append(txn_info)

        return history_list
Aurel's avatar
Aurel committed
982

983 984 985
    def iterator(self, start=None, stop=None):
        """Return an ``Iterator`` built from this object, start and stop."""
        txn_iterator = Iterator(self, start, stop)
        return txn_iterator

986 987 988 989 990
    def lastTransaction(self):
        """Return the TID currently held in ``self.local_var``."""
        # XXX: this doesn't consider transactions created by other clients,
        #  should ask the primary master
        last_tid = self.local_var.tid
        return last_tid

991 992 993 994 995 996 997 998 999 1000
    def abortVersion(self, src, transaction):
        """Return ``('', [])``; ``src`` is ignored.

        Raise StorageTransactionError if ``transaction`` is not the
        transaction recorded in ``self.local_var.txn``.
        """
        if transaction is self.local_var.txn:
            return '', []
        raise StorageTransactionError(self, transaction)

    def commitVersion(self, src, dest, transaction):
        """Return ``('', [])``; ``src`` and ``dest`` are ignored.

        Raise StorageTransactionError if ``transaction`` is not the
        transaction recorded in ``self.local_var.txn``.
        """
        if transaction is self.local_var.txn:
            return '', []
        raise StorageTransactionError(self, transaction)

Grégory Wisniewski's avatar
Grégory Wisniewski committed
1001 1002 1003 1004
    def loadEx(self, oid, version):
        """Return ``(data, serial, '')`` for object ``oid``.

        ``data`` and ``serial`` come from ``self.load``; ``version`` is
        ignored and the third item is always an empty string.
        """
        loaded = self.load(oid=oid)
        data, serial = loaded
        return (data, serial, '')

1005 1006 1007 1008 1009
    def __del__(self):
        """Close every connection and stop the polling thread.

        Also available under the name ``close``.
        """
        # Due to a bug in ZODB, close is not always called when shutting
        # down zope, so connections are released from __del__ as well.
        event_manager = self.em
        for connection in event_manager.getConnectionList():
            connection.close()
        # Stop polling thread
        self.poll_thread.stop()
    close = __del__
1014 1015

    def sync(self):
        """Call ``self._waitAnyMessage(False)`` and return None."""
        # NOTE(review): the positional False presumably asks for a
        # non-blocking wait -- confirm against _waitAnyMessage.
        self._waitAnyMessage(False)
1017

1018 1019 1020 1021 1022 1023 1024 1025 1026
    def setNodeReady(self):
        """Set the ``node_ready`` flag of ``self.local_var`` to True."""
        local_var = self.local_var
        local_var.node_ready = True

    def setNodeNotReady(self):
        """Set the ``node_ready`` flag of ``self.local_var`` to False."""
        local_var = self.local_var
        local_var.node_ready = False

    def isNodeReady(self):
        """Return the ``node_ready`` flag of ``self.local_var``."""
        node_ready = self.local_var.node_ready
        return node_ready

1027
    def setTID(self, value):
        """Store ``value`` as the ``tid`` of ``self.local_var``."""
        local_var = self.local_var
        local_var.tid = value
1029 1030

    def getTID(self):
        """Return the ``tid`` held by ``self.local_var``."""
        tid = self.local_var.tid
        return tid
1032 1033

    def setTransactionFinished(self):
        """Set the ``txn_finished`` flag of ``self.local_var`` to True."""
        local_var = self.local_var
        local_var.txn_finished = True
1035 1036

    def isTransactionFinished(self):
        """Return the ``txn_finished`` flag of ``self.local_var``."""
        finished = self.local_var.txn_finished
        return finished
1038 1039

    def setTransactionVoted(self):
        """Set the ``txn_voted`` flag of ``self.local_var`` to True."""
        local_var = self.local_var
        local_var.txn_voted = True
1041 1042

    def isTransactionVoted(self):
        """Return the ``txn_voted`` flag of ``self.local_var``."""
        voted = self.local_var.txn_voted
        return voted
1044