#
# Copyright (C) 2006-2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

import logging
import os
from threading import local
from cPickle import dumps, loads
from zlib import compress, decompress
from Queue import Queue, Empty
from random import shuffle
from time import sleep

from neo.client.mq import MQ
from neo.node import NodeManager, MasterNode, StorageNode
from neo.connection import MTClientConnection
from neo import protocol
from neo.protocol import Packet, INVALID_UUID, INVALID_TID, INVALID_PARTITION, \
        INVALID_PTID, CLIENT_NODE_TYPE, UP_TO_DATE_STATE, INVALID_SERIAL
from neo.client.handler import *
from neo.client.exception import NEOStorageError, NEOStorageConflictError, \
     NEOStorageNotFoundError
from neo.util import makeChecksum, dump
from neo.connector import getConnectorHandler
from neo.client.dispatcher import Dispatcher
from neo.client.poll import ThreadedPoll
from neo.event import EventManager
from neo.locking import RLock, Lock

from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
from ZODB.utils import p64, u64, oid_repr

class ConnectionPool(object):
    """This class manages a pool of connections to storage nodes."""

    def __init__(self, app, max_pool_size = 25):
        self.app = app
        self.max_pool_size = max_pool_size
        self.connection_dict = {}
        # Define a lock so that only one connection to a storage node is
        # created at a time, which avoids opening multiple connections to
        # the same node.
        l = RLock()
        self.connection_lock_acquire = l.acquire
        self.connection_lock_release = l.release

    def _initNodeConnection(self, cell):
        """Initialize a connection to a given storage node."""
        node = cell.getNode()
        addr = node.getServer()
        if addr is None:
            return None

        if cell.getState() != UP_TO_DATE_STATE:
            return None

        app = self.app

        # Loop until a connection is obtained.
        while True:
            logging.info('trying to connect to %s', node)
            app.setNodeReady()
            handler = StorageBootstrapEventHandler(app, app.dispatcher)
            conn = MTClientConnection(app.em, handler, addr,
                                      connector_handler=app.connector_handler)
            conn.lock()
            try:
                if conn.getConnector() is None:
                    # This happens if a connection could not be established.
                    logging.error('Connection to storage node %s failed', node)
                    return None

                p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE,
                            app.uuid, addr[0], addr[1], app.name)
                msg_id = conn.ask(p)
                app.dispatcher.register(conn, msg_id, app.getQueue())
            finally:
                conn.unlock()

            try:
                app._waitMessage(conn, msg_id, handler=handler)
            except NEOStorageError:
                logging.error('Connection to storage node %s failed', node)
                return None

            if app.isNodeReady():
                logging.info('connected to storage node %s', node)
                conn.setHandler(self.app.storage_handler)
                return conn
            else:
                # Connection failed, notify the primary master node.
                logging.info('Storage node %s not ready', node)
                return None

    def _dropConnections(self):
        """Drop connections."""
        for node_uuid, conn in self.connection_dict.items():
            # Drop connections that do not look used.
            conn.lock()
            try:
                if not conn.pending() and \
                        not self.app.dispatcher.registered(conn):
                    del self.connection_dict[conn.getUUID()]
                    conn.close()
                    logging.info('_dropConnections: connection to storage node %s:%d closed',
                                 *(conn.getAddress()))
                    if len(self.connection_dict) <= self.max_pool_size:
                        break
            finally:
                conn.unlock()

    def _createNodeConnection(self, node):
        """Create a connection to a given storage node."""
        if len(self.connection_dict) > self.max_pool_size:
            # must drop some unused connections
            self._dropConnections()

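        # The pool lock is held by the caller (getConnForNode).  It is
        # released while the connection is being established, which may
        # block, and reacquired before connection_dict is touched again.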
        self.connection_lock_release()
        try:
            conn = self._initNodeConnection(node)
        finally:
            self.connection_lock_acquire()

        if conn is None:
            return None

        # add node to node manager
        if self.app.nm.getNodeByServer(node.getServer()) is None:
            n = StorageNode(node.getServer())
            self.app.nm.add(n)
        self.connection_dict[node.getUUID()] = conn
        conn.lock()
        return conn

    def getConnForNode(self, node):
        """Return a locked connection object to a given node.
        If no connection exists, create a new one."""
        uuid = node.getUUID()
        self.connection_lock_acquire()
        try:
            try:
                conn = self.connection_dict[uuid]
                # Already connected to node
                conn.lock()
                return conn
            except KeyError:
                # Create new connection to node
                return self._createNodeConnection(node)
        finally:
            self.connection_lock_release()

    def removeConnection(self, node):
        """Explicitly remove connection when a node is broken."""
        self.connection_lock_acquire()
        try:
            try:
                del self.connection_dict[node.getUUID()]
            except KeyError:
                pass
        finally:
            self.connection_lock_release()


class Application(object):
    """The client node application."""

    def __init__(self, master_nodes, name, connector, **kw):
        logging.basicConfig(level = logging.DEBUG)
        logging.debug('master node addresses are %s' % (master_nodes,))
        em = EventManager()
        # Start the polling thread
        self.poll_thread = ThreadedPoll(em)
        # Internal attributes common to all threads
        self.name = name
        self.em = em
        self.connector_handler = getConnectorHandler(connector)
        self.dispatcher = Dispatcher()
        self.nm = NodeManager()
        self.cp = ConnectionPool(self)
        self.pt = None
        self.primary_master_node = None
        self.master_node_list = master_nodes.split(' ')
        self.master_conn = None
        # no self-assigned UUID, the primary master will supply one
        self.uuid = INVALID_UUID
        self.mq_cache = MQ()
        self.new_oid_list = []
        self.ptid = INVALID_PTID
        self.num_replicas = 0
        self.num_partitions = 0
        self.primary_handler = PrimaryEventHandler(self, self.dispatcher)
        self.storage_handler = StorageEventHandler(self, self.dispatcher)
        # Internal attributes distinct for each thread
        self.local_var = local()
        self.local_var.txn = None
        # Transaction-specific variables
        self.local_var.data_dict = {}
        self.local_var.object_stored = 0
        self.local_var.txn_voted = False
        self.local_var.txn_finished = False
        self.local_var.tid = None
        # Lock definitions:
        # _load_lock is used to make loading and storing atomic
        lock = Lock()
        self._load_lock_acquire = lock.acquire
        self._load_lock_release = lock.release
        # _oid_lock prevents concurrent OID generation
        lock = Lock()
        self._oid_lock_acquire = lock.acquire
        self._oid_lock_release = lock.release
        lock = Lock()
        # _cache_lock is used for the client cache
        self._cache_lock_acquire = lock.acquire
        self._cache_lock_release = lock.release
        lock = Lock()
        # _connecting_to_master_node is used to prevent simultaneous master
        # node connection attempts
        self._connecting_to_master_node_acquire = lock.acquire
        self._connecting_to_master_node_release = lock.release
        # _nm ensures exclusive access to the node manager
        lock = Lock()
        self._nm_acquire = lock.acquire
        self._nm_release = lock.release
        # _pt ensures exclusive access to the partition table
        lock = Lock()
        self._pt_acquire = lock.acquire
        self._pt_release = lock.release
        # Connect to the master node
        self.connectToPrimaryMasterNode()
        if self.uuid == INVALID_UUID:
            raise NEOStorageError('No UUID given from the primary master')

    def getQueue(self):
        try:
            return self.local_var.queue
        except AttributeError:
            self.local_var.queue = Queue(5)
            return self.local_var.queue
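
    # Each thread gets its own bounded queue; the dispatcher pushes
    # (connection, packet) pairs into it when answers arrive for the
    # requests this thread registered.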

    def _waitMessage(self, target_conn = None, msg_id = None, handler=None):
        """Wait for a message returned by the dispatcher in queues."""
        local_queue = self.getQueue()

        while 1:
            try:
                if msg_id is None:
                    conn, packet = local_queue.get_nowait()
                else:
                    conn, packet = local_queue.get()
            except Empty:
                break
            if packet is None:
                if conn is target_conn:
                    raise NEOStorageError('connection closed')
                else:
                    continue
            handler.dispatch(conn, packet)
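            # Answer packet types have their highest bit set, so this stops
            # waiting once the reply to the request sent on target_conn
            # has been dispatched.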
            if target_conn is conn and msg_id == packet.getId() \
                    and packet.getType() & 0x8000:
                break

    def _askStorage(self, conn, packet, timeout=5, additional_timeout=30):
        """ Send a request to a storage node and process its answer """
        try:
            msg_id = conn.ask(packet, timeout, additional_timeout)
            self.dispatcher.register(conn, msg_id, self.getQueue())
        finally:
            # assume that the connection was already locked
            conn.unlock()
        self._waitMessage(conn, msg_id, self.storage_handler)

    def _askPrimary(self, packet, timeout=5, additional_timeout=30):
        """ Send a request to the primary master and process its answer """
        if self.master_conn is None:
            raise NEOStorageError("Connection to master node failed")
        conn = self.master_conn
        conn.lock()
        try:
            msg_id = conn.ask(packet, timeout, additional_timeout)
            self.dispatcher.register(conn, msg_id, self.getQueue())
        finally:
            conn.unlock()
        self._waitMessage(conn, msg_id, self.primary_handler)
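
    # Both helpers follow the same request/reply pattern: send the packet,
    # register the expected answer with the dispatcher under this thread's
    # queue, then block in _waitMessage() until the answer has been handled.
    # The event handlers deposit their results in self.local_var.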

    def registerDB(self, db, limit):
        self._db = db

    def getDB(self):
        return self._db

    def new_oid(self):
        """Get a new OID."""
        self._oid_lock_acquire()
        try:
            if len(self.new_oid_list) == 0:
                # Get a new list of OIDs from the master node: a whole batch
                # is fetched at once to avoid asking the master for new OIDs
                # one by one.
                self._askPrimary(protocol.askNewOIDs(25))
                if len(self.new_oid_list) <= 0:
                    raise NEOStorageError('new_oid failed')
            return self.new_oid_list.pop()
        finally:
            self._oid_lock_release()


    def getSerial(self, oid):
        # Try the cache first
        self._cache_lock_acquire()
        try:
            if oid in self.mq_cache:
                return self.mq_cache[oid][0]
        finally:
            self._cache_lock_release()
        # history() returns the serial, so use it
        hist = self.history(oid, length = 1, object_only = 1)
        if len(hist) == 0:
            raise NEOStorageNotFoundError()
        if hist[0] != oid:
            raise NEOStorageError('getSerial failed')
        return hist[1][0][0]


    def _load(self, oid, serial = INVALID_TID, tid = INVALID_TID, cache = 0):
        """Internal method which manages load, loadSerial and loadBefore."""
        partition_id = u64(oid) % self.num_partitions
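        # Objects are mapped to partitions by OID modulo the number of
        # partitions; any readable cell of that partition can serve the load.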

        self.local_var.asked_object = None
        while self.local_var.asked_object is None:
            self._pt_acquire()
            try:
                cell_list = self.pt.getCellList(partition_id, readable=True)
            finally:
                self._pt_release()

            if len(cell_list) == 0:
                sleep(1)
                continue

            shuffle(cell_list)
            self.local_var.asked_object = None
            for cell in cell_list:
                logging.debug('trying to load %s from %s',
                              dump(oid), dump(cell.getUUID()))
                conn = self.cp.getConnForNode(cell)
                if conn is None:
                    continue

                self.local_var.asked_object = 0
                self._askStorage(conn, protocol.askObject(oid, serial, tid))

                if self.local_var.asked_object == -1:
                    # OID not found
                    break

                # Check data
                noid, start_serial, end_serial, compression, checksum, data \
                    = self.local_var.asked_object
                if noid != oid:
                    # Oops, try with next node
                    logging.error('got wrong oid %s instead of %s from node %s',
                                  dump(noid), dump(oid), cell.getServer())
                    continue
                elif checksum != makeChecksum(data):
                    # Check checksum.
                    logging.error('wrong checksum from node %s for oid %s',
                                  cell.getServer(), dump(oid))
                    continue
                else:
                    # Everything looks alright.
                    break

        if self.local_var.asked_object == -1:
            # We could not get the object from any storage node
            logging.debug('oid %s not found', dump(oid))
            raise NEOStorageNotFoundError()

        # Uncompress data
        if compression:
            data = decompress(data)

        # Put in cache only when using load
        if cache:
            self._cache_lock_acquire()
            try:
                self.mq_cache[oid] = start_serial, data
            finally:
                self._cache_lock_release()
        if end_serial == INVALID_SERIAL:
            end_serial = None
        return data, start_serial, end_serial


    def load(self, oid, version=None):
        """Load an object for a given oid."""
        # First try from cache
        self._load_lock_acquire()
        try:
            self._cache_lock_acquire()
            try:
                if oid in self.mq_cache:
                    logging.debug('load oid %s is cached', dump(oid))
                    return self.mq_cache[oid][1], self.mq_cache[oid][0]
            finally:
                self._cache_lock_release()
            # Otherwise get it from a storage node
            return self._load(oid, cache=1)[:2]
        finally:
            self._load_lock_release()


    def loadSerial(self, oid, serial):
        """Load an object for a given oid and serial."""
        # Do not look in the cache, as it manages only up-to-date objects
        logging.debug('loading %s at %s', dump(oid), dump(serial))
        return self._load(oid, serial=serial)[0]


    def loadBefore(self, oid, tid):
        """Load an object for a given oid, as it stood before tid was committed."""
        # Do not look in the cache, as it manages only up-to-date objects
        if tid is None:
            tid = INVALID_TID
        logging.debug('loading %s before %s', dump(oid), dump(tid))
        data, start, end = self._load(oid, tid=tid)
        if end is None:
            # No previous version
            return None
        else:
            return data, start, end


    def tpc_begin(self, transaction, tid=None, status=' '):
        """Begin a new transaction."""
        # First get a transaction; only one is allowed at a time
        if self.local_var.txn is transaction:
            # We already began this transaction
            return
        # Get a new transaction id if necessary
        if tid is None:
            self.local_var.tid = None
            self._askPrimary(protocol.askNewTID())
            if self.local_var.tid is None:
                raise NEOStorageError('tpc_begin failed')
        else:
            self.local_var.tid = tid
        self.local_var.txn = transaction


    def store(self, oid, serial, data, version, transaction):
        """Store object."""
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        if serial is None:
            serial = INVALID_SERIAL
        logging.debug('storing oid %s serial %s',
                     dump(oid), dump(serial))
        # Find which storage nodes to use
        partition_id = u64(oid) % self.num_partitions
        self._pt_acquire()
        try:
            cell_list = self.pt.getCellList(partition_id, writable=True)
        finally:
            self._pt_release()
        if len(cell_list) == 0:
            # FIXME must wait for cluster to be ready
            raise NEOStorageError
        # Store data on each node
        compressed_data = compress(data)
        checksum = makeChecksum(compressed_data)
        for cell in cell_list:
            #logging.info("storing object %s %s" %(cell.getServer(),cell.getState()))
            conn = self.cp.getConnForNode(cell)
            if conn is None:
                continue

            self.local_var.object_stored = 0
            p = protocol.askStoreObject(oid, serial, 1,
                     checksum, compressed_data, self.local_var.tid)
            self._askStorage(conn, p)

            # Check that we don't get any conflict
            if self.local_var.object_stored[0] == -1:
                if self.local_var.data_dict.has_key(oid):
                    # One storage node already accepted the object; remove it
                    # from the dict and raise ConflictError.  Nodes that have
                    # already stored the data can be ignored: the data will be
                    # resent if the conflict is resolved, or the transaction
                    # will be aborted.
                    del self.local_var.data_dict[oid]
                self.conflict_serial = self.local_var.object_stored[1]
                raise NEOStorageConflictError

        # Store object in the temporary cache
        noid, nserial = self.local_var.object_stored
        self.local_var.data_dict[oid] = data

        return self.local_var.tid
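
    # Conflict handling in a nutshell (inferred from store() and undo()):
    # when a storage node answers with a conflict, store() records the
    # conflicting serial in self.conflict_serial and raises
    # NEOStorageConflictError; a caller may then try
    # wrapper.tryToResolveConflict() and store the resolved data again.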


    def tpc_vote(self, transaction):
        """Store the current transaction."""
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        user = transaction.user
        desc = transaction.description
        ext = dumps(transaction._extension)
        oid_list = self.local_var.data_dict.keys()
        # Store the transaction metadata on each node
        partition_id = u64(self.local_var.tid) % self.num_partitions
        self._pt_acquire()
        try:
            cell_list = self.pt.getCellList(partition_id, writable=True)
        finally:
            self._pt_release()
        for cell in cell_list:
            logging.info("voting object %s %s" % (cell.getServer(), cell.getState()))
            conn = self.cp.getConnForNode(cell)
            if conn is None:
                continue

            self.local_var.txn_voted = False
            p = protocol.askStoreTransaction(self.local_var.tid,
                    user, desc, ext, oid_list)
            self._askStorage(conn, p)

            if not self.isTransactionVoted():
                raise NEOStorageError('tpc_vote failed')

    def _clear_txn(self):
        """Clear some transaction parameters."""
        self.local_var.tid = None
        self.local_var.txn = None
        self.local_var.data_dict.clear()
        self.local_var.txn_voted = False
        self.local_var.txn_finished = False

    def tpc_abort(self, transaction):
        """Abort current transaction."""
        if transaction is not self.local_var.txn:
            return

        cell_set = set()

        self._pt_acquire()
        try:
            # select nodes where objects were stored
            for oid in self.local_var.data_dict.iterkeys():
                partition_id = u64(oid) % self.num_partitions
                cell_set |= set(self.pt.getCellList(partition_id, writable=True))

            # select nodes where the transaction was stored
            partition_id = u64(self.local_var.tid) % self.num_partitions
            cell_set |= set(self.pt.getCellList(partition_id, writable=True))
        finally:
            self._pt_release()

        # cancel the transaction on all those nodes
        for cell in cell_set:
            conn = self.cp.getConnForNode(cell)
            if conn is None:
                continue
            try:
                conn.notify(protocol.abortTransaction(self.local_var.tid))
            finally:
                conn.unlock()

        # Abort the transaction on the primary master node.
        conn = self.master_conn
        conn.lock()
        try:
            conn.notify(protocol.abortTransaction(self.local_var.tid))
        finally:
            conn.unlock()

        self._clear_txn()

    def tpc_finish(self, transaction, f=None):
        """Finish current transaction."""
        if self.local_var.txn is not transaction:
            return
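        # Hold the load lock so that no load() can read the cache while it
        # is being updated with the data of the finishing transaction.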
        self._load_lock_acquire()
        try:
            # Call the function given by ZODB
            if f is not None:
                f(self.local_var.tid)

            # Call finish on the master
            oid_list = self.local_var.data_dict.keys()
            p = protocol.finishTransaction(oid_list, self.local_var.tid)
            self._askPrimary(p)

            if not self.isTransactionFinished():
                raise NEOStorageError('tpc_finish failed')

            # Update cache
            self._cache_lock_acquire()
            try:
                for oid in self.local_var.data_dict.iterkeys():
                    data = self.local_var.data_dict[oid]
                    # Now the serial is the same as the tid
                    self.mq_cache[oid] = self.local_var.tid, data
            finally:
                self._cache_lock_release()
            self._clear_txn()
            return self.local_var.tid
        finally:
            self._load_lock_release()

    def undo(self, transaction_id, txn, wrapper):
        if txn is not self.local_var.txn:
            raise StorageTransactionError(self, transaction_id)

        # First get transaction information from a storage node.
        partition_id = u64(transaction_id) % self.num_partitions
        self._pt_acquire()
        try:
            cell_list = self.pt.getCellList(partition_id, writable=True)
        finally:
            self._pt_release()
        shuffle(cell_list)
        for cell in cell_list:
            conn = self.cp.getConnForNode(cell)
            if conn is None:
                continue

            self.local_var.txn_info = 0
            self._askStorage(conn, protocol.askTransactionInformation(transaction_id))

            if self.local_var.txn_info == -1:
                # Tid not found, try with next node
                continue
            elif isinstance(self.local_var.txn_info, dict):
                break
            else:
                raise NEOStorageError('undo failed')

        if self.local_var.txn_info == -1:
            raise NEOStorageError('undo failed')

        oid_list = self.local_var.txn_info['oids']
        # Second, get object data from a storage node using loadBefore
        data_dict = {}
        for oid in oid_list:
            try:
                result = self.loadBefore(oid, transaction_id)
            except NEOStorageNotFoundError:
                # no previous revision, can't undo (as in FileStorage)
                raise UndoError("no previous record", oid)
            data, start, end = result
            # end must be the TID we are going to undo; otherwise it means
            # that a later transaction modified the object
            if end != transaction_id:
                raise UndoError("non-undoable transaction", oid)
            data_dict[oid] = data

        # Third, store the old data in the current transaction
        oid_list = data_dict.keys()
        for oid in oid_list:
            data = data_dict[oid]
            try:
                self.store(oid, transaction_id, data, None, txn)
            except NEOStorageConflictError, serial:
                if serial <= self.local_var.tid:
                    new_data = wrapper.tryToResolveConflict(oid, self.local_var.tid,
                                                            serial, data)
                    if new_data is not None:
                        self.store(oid, self.local_var.tid, new_data, None, txn)
                        continue
                raise ConflictError(oid = oid, serials = (self.local_var.tid, serial),
                                    data = data)
        return self.local_var.tid, oid_list

    def undoLog(self, first, last, filter=None, block=0):
        if last < 0:
            # See FileStorage.py for explanation
            last = first - last

        # First get a list of transactions from all storage nodes.
        # Each storage node will return TIDs only for UP_TO_DATE_STATE and
        # FEEDING_STATE cells
        self._pt_acquire()
        try:
            storage_node_list = self.pt.getNodeList()
        finally:
            self._pt_release()

        self.local_var.node_tids = {}
        for storage_node in storage_node_list:
            conn = self.cp.getConnForNode(storage_node)
            if conn is None:
                continue

            try:
                p = protocol.askTIDs(first, last, INVALID_PARTITION)
                msg_id = conn.ask(p)
                self.dispatcher.register(conn, msg_id, self.getQueue())
            finally:
                conn.unlock()

        # Wait for answers from all storages.
        # FIXME this is a busy loop.
        while True:
            self._waitMessage(handler=self.storage_handler)
            if len(self.local_var.node_tids.keys()) == len(storage_node_list):
                break

        # Reorder tids
        ordered_tids = []
        for tids in self.local_var.node_tids.values():
            ordered_tids.extend(tids)
        # XXX do we need a special cmp function here ?
        ordered_tids.sort(reverse=True)
        logging.info("UndoLog, tids %s", ordered_tids)
        # For each transaction, get info
        undo_info = []
        for tid in ordered_tids:
            partition_id = u64(tid) % self.num_partitions
            self._pt_acquire()
            try:
                cell_list = self.pt.getCellList(partition_id, readable=True)
            finally:
                self._pt_release()
            shuffle(cell_list)
            for cell in cell_list:
                conn = self.cp.getConnForNode(cell)
                if conn is None:
                    continue

                self.local_var.txn_info = 0
                self._askStorage(conn, protocol.askTransactionInformation(tid))

                if self.local_var.txn_info == -1:
                    # TID not found, go on with the next node
                    continue
                elif isinstance(self.local_var.txn_info, dict):
                    break

            if self.local_var.txn_info == -1:
                # TID not found at all
                continue

            # Filter the result if needed
            if filter is not None:
                # The filter method returns True on a match
                if not filter(self.local_var.txn_info):
                    continue

            # Append to the returned list
            self.local_var.txn_info.pop("oids")
            undo_info.append(self.local_var.txn_info)
            if len(undo_info) >= last - first:
                break
        # Check that we return at least one element; otherwise, call
        # again with an extended offset
        if len(undo_info) == 0 and not block:
            undo_info = self.undoLog(first=first, last=last*5, filter=filter, block=1)
        return undo_info

    # FIXME: filter function isn't used
    def history(self, oid, version=None, length=1, filter=None, object_only=0):
        # Get history information for the object first
        partition_id = u64(oid) % self.num_partitions
        self._pt_acquire()
        try:
            cell_list = self.pt.getCellList(partition_id, readable=True)
        finally:
            self._pt_release()
        shuffle(cell_list)

        for cell in cell_list:
            conn = self.cp.getConnForNode(cell)
            if conn is None:
                continue

            self.local_var.history = None
            self._askStorage(conn, protocol.askObjectHistory(oid, 0, length))

            if self.local_var.history == -1:
                # Not found, go on with the next node
                continue
            if self.local_var.history[0] != oid:
                # Got history for the wrong oid, go on with the next node
                continue
            break

        if not isinstance(self.local_var.history, tuple):
            raise NEOStorageError('history failed')
        if object_only:
            # Used by getSerial
            return self.local_var.history

        # Now that we have the object information, get the transaction information
        history_list = []
        for serial, size in self.local_var.history[1]:
            partition_id = u64(serial) % self.num_partitions
            self._pt_acquire()
            try:
                cell_list = self.pt.getCellList(partition_id, readable=True)
            finally:
                self._pt_release()
            shuffle(cell_list)

            for cell in cell_list:
                conn = self.cp.getConnForNode(cell)
                if conn is None:
                    continue

                # ask for transaction information
                self.local_var.txn_info = None
                self._askStorage(conn, protocol.askTransactionInformation(serial))

                if self.local_var.txn_info == -1:
                    # TID not found
                    continue
                if isinstance(self.local_var.txn_info, dict):
                    break

            # create a history dict
            self.local_var.txn_info.pop('id')
            self.local_var.txn_info.pop('oids')
            self.local_var.txn_info['tid'] = serial
            self.local_var.txn_info['version'] = None
            self.local_var.txn_info['size'] = size
            history_list.append(self.local_var.txn_info)

        return history_list

    def __del__(self):
        """Clear all connections."""
        # TODO: Stop polling thread here.
        # Due to a bug in ZODB, close is not always called when shutting
        # down Zope, so use __del__ to close connections
        for conn in self.em.getConnectionList():
            conn.close()
    close = __del__

    def sync(self):
        self._waitMessage(handler=self.storage_handler)

    def connectToPrimaryMasterNode(self):
        self.master_conn = None
        logging.debug('connecting to primary master...')
        # acquire the lock to allow only one thread to connect to the primary
        lock = self._connecting_to_master_node_acquire(1)
        try:
            if self.master_conn is not None:
                # another thread has done the job
                logging.debug('already connected')
                return
            if self.pt is not None:
                # pt is protected with the master lock
                self.pt.clear()
            master_index = 0
            conn = None
            # Make the application execute any remaining messages
            self._waitMessage(handler=self.storage_handler)
            while True:
                self.setNodeReady()
                if self.primary_master_node is None:
                    # Try with a master node defined in the config
                    try:
                        addr, port = self.master_node_list[master_index].split(':')
                    except IndexError:
                        master_index = 0
                        addr, port = self.master_node_list[master_index].split(':')
                    port = int(port)
                else:
                    addr, port = self.primary_master_node.getServer()
                # Request Node Identification
                handler = PrimaryBoostrapEventHandler(self, self.dispatcher)
                conn = MTClientConnection(self.em, handler, (addr, port),
                     connector_handler=self.connector_handler)
                self._nm_acquire()
                try:
                    if self.nm.getNodeByServer((addr, port)) is None:
                        n = MasterNode(server = (addr, port))
                        self.nm.add(n)
                finally:
                    self._nm_release()

                conn.lock()
                try:
                    p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE,
                            self.uuid, '0.0.0.0', 0, self.name)
                    msg_id = conn.ask(p)
                    self.dispatcher.register(conn, msg_id, self.getQueue())
                finally:
                    conn.unlock()

                # Wait for an answer
                while 1:
                    self._waitMessage(handler=handler)
                    # Now check the result
                    if self.primary_master_node is not None:
                        if self.primary_master_node == -1:
                            # Connection failed, try with another master node
                            self.primary_master_node = None
                            master_index += 1
                            break
                        elif self.primary_master_node.getServer() != (addr, port):
                            # The master node changed, connect to the new one
                            break
                        elif not self.isNodeReady():
                            # Wait a bit and ask again
                            break
                        elif self.pt is not None and self.pt.operational():
                            # Connected to the primary master node
                            break
                if self.pt is not None and self.pt.operational():
                    # Connected to the primary master node and got all information
                    break
                sleep(1)

            logging.info("connected to primary master node %s" % self.primary_master_node)
            conn.setHandler(PrimaryEventHandler(self, self.dispatcher))
            self.master_conn = conn

        finally:
            self._connecting_to_master_node_release()

    def setNodeReady(self):
        self.local_var.node_ready = True

    def setNodeNotReady(self):
        self.local_var.node_ready = False

    def isNodeReady(self):
        return self.local_var.node_ready

    def setTID(self, value):
        self.local_var.tid = value

    def getTID(self):
        return self.local_var.tid

    def getConflictSerial(self):
        return self.conflict_serial

    def setTransactionFinished(self):
        self.local_var.txn_finished = True

    def isTransactionFinished(self):
        return self.local_var.txn_finished

    def setTransactionVoted(self):
        self.local_var.txn_voted = True

    def isTransactionVoted(self):
        return self.local_var.txn_voted