app.py 45.5 KB
Newer Older
1
#
Grégory Wisniewski's avatar
Grégory Wisniewski committed
2
# Copyright (C) 2006-2010  Nexedi SA
3
#
Aurel's avatar
Aurel committed
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
Aurel's avatar
Aurel committed
9 10 11 12 13 14 15
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
Aurel's avatar
Aurel committed
17

18
from thread import get_ident
19
from cPickle import dumps, loads
20
from zlib import compress as real_compress, decompress
21
from neo.locking import Queue, Empty
22
from random import shuffle
23
import time
24

25
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
26
from ZODB.ConflictResolution import ResolvedSerial
27

28 29 30 31
from neo import setupLog
setupLog('CLIENT', verbose=True)

from neo import logging
32
from neo.protocol import NodeTypes, Packets, INVALID_PARTITION
33
from neo.event import EventManager
34
from neo.util import makeChecksum as real_makeChecksum, dump
35
from neo.locking import Lock
Vincent Pelletier's avatar
Vincent Pelletier committed
36
from neo.connection import MTClientConnection, OnTimeout
37
from neo.node import NodeManager
38
from neo.connector import getConnectorHandler
39
from neo.client.exception import NEOStorageError
40
from neo.client.exception import NEOStorageNotFoundError, ConnectionClosed
41 42
from neo.exception import NeoException
from neo.client.handlers import storage, master
43
from neo.dispatcher import Dispatcher, ForgottenPacket
44
from neo.client.poll import ThreadedPoll
45 46
from neo.client.iterator import Iterator
from neo.client.mq import MQ
47
from neo.client.pool import ConnectionPool
48
from neo.util import u64, parseMasterList
49
from neo.profiling import profiler_decorator, PROFILING_ENABLED
50
from neo.live_debug import register as registerLiveDebugger
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65

if not PROFILING_ENABLED:
    # Profiling is disabled: use the original functions directly.
    compress = real_compress
    makeChecksum = real_makeChecksum
else:
    # The original functions need a "real" python function wrapper before
    # they can be decorated, so provide thin pass-through wrappers.
    @profiler_decorator
    def compress(data):
        return real_compress(data)

    @profiler_decorator
    def makeChecksum(data):
        return real_makeChecksum(data)
Aurel's avatar
Aurel committed
66

67 68
class ThreadContext(object):
    """Attribute container whose values are stored separately per thread.

    Attribute reads and writes are redirected to a dictionary selected by
    the identifier of the calling thread (get_ident()). That dictionary is
    created with default values the first time a thread uses the context.
    """

    def __init__(self):
        # Go through object.__setattr__ because our own __setattr__ would
        # store the value in the calling thread's data instead.
        object.__setattr__(self, '_threads_dict', {})

    def __getThreadData(self):
        """Return the calling thread's data, initialising it if missing."""
        ident = get_ident()
        if ident not in self._threads_dict:
            self.clear(ident)
        return self._threads_dict[ident]

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails.
        data = self.__getThreadData()
        if name in data:
            return data[name]
        raise AttributeError(name)

    def __setattr__(self, name, value):
        self.__getThreadData()[name] = value

    def clear(self, thread_id=None):
        """Reset the data of a thread to its default values.

        thread_id
            Identifier of the thread to reset. Defaults to the calling
            thread.

        The thread's existing queue, if any, is kept across the reset;
        otherwise a new Queue(0) is created.
        """
        if thread_id is None:
            thread_id = get_ident()
        previous = self._threads_dict.get(thread_id)
        if previous is not None:
            queue = previous['queue']
        else:
            queue = Queue(0)
        self._threads_dict[thread_id] = dict(
            tid=None,
            txn=None,
            data_dict={},
            object_serial_dict={},
            object_stored_counter_dict={},
            conflict_serial_dict={},
            resolved_conflict_serial_dict={},
            object_stored=0,
            txn_voted=False,
            txn_finished=False,
            queue=queue,
            txn_info=0,
            history=None,
            node_tids={},
            node_ready=False,
            asked_object=0,
            undo_conflict_oid_list=[],
            undo_error_oid_list=[],
            involved_nodes=set(),
        )


Aurel's avatar
Aurel committed
123
class Application(object):
124 125
    """The client node application."""

126
    def __init__(self, master_nodes, name, connector=None, compress=True, **kw):
        """Create the client application.

        master_nodes
            Master node list; it is parsed with parseMasterList and one
            master node is created in the node manager per address.
        name
            Stored as self.name.
        connector
            Passed to getConnectorHandler to obtain the connector handler.
        compress
            Stored as self.compress.
        kw
            Extra keyword arguments are accepted and ignored.
        """
        # Start polling thread
        event_manager = EventManager()
        self.em = event_manager
        self.poll_thread = ThreadedPoll(event_manager)
        # Internal Attributes common to all thread
        self._db = None
        self.name = name
        self.connector_handler = getConnectorHandler(connector)
        self.dispatcher = Dispatcher()
        node_manager = NodeManager()
        self.nm = node_manager
        self.cp = ConnectionPool(self)
        self.pt = None
        self.master_conn = None
        self.primary_master_node = None
        self.trying_master_node = None

        # load master node list
        for address in parseMasterList(master_nodes):
            node_manager.createMaster(address=address)

        # no self-assigned UUID, primary master will supply us one
        self.uuid = None
        self.mq_cache = MQ()
        self.new_oid_list = []
        self.last_oid = '\0' * 8
        self.storage_event_handler = storage.StorageEventHandler(self)
        self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
        self.storage_handler = storage.StorageAnswersHandler(self)
        self.primary_handler = master.PrimaryAnswersHandler(self)
        self.primary_bootstrap_handler = master.PrimaryBootstrapHandler(self)
        self.notifications_handler = master.PrimaryNotificationsHandler(self)
        # Internal attribute distinct between thread
        self.local_var = ThreadContext()
        # Lock definition: each entry gets its own Lock, exposed as
        # <prefix>_acquire / <prefix>_release attributes.
        lock_prefix_list = (
            # _load_lock is used to make loading and storing atomic
            '_load_lock',
            # _oid_lock is used in order to not call multiple oid
            # generation at the same time
            '_oid_lock',
            # _cache_lock is used for the client cache
            '_cache_lock',
            # _connecting_to_master_node is used to prevent simultaneous
            # master node connection attempts
            '_connecting_to_master_node',
            # _nm ensure exclusive access to the node manager
            '_nm',
        )
        for prefix in lock_prefix_list:
            lock = Lock()
            setattr(self, prefix + '_acquire', lock.acquire)
            setattr(self, prefix + '_release', lock.release)
        self.compress = compress
        registerLiveDebugger(on_log=self.log)
184

185 186 187 188 189 190
    def log(self):
        self.em.log()
        self.nm.log()
        if self.pt is not None:
            self.pt.log()

191
    @profiler_decorator
192 193 194 195 196 197 198 199 200 201 202
    def _handlePacket(self, conn, packet, handler=None):
        """
          conn
            The connection which received the packet (forwarded to handler).
          packet
            The packet to handle.
          handler
            The handler to use to handle packet.
            If not given, it will be guessed from connection's not type.
        """
        if handler is None:
203 204
            # Guess the handler to use based on the type of node on the
            # connection
205 206 207 208 209 210 211 212 213 214
            node = self.nm.getByAddress(conn.getAddress())
            if node is None:
                raise ValueError, 'Expecting an answer from a node ' \
                    'which type is not known... Is this right ?'
            if node.isStorage():
                handler = self.storage_handler
            elif node.isMaster():
                handler = self.primary_handler
            else:
                raise ValueError, 'Unknown node type: %r' % (node.__class__, )
215 216 217 218 219
        conn.lock()
        try:
            handler.dispatch(conn, packet)
        finally:
            conn.unlock()
220

221
    @profiler_decorator
222 223 224 225 226 227 228
    def _waitAnyMessage(self, block=True):
        """
          Handle all pending packets.
          block
            If True (default), will block until at least one packet was
            received.
        """
229 230 231
        pending = self.dispatcher.pending
        queue = self.local_var.queue
        get = queue.get
232
        _handlePacket = self._handlePacket
233
        while pending(queue):
234 235 236
            try:
                conn, packet = get(block)
            except Empty:
237
                break
238 239
            if packet is None or isinstance(packet, ForgottenPacket):
                # connection was closed or some packet was forgotten
240
                continue
241 242 243 244 245 246
            block = False
            try:
                _handlePacket(conn, packet)
            except ConnectionClosed:
                pass

247
    @profiler_decorator
248 249 250 251 252 253
    def _waitMessage(self, target_conn, msg_id, handler=None):
        """Wait for a message returned by the dispatcher in queues.

        Blocks on the calling thread's queue until the packet answering
        msg_id on target_conn is received, then handles it with handler.
        Every other real packet received meanwhile is handled with the
        default handler, including packets from target_conn that carry
        another message id, so that answers to other pending requests on
        the same connection are not lost.

        target_conn
            Connection on which the answer is expected.
        msg_id
            Identifier of the expected answer.
        handler
            Handler used for the expected answer (see _handlePacket).

        Raises ConnectionClosed when the queue reports target_conn as
        closed, and ValueError when the expected packet was forgotten.
        """
        get = self.local_var.queue.get
        _handlePacket = self._handlePacket
        while True:
            conn, packet = get(True)
            is_forgotten = isinstance(packet, ForgottenPacket)
            if target_conn is conn:
                # check fake packet
                if packet is None:
                    raise ConnectionClosed
                if msg_id == packet.getId():
                    if is_forgotten:
                        raise ValueError('ForgottenPacket for an '
                            'explicitely expected packet.')
                    _handlePacket(conn, packet, handler=handler)
                    break
            # Not the expected answer: still process it, unless it is a
            # placeholder (closed connection or forgotten packet).
            if not is_forgotten and packet is not None:
                _handlePacket(conn, packet)
267

268
    @profiler_decorator
269
    def _askStorage(self, conn, packet):
270
        """ Send a request to a storage node and process it's answer """
271
        msg_id = conn.ask(packet, queue=self.local_var.queue)
272 273
        self._waitMessage(conn, msg_id, self.storage_handler)

274
    @profiler_decorator
275
    def _askPrimary(self, packet):
276
        """ Send a request to the primary master and process it's answer """
277
        conn = self._getMasterConnection()
278
        msg_id = conn.ask(packet, queue=self.local_var.queue)
279 280
        self._waitMessage(conn, msg_id, self.primary_handler)

281
    @profiler_decorator
282 283
    def _getMasterConnection(self):
        """ Connect to the primary master node on demand """
284
        # acquire the lock to allow only one thread to connect to the primary
285 286 287 288
        result = self.master_conn
        if result is None:
            self._connecting_to_master_node_acquire()
            try:
289
                self.new_oid_list = []
290 291 292 293 294
                result = self._connectToPrimaryNode()
                self.master_conn = result
            finally:
                self._connecting_to_master_node_release()
        return result
295

296
    def _getPartitionTable(self):
297
        """ Return the partition table manager, reconnect the PMN if needed """
298 299 300 301 302
        # this ensure the master connection is established and the partition
        # table is up to date.
        self._getMasterConnection()
        return self.pt

303
    @profiler_decorator
304 305
    def _getCellListForOID(self, oid, readable=False, writable=False):
        """ Return the cells available for the specified OID """
306
        pt = self._getPartitionTable()
307 308 309 310
        return pt.getCellListForOID(oid, readable, writable)

    def _getCellListForTID(self, tid, readable=False, writable=False):
        """ Return the cells available for the specified TID """
311
        pt = self._getPartitionTable()
312
        return pt.getCellListForTID(tid, readable, writable)
313

314
    @profiler_decorator
315
    def _connectToPrimaryNode(self):
        """Connect to the primary master node and fetch initial data.

        Loops until this client has a UUID and an operational partition
        table:
        - find the primary master, trying the last known primary first
          and the known master nodes one by one otherwise;
        - identify to it as a client node;
        - ask it for node information and for the partition table.
        A connection lost at any of these steps restarts the whole
        procedure.

        Returns the connection to the primary master.
        """
        logging.debug('connecting to primary master...')
        ready = False
        nm = self.nm
        queue = self.local_var.queue
        while not ready:
            # Get network connection to primary master
            index = 0
            connected = False
            while not connected:
                if self.primary_master_node is not None:
                    # If I know a primary master node, pinpoint it.
                    self.trying_master_node = self.primary_master_node
                    self.primary_master_node = None
                else:
                    # Otherwise, check one by one.
                    master_list = nm.getMasterList()
                    try:
                        self.trying_master_node = master_list[index]
                    except IndexError:
                        # All known masters were tried: wait a bit and
                        # start over from the first one.
                        time.sleep(1)
                        index = 0
                        self.trying_master_node = master_list[0]
                    index += 1
                # Connect to master
                conn = MTClientConnection(self.em,
                        self.notifications_handler,
                        addr=self.trying_master_node.getAddress(),
                        connector=self.connector_handler(),
                        dispatcher=self.dispatcher)
                # Query for primary master node
                if conn.getConnector() is None:
                    # This happens if a connection could not be established.
                    logging.error('Connection to master node %s failed',
                                  self.trying_master_node)
                    continue
                msg_id = conn.ask(Packets.AskPrimary(), queue=queue)
                try:
                    self._waitMessage(conn, msg_id,
                            handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    continue
                # If we reached the primary master node, mark as connected
                # NOTE(review): primary_master_node is presumably set by
                # the bootstrap handler while the answer is processed;
                # confirm against neo.client.handlers.master.
                connected = self.primary_master_node is not None and \
                        self.primary_master_node is self.trying_master_node

            logging.info('connected to a primary master node')
            # Identify to primary master and request initial data
            while conn.getUUID() is None:
                if conn.getConnector() is None:
                    logging.error('Connection to master node %s lost',
                                  self.trying_master_node)
                    self.primary_master_node = None
                    break
                p = Packets.RequestIdentification(NodeTypes.CLIENT,
                        self.uuid, None, self.name)
                msg_id = conn.ask(p, queue=queue)
                try:
                    self._waitMessage(conn, msg_id,
                            handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    self.primary_master_node = None
                    break
                if conn.getUUID() is None:
                    # Node identification was refused by master.
                    time.sleep(1)
            try:
                if self.uuid is not None:
                    msg_id = conn.ask(Packets.AskNodeInformation(),
                            queue=queue)
                    self._waitMessage(conn, msg_id,
                            handler=self.primary_bootstrap_handler)
                    msg_id = conn.ask(Packets.AskPartitionTable(),
                            queue=queue)
                    self._waitMessage(conn, msg_id,
                            handler=self.primary_bootstrap_handler)
            except ConnectionClosed:
                # Losing the connection here is handled like at the other
                # steps: forget the primary master and start over, rather
                # than letting the error escape the retry loop.
                logging.error('Connection to master node %s lost',
                              self.trying_master_node)
                self.primary_master_node = None
                continue
            ready = self.uuid is not None and self.pt is not None \
                                 and self.pt.operational()
        logging.info("connected to primary master node %s",
                self.primary_master_node)
        return conn
393

394 395 396
    def registerDB(self, db, limit):
        self._db = db

397 398 399
    def getDB(self):
        return self._db

400
    @profiler_decorator
401 402 403 404 405
    def new_oid(self):
        """Get a new OID."""
        self._oid_lock_acquire()
        try:
            if len(self.new_oid_list) == 0:
406 407 408 409
                # Get new oid list from master node
                # we manage a list of oid here to prevent
                # from asking too many time new oid one by one
                # from master node
410
                self._askPrimary(Packets.AskNewOIDs(100))
411 412
                if len(self.new_oid_list) <= 0:
                    raise NEOStorageError('new_oid failed')
413
            self.last_oid = self.new_oid_list.pop(0)
414
            return self.last_oid
415 416 417
        finally:
            self._oid_lock_release()

418 419 420
    def getStorageSize(self):
        """Return the last OID used, as an integer.

        This is an inaccurate estimate of the storage size.
        """
        last_oid_number = u64(self.last_oid)
        return int(last_oid_number)
421

422
    @profiler_decorator
Aurel's avatar
Aurel committed
423 424 425 426
    def getSerial(self, oid):
        # Try in cache first
        self._cache_lock_acquire()
        try:
427 428
            if oid in self.mq_cache:
                return self.mq_cache[oid][0]
Aurel's avatar
Aurel committed
429 430 431
        finally:
            self._cache_lock_release()
        # history return serial, so use it
Grégory Wisniewski's avatar
Grégory Wisniewski committed
432
        hist = self.history(oid, size=1, object_only=1)
Aurel's avatar
Aurel committed
433 434
        if len(hist) == 0:
            raise NEOStorageNotFoundError()
435 436 437 438
        if hist[0] != oid:
            raise NEOStorageError('getSerial failed')
        return hist[1][0][0]

Aurel's avatar
Aurel committed
439

440
    @profiler_decorator
441
    def _load(self, oid, serial=None, tid=None, cache=0):
        """
        Internal method which manage load, loadSerial and loadBefore.
        OID and TID (serial) parameters are expected packed.
        oid
            OID of object to get.
        serial
            If given, the exact serial at which OID is desired.
            tid should be None.
        tid
            If given, the excluded upper bound serial at which OID is desired.
            serial should be None.
        cache
            Store data in cache for future lookups.

        Return value: (3-tuple)
        - Object data (None if object creation was undone).
        - Serial of given data.
        - Next serial at which object exists, or None. Only set when tid
          parameter is not None.

        Exceptions:
            NEOStorageError
                technical problem
            NEOStorageNotFoundError
                object exists but no data satisfies given parameters
            NEOStorageDoesNotExistError
                object doesn't exist
        """
        # TODO:
        # - rename parameters (here and in handlers & packet definitions)
        cell_list = self._getCellListForOID(oid, readable=True)
        if len(cell_list) == 0:
            # No cells available, so why are we running ?
            raise NEOStorageError('No storage available for oid %s' % (
                dump(oid), ))

        # Shuffle before sorting: the sort is stable, so cells with equal
        # sort keys end up in random order.
        shuffle(cell_list)
        cell_list.sort(key=self.cp.getCellSortKey)
        # asked_object is 0 while no answer was received, -1 after an
        # invalid answer, and the answer tuple otherwise.
        self.local_var.asked_object = 0
        packet = Packets.AskObject(oid, serial, tid)
        for cell in cell_list:
            logging.debug('trying to load %s at %s before %s from %s',
                dump(oid), dump(serial), dump(tid), dump(cell.getUUID()))
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            try:
                self._askStorage(conn, packet)
            except ConnectionClosed:
                continue

            # Check data
            noid, start_serial, end_serial, compression, checksum, data \
                = self.local_var.asked_object
            if noid != oid:
                # Oops, try with next node
                # The received oid is dumped like every other oid logged
                # here, instead of being logged as a raw packed value.
                logging.error('got wrong oid %s instead of %s from node %s',
                              dump(noid), dump(oid), cell.getAddress())
                self.local_var.asked_object = -1
                continue
            elif checksum != makeChecksum(data):
                # Check checksum.
                logging.error('wrong checksum from node %s for oid %s',
                              cell.getAddress(), dump(oid))
                self.local_var.asked_object = -1
                continue
            else:
                # Everything looks alright.
                break

        if self.local_var.asked_object == 0:
            # We didn't get any object from any storage node because of
            # connection errors
            raise NEOStorageError('connection failure')

        if self.local_var.asked_object == -1:
            raise NEOStorageError('inconsistent data')

        # Uncompress data
        if compression:
            data = decompress(data)

        # Put in cache only when using load
        if cache:
            self._cache_lock_acquire()
            try:
                self.mq_cache[oid] = start_serial, data
            finally:
                self._cache_lock_release()
        if data == '':
            data = None
        return data, start_serial, end_serial
535

536

537
    @profiler_decorator
538 539 540
    def load(self, oid, version=None):
        """Load an object for a given oid."""
        # First try from cache
541
        self._load_lock_acquire()
542
        try:
543 544 545 546
            self._cache_lock_acquire()
            try:
                if oid in self.mq_cache:
                    logging.debug('load oid %s is cached', dump(oid))
547 548
                    serial, data = self.mq_cache[oid]
                    return data, serial
549 550 551 552
            finally:
                self._cache_lock_release()
            # Otherwise get it from storage node
            return self._load(oid, cache=1)[:2]
553
        finally:
554
            self._load_lock_release()
Aurel's avatar
Aurel committed
555

556

557
    @profiler_decorator
558
    def loadSerial(self, oid, serial):
        """Load an object for a given oid and serial.

        Returns only the object data.
        """
        # Do not try in cache as it manages only up-to-date object
        logging.debug('loading %s at %s', dump(oid), dump(serial))
        loaded = self._load(oid, serial=serial)
        return loaded[0]
Aurel's avatar
Aurel committed
563

564

565
    @profiler_decorator
566
    def loadBefore(self, oid, tid):
        """Load an object for a given oid before tid committed.

        Returns the full result of _load for that oid and tid.
        """
        # Do not try in cache as it manages only up-to-date object
        logging.debug('loading %s before %s', dump(oid), dump(tid))
        loaded = self._load(oid, tid=tid)
        return loaded
Aurel's avatar
Aurel committed
571

572

573
    @profiler_decorator
574 575 576
    def tpc_begin(self, transaction, tid=None, status=' '):
        """Begin a new transaction."""
        # First get a transaction, only one is allowed at a time
577
        if self.local_var.txn is transaction:
578
            # We already begin the same transaction
579
            raise StorageTransactionError('Duplicate tpc_begin calls')
580 581
        if self.local_var.txn is not None:
            raise NeoException, 'local_var is not clean in tpc_begin'
582 583 584 585 586 587
        # use the given TID or request a new one to the master
        self.local_var.tid = tid
        if tid is None:
            self._askPrimary(Packets.AskBeginTransaction())
            if self.local_var.tid is None:
                raise NEOStorageError('tpc_begin failed')
588
        self.local_var.txn = transaction
589

590
    @profiler_decorator
Vincent Pelletier's avatar
Vincent Pelletier committed
591
    def store(self, oid, serial, data, version, transaction):
        """Store object.

        Sends the object to every writable cell of the OID that has a
        usable connection, without waiting for the answers; only packets
        already received are processed before returning. Always returns
        None.

        Raises StorageTransactionError when transaction is not the one
        begun by this thread, and NEOStorageError when the OID has no
        writable cell.
        """
        local_var = self.local_var
        if transaction is not local_var.txn:
            raise StorageTransactionError(self, transaction)
        logging.debug('storing oid %s serial %s',
                     dump(oid), dump(serial))
        # Find which storage node to use
        cell_list = self._getCellListForOID(oid, writable=True)
        if not cell_list:
            raise NEOStorageError
        if data is None:
            # this is a George Bailey object, stored as an empty string
            data = ''
        # Send the data uncompressed, unless compression is enabled and
        # does not make it bigger.
        compressed_data = data
        compression = 0
        if self.compress:
            candidate = compress(data)
            if len(candidate) <= len(data):
                compressed_data = candidate
                compression = 1
        checksum = makeChecksum(compressed_data)
        tid = local_var.tid
        p = Packets.AskStoreObject(oid, serial, compression,
                 checksum, compressed_data, tid)
        on_timeout = OnTimeout(self.onStoreTimeout, local_var.tid, oid)
        # Store object in tmp cache
        local_var.data_dict[oid] = data
        # Store data on each node
        local_var.object_stored_counter_dict[oid] = {}
        local_var.object_serial_dict[oid] = (serial, version)
        queue = local_var.queue
        involved_nodes = local_var.involved_nodes
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is not None:
                try:
                    conn.ask(p, on_timeout=on_timeout, queue=queue)
                    involved_nodes.add(cell.getNode())
                except ConnectionClosed:
                    pass

        self._waitAnyMessage(False)
        return None
638

639
    def onStoreTimeout(self, conn, msg_id, tid, oid):
        """React to a store request which timed out.

        The timed-out request is forgotten by the dispatcher and the
        storage is asked whether someone locks the object, the answer
        going to the queue of the forgotten request. Always returns True.
        """
        # NOTE: this method is called from poll thread, don't use
        # local_var !
        # Stop expecting the timed-out store request.
        request_queue = self.dispatcher.forget(conn, msg_id)
        # Ask the storage if someone locks the object.
        lock_query = Packets.AskHasLock(tid, oid)
        # Shorten timeout to react earlier to an unresponding storage.
        conn.ask(lock_query, timeout=5, queue=request_queue)
        return True

649
    @profiler_decorator
650 651 652 653 654 655 656
    def _handleConflicts(self, tryToResolveConflict):
        result = []
        append = result.append
        local_var = self.local_var
        # Check for conflicts
        data_dict = local_var.data_dict
        object_serial_dict = local_var.object_serial_dict
657 658
        conflict_serial_dict = local_var.conflict_serial_dict
        resolved_conflict_serial_dict = local_var.resolved_conflict_serial_dict
659 660 661 662 663 664 665 666
        for oid, conflict_serial_set in conflict_serial_dict.items():
            resolved_serial_set = resolved_conflict_serial_dict.setdefault(
                oid, set())
            conflict_serial = max(conflict_serial_set)
            if resolved_serial_set and conflict_serial <= max(resolved_serial_set):
                # A later serial has already been resolved, skip.
                resolved_serial_set.update(conflict_serial_dict.pop(oid))
                continue
667 668 669 670 671 672 673 674
            serial, version = object_serial_dict[oid]
            data = data_dict[oid]
            tid = local_var.tid
            resolved = False
            if conflict_serial <= tid:
                new_data = tryToResolveConflict(oid, conflict_serial, serial,
                    data)
                if new_data is not None:
675 676
                    logging.info('Conflict resolution succeed for %r:%r with %r',
                        dump(oid), dump(serial), dump(conflict_serial))
677
                    # Mark this conflict as resolved
678
                    resolved_serial_set.update(conflict_serial_dict.pop(oid))
679 680
                    # Try to store again
                    self.store(oid, conflict_serial, new_data, version,
Vincent Pelletier's avatar
Vincent Pelletier committed
681
                        local_var.txn)
682 683
                    append(oid)
                    resolved = True
684 685 686 687 688
                else:
                    logging.info('Conflict resolution failed for %r:%r with %r',
                        dump(oid), dump(serial), dump(conflict_serial))
            else:
                logging.info('Conflict reported for %r:%r with later ' \
Grégory Wisniewski's avatar
Grégory Wisniewski committed
689 690
                    'transaction %r , cannot resolve conflict.', dump(oid),
                    dump(serial), dump(conflict_serial))
691 692 693 694 695 696
            if not resolved:
                # XXX: Is it really required to remove from data_dict ?
                del data_dict[oid]
                raise ConflictError(oid=oid,
                    serials=(tid, serial), data=data)
        return result
697

698 699 700 701 702 703 704 705 706 707
    @profiler_decorator
    def waitResponses(self):
        """Wait for all requests to be answered (or their connection to be
        detected as closed)."""
        thread_queue = self.local_var.queue
        has_pending = self.dispatcher.pending
        wait_for_message = self._waitAnyMessage
        # Handle incoming messages one at a time for as long as the
        # dispatcher reports something pending for this queue.
        while True:
            if not has_pending(thread_queue):
                break
            wait_for_message()

708
    @profiler_decorator
709 710 711 712 713 714 715 716 717
    def waitStoreResponses(self, tryToResolveConflict):
        """Wait for store answers, handle conflicts and build the result.

        Alternates between waiting for pending answers and running
        _handleConflicts (given tryToResolveConflict) until a conflict
        handling pass reports nothing.

        Returns a list of (oid, serial) pairs, one per entry of
        local_var.object_stored_counter_dict: serial is ResolvedSerial for
        the oids reported by conflict handling, and the transaction's tid
        for all others.

        Raises NEOStorageError if the entry of an oid is empty.
        """
        local_var = self.local_var
        tid = local_var.tid
        handle_conflicts = self._handleConflicts
        resolved_oid_set = set()
        while True:
            self.waitResponses()
            conflict_oid_list = handle_conflicts(tryToResolveConflict)
            if not conflict_oid_list:
                # No more conflict resolutions to do, no more pending store
                # requests
                break
            resolved_oid_set.update(conflict_oid_list)

        # Check for never-stored objects, and update result for all others
        result = []
        stored_counter_dict = local_var.object_stored_counter_dict
        for oid, store_dict in stored_counter_dict.iteritems():
            if not store_dict:
                raise NEOStorageError('tpc_store failed')
            if oid in resolved_oid_set:
                serial = ResolvedSerial
            else:
                serial = tid
            result.append((oid, serial))
        return result
Aurel's avatar
Aurel committed
737

738
    @profiler_decorator
739
    def tpc_vote(self, transaction, tryToResolveConflict):
        """Store current transaction.

        Waits for the store answers (handling conflicts through
        tryToResolveConflict), then asks every writable cell found for the
        transaction's tid to store the transaction itself: user,
        description, pickled extension and the oids of local_var.data_dict.

        Returns what waitStoreResponses returned.

        Raises StorageTransactionError if transaction is not the one held
        in local_var.txn, and NEOStorageError if a storage node answers
        without the vote being flagged or if no storage node accepted.
        """
        local_var = self.local_var
        if local_var.txn is not transaction:
            raise StorageTransactionError(self, transaction)

        result = self.waitStoreResponses(tryToResolveConflict)

        tid = local_var.tid
        # Store data on each node
        accepted_count = 0
        packet = Packets.AskStoreTransaction(tid, str(transaction.user),
            str(transaction.description), dumps(transaction._extension),
            local_var.data_dict.keys())
        involved_nodes = self.local_var.involved_nodes
        for cell in self._getCellListForTID(tid, writable=True):
            logging.debug("voting object %s %s", cell.getAddress(),
                cell.getState())
            conn = self.cp.getConnForCell(cell)
            if conn is not None:
                local_var.txn_voted = False
                try:
                    self._askStorage(conn, packet)
                    involved_nodes.add(cell.getNode())
                except ConnectionClosed:
                    # This cell does not count, move on to the next one.
                    pass
                else:
                    if not self.isTransactionVoted():
                        raise NEOStorageError('tpc_vote failed')
                    accepted_count += 1

        # check at least one storage node accepted
        if not accepted_count:
            raise NEOStorageError('tpc_vote failed')
        # Check if master connection is still alive.
        # This is just here to lower the probability of detecting a problem
        # in tpc_finish, as we should do our best to detect problem before
        # tpc_finish.
        self._getMasterConnection()

        return result

783
    @profiler_decorator
784 785
    def tpc_abort(self, transaction):
        """Abort current transaction.

        Does nothing when transaction is not the one held in local_var.txn.
        Otherwise notifies every node of local_var.involved_nodes for which
        a connection is available, consumes all pending answers and clears
        local_var.

        Errors raised while notifying or while consuming answers are logged
        and ignored. KeyboardInterrupt and SystemExit are not swallowed, so
        the process can still be interrupted while answers are drained.
        """
        if transaction is not self.local_var.txn:
            return

        tid = self.local_var.tid
        p = Packets.AbortTransaction(tid)
        getConnForNode = self.cp.getConnForNode
        # cancel transaction on all those nodes
        for node in self.local_var.involved_nodes:
            conn = getConnForNode(node)
            if conn is None:
                continue
            try:
                conn.notify(p)
            except Exception:
                logging.error('Exception in tpc_abort while notifying ' \
                    'storage node %r of abortion, ignoring.', conn, exc_info=1)

        # Just wait for responses to arrive. If any leads to an exception,
        # log it and continue: we *must* eat all answers to not disturb the
        # next transaction.
        queue = self.local_var.queue
        pending = self.dispatcher.pending
        _waitAnyMessage = self._waitAnyMessage
        while pending(queue):
            try:
                _waitAnyMessage()
            except Exception:
                logging.error('Exception in tpc_abort while handling ' \
                    'pending answers, ignoring.', exc_info=1)

        self.local_var.clear()
817

818
    @profiler_decorator
819 820
    def tpc_finish(self, transaction, f=None):
        """Finish current transaction.

        While holding the load lock: calls f(tid) when f is given, asks the
        primary master to finish the transaction for the oids of
        local_var.data_dict, then updates the cache and clears local_var.

        Returns the tid of the finished transaction.

        Raises StorageTransactionError if transaction is not the one held
        in local_var.txn, and NEOStorageError if the transaction is not
        flagged as finished once the master has answered.
        """
        if self.local_var.txn is not transaction:
            # The two literals are concatenated: the trailing space keeps
            # "wrong" and "transaction" apart in the message.
            raise StorageTransactionError('tpc_finish called for wrong '
                'transaction')
        self._load_lock_acquire()
        try:
            tid = self.local_var.tid
            # Call function given by ZODB
            if f is not None:
                f(tid)

            # Call finish on master
            oid_list = self.local_var.data_dict.keys()
            p = Packets.AskFinishTransaction(tid, oid_list)
            self._askPrimary(p)

            if not self.isTransactionFinished():
                raise NEOStorageError('tpc_finish failed')

            # Update cache
            self._cache_lock_acquire()
            try:
                mq_cache = self.mq_cache
                for oid, data in self.local_var.data_dict.iteritems():
                    if data == '':
                        # Empty data: drop any cached entry for this oid.
                        if oid in mq_cache:
                            del mq_cache[oid]
                    else:
                        # Now serial is same as tid
                        mq_cache[oid] = tid, data
            finally:
                self._cache_lock_release()
            self.local_var.clear()
            return tid
        finally:
            self._load_lock_release()
856

857
    def undo(self, undone_tid, txn, tryToResolveConflict):
        """Undo transaction undone_tid as part of current transaction txn.

        Fetches the information of the undone transaction from a readable
        cell, asks the storage nodes to undo it, then tries to resolve the
        oids reported in local_var.undo_error_oid_list with
        tryToResolveConflict, storing the resolved data.

        Returns (tid, oid_list): the tid of the current transaction and the
        'oids' entry of the undone transaction's information.

        Raises StorageTransactionError if txn is not the one held in
        local_var.txn, NEOStorageError if the transaction information
        cannot be fetched, ConflictError if any oid is reported in
        local_var.undo_conflict_oid_list, and UndoError if an object cannot
        be loaded or a conflict cannot be resolved.
        """
        # FIXME: undo must be refactored to work with replication:
        # - load oid list from a readable storage
        #   (excludes replicating nodes)
        # - get each object's data backpointer from readable storage nodes
        #   (excludes replicating nodes)
        # - optionally, resolve conflicts
        # - store object's backpointers in all writable storage nodes, or
        #   store conflict resolution data
        #   (includes replicating nodes)
        if txn is not self.local_var.txn:
            raise StorageTransactionError(self, undone_tid)

        # First get transaction information from a storage node.
        cell_list = self._getCellListForTID(undone_tid, readable=True)
        assert len(cell_list), 'No cell found for transaction %s' % (
            dump(undone_tid), )
        shuffle(cell_list)
        cell_list.sort(key=self.cp.getCellSortKey)
        packet = Packets.AskTransactionInformation(undone_tid)
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.txn_info = 0
            self.local_var.txn_ext = 0
            try:
                self._askStorage(conn, packet)
            except ConnectionClosed:
                continue
            except NEOStorageNotFoundError:
                # Tid not found, try with next node
                logging.warning('Transaction %s was not found on node %s',
                    dump(undone_tid), self.nm.getByAddress(conn.getAddress()))
                continue

            if isinstance(self.local_var.txn_info, dict):
                break
            else:
                raise NEOStorageError('undo failed')
        else:
            raise NEOStorageError('undo failed')

        tid = self.local_var.tid

        undo_conflict_oid_list = self.local_var.undo_conflict_oid_list = []
        undo_error_oid_list = self.local_var.undo_error_oid_list = []
        ask_undo_transaction = Packets.AskUndoTransaction(tid, undone_tid)
        getConnForNode = self.cp.getConnForNode
        queue = self.local_var.queue
        for storage_node in self.nm.getStorageList():
            storage_conn = getConnForNode(storage_node)
            if storage_conn is None:
                # getConnForNode may return None (other callers in this
                # class guard against it): skip the node instead of
                # failing with an AttributeError on None.
                logging.warning('No connection to storage node %r, it will ' \
                    'not be asked to undo transaction %s', storage_node,
                    dump(undone_tid))
                continue
            storage_conn.ask(ask_undo_transaction, queue=queue)
        # Wait for all AnswerUndoTransaction.
        self.waitResponses()

        # Don't do any handling for "live" conflicts, raise
        if undo_conflict_oid_list:
            raise ConflictError(oid=undo_conflict_oid_list[0], serials=(tid,
                undone_tid), data=None)

        # Defined once, outside the loop: it does not depend on the oid
        # being processed.
        def loadBefore(oid, tid):
            try:
                result = self._load(oid, tid=tid)
            except NEOStorageNotFoundError:
                raise UndoError("Object not found while resolving undo " \
                    "conflict")
            return result[:2]

        # Try to resolve undo conflicts
        for oid in undo_error_oid_list:
            # Load the latest version we are supposed to see
            data, data_tid = loadBefore(oid, tid)
            # Load the version we were undoing to
            undo_data, _ = loadBefore(oid, undone_tid)
            # Resolve conflict
            try:
                new_data = tryToResolveConflict(oid, data_tid, undone_tid,
                    undo_data, data)
            except ConflictError:
                new_data = None
            if new_data is None:
                raise UndoError('Some data were modified by a later ' \
                    'transaction', oid)
            else:
                self.store(oid, data_tid, new_data, '', self.local_var.txn)

        oid_list = self.local_var.txn_info['oids']
        # Consistency checking: all oids of the transaction must have been
        # reported as undone
        data_dict = self.local_var.data_dict
        for oid in oid_list:
            assert oid in data_dict, repr(oid)
        return self.local_var.tid, oid_list
951

952 953 954 955
    def _insertMetadata(self, txn_info, extension):
        for k, v in loads(extension).items():
            txn_info[k] = v

956
    def __undoLog(self, first, last, filter=None, block=0, with_oids=False):
957 958 959 960
        if last < 0:
            # See FileStorage.py for explanation
            last = first - last

961
        # First get a list of transactions from all storage nodes.
962 963
        # Each storage node will return TIDs only for UP_TO_DATE state and
        # FEEDING state cells
964
        pt = self._getPartitionTable()
965
        storage_node_list = pt.getNodeList()
966

967
        self.local_var.node_tids = {}
968
        queue = self.local_var.queue
969
        for storage_node in storage_node_list:
970
            conn = self.cp.getConnForNode(storage_node)
971 972
            if conn is None:
                continue
973
            conn.ask(Packets.AskTIDs(first, last, INVALID_PARTITION), queue=queue)
974 975

        # Wait for answers from all storages.
976
        while len(self.local_var.node_tids) != len(storage_node_list):
977
            self._waitAnyMessage()
978 979

        # Reorder tids
980 981 982
        ordered_tids = set()
        update = ordered_tids.update
        for tid_list in self.local_var.node_tids.itervalues():
983
            update(tid_list)
984
        ordered_tids = list(ordered_tids)
985
        ordered_tids.sort(reverse=True)
986
        logging.debug("UndoLog, tids %s", ordered_tids)
987 988
        # For each transaction, get info
        undo_info = []
989
        append = undo_info.append
990
        for tid in ordered_tids:
991
            cell_list = self._getCellListForTID(tid, readable=True)
992
            shuffle(cell_list)
993
            cell_list.sort(key=self.cp.getCellSortKey)
994
            for cell in cell_list:
995
                conn = self.cp.getConnForCell(cell)
996 997
                if conn is not None:
                    self.local_var.txn_info = 0
998
                    self.local_var.txn_ext = 0
999
                    try:
1000
                        self._askStorage(conn,
1001
                                Packets.AskTransactionInformation(tid))
1002
                    except ConnectionClosed:
1003 1004 1005
                        continue
                    if isinstance(self.local_var.txn_info, dict):
                        break
1006

1007
            if self.local_var.txn_info in (-1, 0):
Aurel's avatar
Aurel committed
1008
                # TID not found at all
1009 1010 1011
                raise NeoException, 'Data inconsistency detected: ' \
                                    'transaction info for TID %r could not ' \
                                    'be found' % (tid, )
Aurel's avatar
Aurel committed
1012

1013
            if filter is None or filter(self.local_var.txn_info):
1014
                self.local_var.txn_info.pop('packed')
1015 1016
                if not with_oids:
                    self.local_var.txn_info.pop("oids")
1017
                append(self.local_var.txn_info)
1018 1019
                self._insertMetadata(self.local_var.txn_info,
                        self.local_var.txn_ext)
1020 1021
                if len(undo_info) >= last - first:
                    break
1022 1023 1024
        # Check we return at least one element, otherwise call
        # again but extend offset
        if len(undo_info) == 0 and not block:
1025 1026
            undo_info = self.__undoLog(first=first, last=last*5, filter=filter,
                    block=1, with_oids=with_oids)
1027 1028
        return undo_info

1029
    def undoLog(self, first, last, filter=None, block=0):
        """Return the transaction descriptions computed by __undoLog.

        All arguments are handed over unchanged; oid lists are left out of
        the descriptions (with_oids keeps its default).
        """
        return self.__undoLog(first, last, filter=filter, block=block)
1031 1032

    def transactionLog(self, first, last):
        """Return the descriptions computed by __undoLog, oid lists included.

        No filter is applied.
        """
        keep_oids = True
        return self.__undoLog(first, last, with_oids=keep_oids)
1034

Grégory Wisniewski's avatar
Grégory Wisniewski committed
1035
    def history(self, oid, version=None, size=1, filter=None, object_only=0):
        """Return the history of object oid.

        oid         -- the object to look up.
        version     -- not used by this method.
        size        -- handed to AskObjectHistory(oid, 0, size).
        filter      -- optional callable; a transaction entry is kept only
                       when filter(entry) is true.
        object_only -- when true, return local_var.history as answered by
                       the storage node, without transaction information.

        Otherwise returns a list of transaction information dicts, one per
        (serial, size) pair of the object history, with 'tid', 'version'
        and 'size' keys set and the unpickled extension merged in.

        Raises NEOStorageError if no usable answer is obtained or if the
        answer is about another oid, and KeyError if the object history is
        empty or its first entry has a size of 0.
        """
        local_var = self.local_var
        # Get history informations for object first
        cell_list = self._getCellListForOID(oid, readable=True)
        shuffle(cell_list)
        cell_list.sort(key=self.cp.getCellSortKey)
        # Do not let the answer of a previous call pass for this one when
        # no cell can be asked.
        local_var.history = None
        for cell in cell_list:
            # FIXME: we keep overwriting self.local_var.history here, we
            # should aggregate it instead.
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            local_var.history = None
            try:
                self._askStorage(conn, Packets.AskObjectHistory(oid, 0, size))
            except ConnectionClosed:
                continue

            if local_var.history[0] != oid:
                # Got history for wrong oid
                raise NEOStorageError('inconsistency in storage: asked oid ' \
                                      '%r, got %r' % (
                                      oid, local_var.history[0]))

        if not isinstance(local_var.history, tuple):
            raise NEOStorageError('history failed')

        if local_var.history[1] == [] or \
            local_var.history[1][0][1] == 0:
            # KeyError expected if no history was found
            # XXX: this may requires an error from the storages
            raise KeyError

        if object_only:
            # Use by getSerial
            return local_var.history

        # Now that we have object informations, get txn informations
        history_list = []
        for serial, object_size in local_var.history[1]:
            # Ask the cells holding this transaction, not the ones holding
            # the object.
            cell_list = self._getCellListForTID(serial, readable=True)
            shuffle(cell_list)
            cell_list.sort(key=self.cp.getCellSortKey)
            # Forget what was fetched for the previous serial.
            local_var.txn_info = None
            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is None:
                    continue

                # ask transaction information
                local_var.txn_info = None
                try:
                    self._askStorage(conn,
                            Packets.AskTransactionInformation(serial))
                except ConnectionClosed:
                    continue
                except NEOStorageNotFoundError:
                    # TID not found
                    continue
                if isinstance(local_var.txn_info, dict):
                    break

            txn_info = local_var.txn_info
            if not isinstance(txn_info, dict):
                # No cell could provide the transaction information.
                raise NEOStorageError('history failed')

            # create history dict
            txn_info.pop('id')
            txn_info.pop('oids')
            txn_info.pop('packed')
            txn_info['tid'] = serial
            txn_info['version'] = ''
            txn_info['size'] = object_size
            if filter is None or filter(txn_info):
                history_list.append(txn_info)
            self._insertMetadata(txn_info, local_var.txn_ext)

        return history_list
Aurel's avatar
Aurel committed
1109

1110
    @profiler_decorator
1111
    def importFrom(self, source, start, stop, tryToResolveConflict):
        """Copy the transactions of source into this storage.

        Every transaction yielded by source.iterator(start, stop) is
        replayed through tpc_begin, store (once per record), tpc_vote and
        tpc_finish. The last serial known for each oid is remembered and
        given as the previous serial of the next store of that oid.

        The source iterator is closed when the import ends, whether it
        succeeded or raised.
        """
        serials = {}
        def updateLastSerial(oid, result):
            # result is either a single serial (for the given oid) or an
            # iterable of (oid, serial) pairs; falsy results are ignored.
            if result:
                if isinstance(result, str):
                    assert oid is not None
                    serials[oid] = result
                else:
                    for oid, serial in result:
                        assert isinstance(serial, str), serial
                        serials[oid] = serial
        transaction_iter = source.iterator(start, stop)
        try:
            for transaction in transaction_iter:
                self.tpc_begin(transaction, transaction.tid,
                    transaction.status)
                for r in transaction:
                    pre = serials.get(r.oid, None)
                    # TODO: bypass conflict resolution, locks...
                    result = self.store(r.oid, pre, r.data, r.version,
                        transaction)
                    updateLastSerial(r.oid, result)
                updateLastSerial(None, self.tpc_vote(transaction,
                            tryToResolveConflict))
                self.tpc_finish(transaction)
        finally:
            transaction_iter.close()

1135 1136 1137
    def iterator(self, start=None, stop=None):
        """Return an Iterator built from this object, start and stop."""
        transaction_iterator = Iterator(self, start, stop)
        return transaction_iterator

1138 1139 1140 1141 1142
    def lastTransaction(self):
        """Return the tid currently held in local_var."""
        # XXX: this doesn't consider transactions created by other clients,
        #  should ask the primary master
        state = self.local_var
        return state.tid

1143 1144 1145 1146 1147 1148 1149 1150 1151 1152
    def abortVersion(self, src, transaction):
        """Check the transaction and return an empty result.

        Returns ('', []) when transaction is the one held in
        local_var.txn, raises StorageTransactionError otherwise. src is
        not used.
        """
        if transaction is self.local_var.txn:
            return '', []
        raise StorageTransactionError(self, transaction)

    def commitVersion(self, src, dest, transaction):
        """Check the transaction and return an empty result.

        Returns ('', []) when transaction is the one held in
        local_var.txn, raises StorageTransactionError otherwise. src and
        dest are not used.
        """
        if transaction is self.local_var.txn:
            return '', []
        raise StorageTransactionError(self, transaction)

Grégory Wisniewski's avatar
Grégory Wisniewski committed
1153 1154 1155 1156
    def loadEx(self, oid, version):
        """Load oid and return (data, serial, '').

        data and serial come from self.load(oid=oid); version is not used
        and the third item of the result is always an empty string.
        """
        data, serial = self.load(oid=oid)
        loaded_version = ''
        return data, serial, loaded_version

1157 1158 1159 1160 1161
    def __del__(self):
        """Clear all connection."""
        # Due to bug in ZODB, close is not always called when shutting
        # down zope, so use __del__ to close connections
        connection_list = self.em.getConnectionList()
        for connection in connection_list:
            connection.close()
        # Stop polling thread
        poll_thread = self.poll_thread
        poll_thread.stop()
    close = __del__
1166 1167

    def sync(self):
        """Call _waitAnyMessage once, passing it False."""
        # NOTE(review): False presumably asks for a non-blocking poll --
        # confirm against _waitAnyMessage.
        wait_for_message = self._waitAnyMessage
        wait_for_message(False)
1169

1170 1171 1172 1173 1174 1175 1176 1177 1178
    def setNodeReady(self):
        """Set the node_ready flag of local_var to True."""
        state = self.local_var
        state.node_ready = True

    def setNodeNotReady(self):
        """Set the node_ready flag of local_var to False."""
        state = self.local_var
        state.node_ready = False

    def isNodeReady(self):
        """Return the node_ready flag of local_var."""
        state = self.local_var
        return state.node_ready

1179
    def setTID(self, value):
        """Store value as the tid of local_var."""
        state = self.local_var
        state.tid = value
1181 1182

    def getTID(self):
        """Return the tid of local_var."""
        state = self.local_var
        return state.tid
1184 1185

    def setTransactionFinished(self):
        """Set the txn_finished flag of local_var to True."""
        state = self.local_var
        state.txn_finished = True
1187 1188

    def isTransactionFinished(self):
        """Return the txn_finished flag of local_var."""
        state = self.local_var
        return state.txn_finished
1190 1191

    def setTransactionVoted(self):
        """Set the txn_voted flag of local_var to True."""
        state = self.local_var
        state.txn_voted = True
1193 1194

    def isTransactionVoted(self):
        """Return the txn_voted flag of local_var."""
        state = self.local_var
        return state.txn_voted
1196