app.py 30.2 KB
Newer Older
1 2
import logging
import os
3
from threading import Lock, local
4
from cPickle import dumps, loads
5
from zlib import compress, decompress
6
from Queue import Queue, Empty
7
from random import shuffle
8 9

from neo.client.mq import MQ
10
from neo.node import NodeManager, MasterNode, StorageNode
11
from neo.connection import MTClientConnection
12
from neo.protocol import Packet, INVALID_UUID, INVALID_TID, INVALID_PARTITION, \
13 14 15
        STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
        TEMPORARILY_DOWN_STATE, \
        UP_TO_DATE_STATE, FEEDING_STATE, INVALID_SERIAL
Aurel's avatar
Aurel committed
16
from neo.client.handler import ClientEventHandler
17 18
from neo.client.Storage import NEOStorageError, NEOStorageConflictError, \
     NEOStorageNotFoundError
19
from neo.util import makeChecksum, dump
20

21
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
22
from ZODB.utils import p64, u64, oid_repr
Aurel's avatar
Aurel committed
23

24 25
class ConnectionPool(object):
    """This class manages a pool of connections to storage nodes.

    Connections are indexed by storage node UUID in connection_dict, and
    pool_size counts how many are held so that an unused one can be dropped
    before a new one is opened once max_pool_size is reached."""

    def __init__(self, app, pool_size = 25):
        self.app = app
        # Number of connections currently held in connection_dict.
        self.pool_size = 0
        self.max_pool_size = pool_size
        # Storage node UUID -> connection.
        self.connection_dict = {}
        # Define a lock in order to create one connection to
        # a storage node at a time to avoid multiple connections
        # to the same node.
        l = Lock()
        self.connection_lock_acquire = l.acquire
        self.connection_lock_release = l.release

    def _initNodeConnection(self, node):
        """Init a connection to a given storage node.

        node must provide getNode() (whose getServer() gives the address)
        and getUUID(). Return the new, unlocked connection, or None when the
        node has no known address or when identification failed; in the
        latter case the primary master node is told the storage node is
        temporarily down."""
        addr = node.getNode().getServer()
        if addr is None:
            return None
        handler = ClientEventHandler(self.app, self.app.dispatcher)
        conn = MTClientConnection(self.app.em, handler, addr)
        conn.lock()
        try:
            msg_id = conn.getNextId()
            p = Packet()
            p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE,
                                        self.app.uuid, addr[0],
                                        addr[1], self.app.name)
            conn.addPacket(p)
            conn.expectMessage(msg_id)
            self.app.dispatcher.register(conn, msg_id, self.app.getQueue())
            # Reset the per-thread answer slot before waiting for the reply.
            self.app.local_var.storage_node = None
        finally:
            conn.unlock()

        self.app._waitMessage(conn, msg_id)
        # Read back the same per-thread slot that was reset above (the
        # Application object itself defines no storage_node attribute).
        if self.app.local_var.storage_node == -1:
            # Connection failed, notify primary master node
            logging.error('Connection to storage node %s failed' %(addr,))
            conn = self.app.master_conn
            conn.lock()
            try:
                msg_id = conn.getNextId()
                p = Packet()
                node_list = [(STORAGE_NODE_TYPE, addr[0], addr[1],
                              node.getUUID(), TEMPORARILY_DOWN_STATE),]
                p.notifyNodeInformation(msg_id, node_list)
                conn.addPacket(p)
            finally:
                conn.unlock()
            return None

        logging.info('connected to storage node %s:%d', *(conn.getAddress()))
        return conn

    def _dropConnection(self):
        """Drop one connection which looks unused.

        A connection is considered unused when it has nothing pending and
        the dispatcher reports nothing registered for it. At most one
        connection is closed and forgotten; nothing happens when every
        connection is busy or the pool is empty."""
        # Iterate over a copy: the dict is modified before leaving the loop.
        for node_uuid, conn in list(self.connection_dict.items()):
            conn.lock()
            try:
                unused = not conn.pending() and \
                        not self.app.dispatcher.registered(conn.getUUID())
                if unused:
                    # Capture the address before the connection is closed.
                    addr = conn.getAddress()
                    conn.close()
            finally:
                conn.unlock()
            if unused:
                # Forget the closed connection so it is never handed out.
                del self.connection_dict[node_uuid]
                self.pool_size -= 1
                logging.info('connection to storage node %s:%d closed', *addr)
                break

    def _createNodeConnection(self, node):
        """Create a connection to a given storage node.

        Called by getConnForNode with the pool lock held. Return the new
        connection locked, or None if it could not be established."""
        if self.pool_size >= self.max_pool_size:
            # must drop some unused connections
            self._dropConnection()
        conn = self._initNodeConnection(node)
        if conn is None:
            return None
        # add node to node manager
        if self.app.nm.getNodeByServer(node.getServer()) is None:
            n = StorageNode(node.getServer())
            self.app.nm.add(n)
        self.connection_dict[node.getUUID()] = conn
        self.pool_size += 1
        conn.lock()
        return conn

    def getConnForNode(self, node):
        """Return a locked connection object to a given node
        If no connection exists, create a new one (None on failure)."""
        uuid = node.getUUID()
        self.connection_lock_acquire()
        try:
            conn = self.connection_dict.get(uuid)
            if conn is None:
                # Create new connection to node
                return self._createNodeConnection(node)
            # Already connected to node
            conn.lock()
            return conn
        finally:
            self.connection_lock_release()

    def removeConnection(self, node):
        """Explicitly remove connection when a node is broken.

        The connection is only forgotten by the pool, not closed here."""
        self.connection_lock_acquire()
        try:
            if self.connection_dict.pop(node.getUUID(), None) is not None:
                self.pool_size -= 1
        finally:
            self.connection_lock_release()
133

134

Aurel's avatar
Aurel committed
135
class Application(object):
136 137
    """The client node application."""

138
    def __init__(self, master_nodes, name, em, dispatcher, request_queue, **kw):
        """Set up the client application state.

        master_nodes is a space-separated string of master node addresses.
        Extra keyword arguments are accepted and ignored."""
        logging.basicConfig(level = logging.DEBUG)
        logging.debug('master node address are %s' %(master_nodes,))

        # Attributes shared by all threads.
        self.name = name
        self.em = em
        self.dispatcher = dispatcher
        self.nm = NodeManager()
        self.cp = ConnectionPool(self)
        self.pt = None
        self.request_queue = request_queue
        self.primary_master_node = None
        self.master_node_list = master_nodes.split(' ')
        self.master_conn = None
        self.uuid = None
        self.mq_cache = MQ()
        self.new_oid_list = []
        self.ptid = None
        self.num_replicas = self.num_partitions = 0

        # Per-transaction state.
        self.tid = self.txn = None
        self.txn_data_dict = {}
        self.txn_object_stored = self.txn_voted = self.txn_finished = 0

        # Attributes private to each thread.
        self.local_var = local()

        # Each lock is only exposed through its bound acquire/release:
        #  - _load_lock makes loading and storing atomic,
        #  - _oid_lock avoids running several OID requests at once,
        #  - _cache_lock protects the client cache.
        for lock_name in ('_load_lock', '_oid_lock', '_cache_lock'):
            lock = Lock()
            setattr(self, lock_name + '_acquire', lock.acquire)
            setattr(self, lock_name + '_release', lock.release)

        # XXX Generate an UUID for self. For now, just use a random string.
        # Draw again as long as the value equals INVALID_UUID.
        if self.uuid is None:
            uuid = os.urandom(16)
            while uuid == INVALID_UUID:
                uuid = os.urandom(16)
            self.uuid = uuid

190 191 192 193 194 195 196 197
    def getQueue(self):
        return self.local_var.__dict__.setdefault('queue', Queue(5))

    def _waitMessage(self, target_conn = None, msg_id = None):
        """Wait for a message returned by the dispatcher in queues."""
        global_queue = self.request_queue
        local_queue = self.getQueue()

198 199
        while 1:
            try:
200 201
                conn, packet = global_queue.get_nowait()
                conn.handler.dispatch(conn, packet)
202
            except Empty:
203 204 205 206 207 208 209 210 211
                if msg_id is None:
                    try:
                        conn, packet = local_queue.get_nowait()
                    except Empty:
                        break
                else:
                    conn, packet = local_queue.get()

            conn.lock()
212
            try:
213 214 215
                conn.handler.dispatch(conn, packet)
            finally:
                conn.unlock()
216

217
            if target_conn is conn and msg_id == packet.getId() and packet.getType() & 0x8000:
218
                break
219

220 221 222
    def registerDB(self, db, limit):
        self._db = db

223 224 225 226 227
    def new_oid(self):
        """Get a new OID."""
        self._oid_lock_acquire()
        try:
            if len(self.new_oid_list) == 0:
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246
                # Get new oid list from master node
                # we manage a list of oid here to prevent
                # from asking too many time new oid one by one
                # from master node
                conn = self.master_conn
                conn.lock()
                try:
                    msg_id = conn.getNextId()
                    p = Packet()
                    p.askNewOIDs(msg_id, 25)
                    conn.addPacket(p)
                    conn.expectMessage(msg_id)
                    self.dispatcher.register(conn, msg_id, self.getQueue())
                finally:
                    conn.unlock()

                self._waitMessage(conn, msg_id)
                if len(self.new_oid_list) <= 0:
                    raise NEOStorageError('new_oid failed')
Aurel's avatar
Aurel committed
247
            return self.new_oid_list.pop()
248 249 250
        finally:
            self._oid_lock_release()

251

Aurel's avatar
Aurel committed
252 253 254 255
    def getSerial(self, oid):
        # Try in cache first
        self._cache_lock_acquire()
        try:
256 257
            if oid in self.mq_cache:
                return self.mq_cache[oid][0]
Aurel's avatar
Aurel committed
258 259 260
        finally:
            self._cache_lock_release()
        # history return serial, so use it
261
        hist = self.history(oid, length = 1, object_only = 1)
Aurel's avatar
Aurel committed
262 263
        if len(hist) == 0:
            raise NEOStorageNotFoundError()
264 265 266 267
        if hist[0] != oid:
            raise NEOStorageError('getSerial failed')
        return hist[1][0][0]

Aurel's avatar
Aurel committed
268

269
    def _load(self, oid, serial = INVALID_TID, tid = INVALID_TID, cache = 0):
        """Internal method which manage load, loadSerial and loadBefore.

        Ask the object to the up-to-date storage nodes of its partition, in
        random order, until one returns a copy with the right oid and a
        valid checksum. Return (unpickled data, start serial, end serial);
        the end serial is None when the node sent INVALID_SERIAL. When
        cache is true, the pickled data is also put in the client cache.

        Raises NEOStorageNotFoundError if no node returned the object and
        NEOStorageError if nodes answered but every copy was rejected."""
        partition_id = u64(oid) % self.num_partitions
        # Only used up to date node for retrieving object
        cell_list = self.pt.getCellList(partition_id, True)
        if len(cell_list) == 0:
            # FIXME must wait for cluster to be ready
            raise NEOStorageNotFoundError()

        shuffle(cell_list)
        got_invalid_copy = False
        for cell in cell_list:
            logging.debug('trying to load %s from %s',
                          dump(oid), dump(cell.getUUID()))
            conn = self.cp.getConnForNode(cell)
            if conn is None:
                continue

            try:
                msg_id = conn.getNextId()
                p = Packet()
                p.askObject(msg_id, oid, serial, tid)
                conn.addPacket(p)
                conn.expectMessage(msg_id)
                self.dispatcher.register(conn, msg_id, self.getQueue())
                self.local_var.asked_object = 0
            finally:
                conn.unlock()

            # Wait for answer
            # asked object returned value are :
            # -1 : oid not found
            # other : data
            self._waitMessage(conn, msg_id)
            answer = self.local_var.asked_object
            if answer == -1 or answer == 0:
                # OID not found on this node, or no answer was recorded:
                # try with another node.
                continue

            # Check data
            noid, start_serial, end_serial, compression, checksum, data \
                    = answer
            if noid != oid:
                # Oops, try with next node
                logging.error('got wrong oid %s instead of %s from node %s' \
                              % (noid, oid, cell.getServer()))
                got_invalid_copy = True
                continue
            if checksum != makeChecksum(data):
                logging.error('wrong checksum from node %s for oid %s' \
                              % (cell.getServer(), oid))
                got_invalid_copy = True
                continue
            # Everything looks alright.
            break
        else:
            # No node returned a usable copy. Falling through here used to
            # return the data of the last, rejected answer.
            if got_invalid_copy:
                raise NEOStorageError('load of %s failed: no valid copy'
                                      % dump(oid))
            # We didn't got any object from all storage node
            logging.debug('oid %s not found', dump(oid))
            raise NEOStorageNotFoundError()

        # Uncompress data
        if compression:
            data = decompress(data)

        # Put in cache only when using load
        if cache:
            self._cache_lock_acquire()
            try:
                self.mq_cache[oid] = start_serial, data
            finally:
                self._cache_lock_release()
        if end_serial == INVALID_SERIAL:
            end_serial = None
        return loads(data), start_serial, end_serial
347

348

349 350 351
    def load(self, oid, version=None):
        """Load an object for a given oid.

        Return (object, serial). The client cache is tried first. On a miss
        the object is fetched from a storage node and put in the cache.
        version is accepted but ignored."""
        self._load_lock_acquire()
        try:
            self._cache_lock_acquire()
            try:
                cached = oid in self.mq_cache
                if cached:
                    logging.debug('load oid %s is cached', dump(oid))
                    return loads(self.mq_cache[oid][1]), self.mq_cache[oid][0]
            finally:
                self._cache_lock_release()
            # Cache miss: ask a storage node (and cache the result).
            obj, serial = self._load(oid, cache=1)[:2]
            return obj, serial
        finally:
            self._load_lock_release()
Aurel's avatar
Aurel committed
365

366

367
    def loadSerial(self, oid, serial):
        """Load an object for a given oid and serial.

        Only the unpickled object is returned. The client cache is bypassed
        because it only holds up-to-date objects."""
        logging.debug('loading %s at %s', dump(oid), dump(serial))
        result = self._load(oid, serial=serial)
        return result[0]
Aurel's avatar
Aurel committed
372

373

374
    def loadBefore(self, oid, tid):
        """Load an object for a given oid before tid committed.

        Return (object, start serial, end serial), or None when the revision
        found has no end serial. A tid of None is sent as INVALID_TID. The
        client cache is bypassed because it only holds up-to-date objects."""
        if tid is None:
            tid = INVALID_TID
        logging.debug('loading %s before %s', dump(oid), dump(tid))
        data, start, end = self._load(oid, tid=tid)
        if end is not None:
            return data, start, end
        # No previous version
        return None
Aurel's avatar
Aurel committed
386

387

388 389 390 391
    def tpc_begin(self, transaction, tid=None, status=' '):
        """Begin a new transaction."""
        # First get a transaction, only one is allowed at a time
        if self.txn == transaction:
392
            # We already begin the same transaction
393 394 395 396 397 398
            return
        self.txn = transaction
        # Get a new transaction id if necessary
        if tid is None:
            self.tid = None
            conn = self.master_conn
399 400 401 402 403 404 405 406 407 408 409
            conn.lock()
            try:
                msg_id = conn.getNextId()
                p = Packet()
                p.askNewTID(msg_id)
                conn.addPacket(p)
                conn.expectMessage(msg_id)
                self.dispatcher.register(conn, msg_id, self.getQueue())
            finally:
                conn.unlock()

Aurel's avatar
Aurel committed
410
            # Wait for answer
411
            self._waitMessage(conn, msg_id)
412 413
            if self.tid is None:
                raise NEOStorageError('tpc_begin failed')
414 415 416
        else:
            self.tid = tid

417

418 419 420
    def store(self, oid, serial, data, version, transaction):
        """Store object.

        Send the pickled and compressed object to every storage node of its
        partition, then keep the pickle in txn_data_dict until the
        transaction ends. Return the transaction id.

        Raises StorageTransactionError for a foreign transaction, and
        NEOStorageConflictError when a node reports a conflict (the
        conflicting serial is left in self.conflict_serial). Raises
        NEOStorageError when the partition has no cell or no storage node
        accepted the object."""
        if transaction is not self.txn:
            raise StorageTransactionError(self, transaction)
        if serial is None:
            serial = INVALID_SERIAL
        logging.debug('storing oid %s serial %s',
                     dump(oid), dump(serial))
        # Find which storage node to use
        partition_id = u64(oid) % self.num_partitions
        cell_list = self.pt.getCellList(partition_id, False)
        if len(cell_list) == 0:
            # FIXME must wait for cluster to be ready
            raise NEOStorageError
        # Store data on each node
        ddata = dumps(data)
        compressed_data = compress(ddata)
        checksum = makeChecksum(compressed_data)
        stored = False
        for cell in cell_list:
            conn = self.cp.getConnForNode(cell)
            if conn is None:
                continue

            try:
                msg_id = conn.getNextId()
                p = Packet()
                p.askStoreObject(msg_id, oid, serial, 1,
                                 checksum, compressed_data, self.tid)
                conn.addPacket(p)
                conn.expectMessage(msg_id)
                self.dispatcher.register(conn, msg_id, self.getQueue())
                self.txn_object_stored = 0
            finally:
                conn.unlock()

            # Check we don't get any conflict
            self._waitMessage(conn, msg_id)
            answer = self.txn_object_stored
            if answer == 0:
                # No answer was recorded for this node: do not count it as
                # having stored the object.
                continue
            if answer[0] == -1:
                if oid in self.txn_data_dict:
                    # One storage already accept the object, is it normal ??
                    # remove from dict and raise ConflictError, don't care of
                    # previous node which already store data as it would be
                    # resent again if conflict is resolved or txn will be
                    # aborted
                    del self.txn_data_dict[oid]
                self.conflict_serial = answer[1]
                raise NEOStorageConflictError
            stored = True

        if not stored:
            # Recording the object anyway would silently lose it at commit.
            raise NEOStorageError('store failed')

        # Store object in tmp cache
        self.txn_data_dict[oid] = ddata

        return self.tid

Aurel's avatar
Aurel committed
471

472 473 474
    def tpc_vote(self, transaction):
        """Store current transaction.

        Send the transaction metadata and its oid list to every up-to-date
        storage node of the tid's partition. Raises StorageTransactionError
        for a foreign transaction. Raises NEOStorageError if a contacted
        node does not acknowledge, or if no node could be contacted."""
        if transaction is not self.txn:
            raise StorageTransactionError(self, transaction)
        user = transaction.user
        desc = transaction.description
        ext = dumps(transaction._extension)
        oid_list = self.txn_data_dict.keys()
        # Store data on each node
        partition_id = u64(self.tid) % self.num_partitions
        cell_list = self.pt.getCellList(partition_id, True)
        voted = False
        for cell in cell_list:
            conn = self.cp.getConnForNode(cell)
            if conn is None:
                continue

            try:
                msg_id = conn.getNextId()
                p = Packet()
                p.askStoreTransaction(msg_id, self.tid, user, desc, ext,
                                      oid_list)
                conn.addPacket(p)
                conn.expectMessage(msg_id)
                self.dispatcher.register(conn, msg_id, self.getQueue())
                # Reset for each node so that a previous node's
                # acknowledgement cannot hide this node's failure.
                self.txn_voted = 0
            finally:
                conn.unlock()

            self._waitMessage(conn, msg_id)
            if self.txn_voted != 1:
                raise NEOStorageError('tpc_vote failed')
            voted = True

        if not voted:
            # The transaction metadata reached no storage node at all.
            raise NEOStorageError('tpc_vote failed')

504
    def _clear_txn(self):
505 506
        """Clear some transaction parameters."""
        self.tid = None
507
        self.txn = None
508
        self.txn_data_dict.clear()
509 510 511
        self.txn_voted = 0
        self.txn_finished = 0

512 513 514 515
    def _abortOnCells(self, cell_list, aborted_node_set):
        """Send abortTransaction(self.tid) to the storage node of each cell.

        Nodes already in aborted_node_set are skipped, and each node
        notified is added to it. Unreachable nodes are skipped."""
        for cell in cell_list:
            node = cell.getNode()
            if node in aborted_node_set:
                continue
            conn = self.cp.getConnForNode(cell)
            if conn is None:
                continue

            try:
                msg_id = conn.getNextId()
                p = Packet()
                p.abortTransaction(msg_id, self.tid)
                conn.addPacket(p)
            finally:
                conn.unlock()

            aborted_node_set.add(node)

    def tpc_abort(self, transaction):
        """Abort current transaction.

        Notify the storage nodes that may hold data of the transaction and
        the primary master node, then reset the transaction state. A
        foreign transaction is ignored. Nothing is sent when no tid was
        obtained (tpc_begin failed), but the state is still reset."""
        if transaction is not self.txn:
            return

        if self.tid is not None:
            aborted_node_set = set()
            # Abort txn in node where objects were stored. store() sends
            # objects to every cell of the partition, not only up-to-date
            # ones, so abort on the same set of cells.
            for oid in self.txn_data_dict:
                partition_id = u64(oid) % self.num_partitions
                self._abortOnCells(self.pt.getCellList(partition_id, False),
                                   aborted_node_set)

            # Abort in nodes where transaction was stored
            partition_id = u64(self.tid) % self.num_partitions
            self._abortOnCells(self.pt.getCellList(partition_id, True),
                               aborted_node_set)

            # Abort the transaction in the primary master node.
            conn = self.master_conn
            conn.lock()
            try:
                msg_id = conn.getNextId()
                p = Packet()
                p.abortTransaction(msg_id, self.tid)
                conn.addPacket(p)
            finally:
                conn.unlock()

        self._clear_txn()
566

567 568 569 570
    def tpc_finish(self, transaction, f=None):
        """Finish current transaction.

        Call f(tid) if given, ask the primary master node to finish the
        transaction, then update the client cache with the stored objects,
        whose serial is now the tid. Return the tid of the finished
        transaction (None, with nothing done, for a foreign transaction).
        Raises NEOStorageError if the master does not confirm."""
        if self.txn is not transaction:
            return
        self._load_lock_acquire()
        try:
            # Call function given by ZODB
            if f is not None:
                f(self.tid)

            # Call finish on master
            oid_list = self.txn_data_dict.keys()
            conn = self.master_conn
            conn.lock()
            try:
                msg_id = conn.getNextId()
                p = Packet()
                p.finishTransaction(msg_id, oid_list, self.tid)
                conn.addPacket(p)
                conn.expectMessage(msg_id, additional_timeout = 300)
                self.dispatcher.register(conn, msg_id, self.getQueue())
            finally:
                conn.unlock()

            # Wait for answer
            self._waitMessage(conn, msg_id)
            if self.txn_finished != 1:
                raise NEOStorageError('tpc_finish failed')

            # Update cache
            tid = self.tid
            self._cache_lock_acquire()
            try:
                for oid, ddata in self.txn_data_dict.items():
                    # Now serial is same as tid
                    self.mq_cache[oid] = tid, ddata
            finally:
                self._cache_lock_release()
            # _clear_txn() resets self.tid, so return the saved value.
            self._clear_txn()
            return tid
        finally:
            self._load_lock_release()
609

610

Aurel's avatar
Aurel committed
611
    def undo(self, transaction_id, txn, wrapper):
        """Undo the transaction transaction_id as part of the current txn.

        For every object modified by transaction_id, the revision preceding
        it is stored again in txn. A conflict is resolved through
        wrapper.tryToResolveConflict when possible. txn is then voted and
        finished.

        Raises StorageTransactionError if txn is not the current transaction
        and NEOStorageError if the transaction information cannot be
        retrieved. Raises UndoError if an object was modified by a later
        transaction, and ConflictError for unresolved conflicts."""
        # transaction_id is the tid being undone; txn is the transaction
        # object that must be in progress.
        if txn is not self.txn:
            raise StorageTransactionError(self, txn)

        # First get transaction information from a storage node.
        partition_id = u64(transaction_id) % self.num_partitions
        cell_list = self.pt.getCellList(partition_id, True)
        shuffle(cell_list)
        for cell in cell_list:
            conn = self.cp.getConnForNode(cell)
            if conn is None:
                continue

            try:
                msg_id = conn.getNextId()
                p = Packet()
                p.askTransactionInformation(msg_id, transaction_id)
                conn.addPacket(p)
                conn.expectMessage(msg_id)
                self.dispatcher.register(conn, msg_id, self.getQueue())
                self.local_var.txn_info = 0
            finally:
                conn.unlock()

            # Wait for answer
            self._waitMessage(conn, msg_id)
            if self.local_var.txn_info == -1:
                # Tid not found, try with next node
                continue
            elif isinstance(self.local_var.txn_info, dict):
                break
            else:
                raise NEOStorageError('undo failed')
        else:
            # No node could be reached, or none knew the transaction.
            raise NEOStorageError('undo failed')

        oid_list = self.local_var.txn_info['oids']
        # Second get object data from storage node using loadBefore
        data_dict = {}
        for oid in oid_list:
            try:
                result = self.loadBefore(oid, transaction_id)
            except NEOStorageNotFoundError:
                # Object created by transaction, so no previous record
                data_dict[oid] = None
                continue
            # loadBefore returns None when the revision found has no end
            # serial: it cannot have been superseded by transaction_id.
            if result is None:
                raise UndoError("non-undoable transaction")
            data, start, end = result
            # end must be TID we are going to undone otherwise it means
            # a later transaction modify the object
            if end != transaction_id:
                raise UndoError("non-undoable transaction")
            data_dict[oid] = data

        # Third do transaction with old data
        self.tpc_begin(txn)

        for oid, data in data_dict.items():
            try:
                self.store(oid, self.tid, data, None, txn)
            except NEOStorageConflictError:
                # store() raises the exception without arguments and leaves
                # the conflicting serial in self.conflict_serial.
                serial = self.conflict_serial
                if serial <= self.tid:
                    new_data = wrapper.tryToResolveConflict(oid, self.tid,
                                                            serial, data)
                    if new_data is not None:
                        self.store(oid, self.tid, new_data, None, txn)
                        continue
                raise ConflictError(oid = oid, serials = (self.tid, serial),
                                    data = data)

        self.tpc_vote(txn)
        self.tpc_finish(txn)
682

683 684 685 686 687 688

    def undoLog(self, first, last, filter=None):
        """Return information about transactions, most recent tid first.

        Transaction ids are first collected from every up-to-date or
        feeding storage node. For each tid, its information is then asked
        to an up-to-date node of its partition. filter, if given, is called
        with the transaction description and must return True to keep the
        entry. At most last - first entries are returned, without their
        'oids' key."""
        if last < 0:
            # See FileStorage.py for explanation
            last = first - last

        # First get a list of transactions from all storage nodes.
        storage_node_list = [x for x in self.pt.getNodeList() if x.getState() \
                             in (UP_TO_DATE_STATE, FEEDING_STATE)]
        self.local_var.node_tids = {}
        asked_node_count = 0
        for storage_node in storage_node_list:
            conn = self.cp.getConnForNode(storage_node)
            if conn is None:
                continue

            try:
                msg_id = conn.getNextId()
                p = Packet()
                p.askTIDs(msg_id, first, last, INVALID_PARTITION)
                conn.addPacket(p)
            finally:
                conn.unlock()
            asked_node_count += 1

        # Wait for answers from the storages actually asked; waiting for
        # unreachable ones as well would never end.
        # FIXME this is a busy loop.
        while len(self.local_var.node_tids) < asked_node_count:
            self._waitMessage()

        # Reorder tids: merge the per-node lists, dropping duplicates since
        # a transaction can be known by several nodes.
        # NOTE(review): assumes each node_tids value is a list of tids.
        tid_set = set()
        for tids in self.local_var.node_tids.values():
            tid_set.update(tids)
        ordered_tids = list(tid_set)
        # XXX do we need a special cmp function here ?
        ordered_tids.sort(reverse=True)

        # For each transaction, get info
        undo_info = []
        for tid in ordered_tids:
            partition_id = u64(tid) % self.num_partitions
            cell_list = self.pt.getCellList(partition_id, True)
            shuffle(cell_list)
            for cell in cell_list:
                conn = self.cp.getConnForNode(cell)
                if conn is None:
                    continue

                try:
                    msg_id = conn.getNextId()
                    p = Packet()
                    p.askTransactionInformation(msg_id, tid)
                    conn.addPacket(p)
                    conn.expectMessage(msg_id)
                    self.dispatcher.register(conn, msg_id, self.getQueue())
                    self.local_var.txn_info = 0
                finally:
                    conn.unlock()

                # Wait for answer
                self._waitMessage(conn, msg_id)
                if self.local_var.txn_info == -1:
                    # TID not found, go on with next node
                    continue
                elif isinstance(self.local_var.txn_info, dict):
                    break
            else:
                # No node returned information about this transaction.
                continue

            txn_info = self.local_var.txn_info
            # Filter result if needed
            if filter is not None:
                # Filter method return True if match
                if not filter(txn_info['description']):
                    continue

            # Append to returned list
            txn_info.pop("oids")
            undo_info.append(txn_info)
            if len(undo_info) >= last - first:
                break

        return undo_info

Aurel's avatar
Aurel committed
764

Aurel's avatar
Aurel committed
765
    def history(self, oid, version, length=1, filter=None, object_only=0):
        """Return the revision history of object ``oid``.

        Two phases:
          1. Ask a storage node holding the object's partition for the
             object history (``askObjectHistory``).
          2. For every (serial, size) revision in that history, ask a node
             holding the transaction's partition for its information
             (``askTransactionInformation``).

        Parameters:
          oid -- object id; passed through u64 to locate its partition.
          version -- not used by this implementation.
          length -- number of revisions requested from the storage node.
          filter -- optional callable; when given, a history entry is only
                    kept if ``filter(entry)`` returns a true value.
          object_only -- when true, return the raw object history answer
                         (as stored by the handler, indexed as
                         ``answer[0] == oid`` and ``answer[1]`` a sequence
                         of (serial, size) pairs) without fetching
                         transaction information. Used by getSerial.

        Returns a list of dicts, one per revision, built from the
        transaction information with 'id' and 'oids' removed and
        'serial', 'version' (always None) and 'size' set.

        Raises NEOStorageError if no node returned a history for ``oid``,
        or if no node returned the information of one of its transactions.
        """
        # Phase 1: get the object history from one node holding the object.
        partition_id = u64(oid) % self.num_partitions
        cell_list = self.pt.getCellList(partition_id, True)
        shuffle(cell_list)

        history = None
        for cell in cell_list:
            conn = self.cp.getConnForNode(cell)
            if conn is None:
                continue

            # Reset before sending so a stale value from a previous request
            # can never be mistaken for this node's answer.
            self.local_var.history = None
            try:
                msg_id = conn.getNextId()
                p = Packet()
                p.askObjectHistory(msg_id, oid, 0, length)
                conn.addPacket(p)
                conn.expectMessage(msg_id)
                self.dispatcher.register(conn, msg_id, self.getQueue())
            finally:
                conn.unlock()

            self._waitMessage(conn, msg_id)
            answer = self.local_var.history
            if answer is None or answer == -1:
                # No usable answer / not found, go on with next node
                continue
            if answer[0] != oid:
                # Got history for wrong oid
                continue
            # Valid answer: stop here so a later node cannot overwrite it.
            history = answer
            break

        if history is None:
            raise NEOStorageError('history failed')
        if object_only:
            # Used by getSerial
            return history

        # Phase 2: now that we have object informations, get txn informations
        history_list = []
        for serial, size in history[1]:
            partition_id = u64(serial) % self.num_partitions
            cell_list = self.pt.getCellList(partition_id, True)
            shuffle(cell_list)

            txn_info = None
            for cell in cell_list:
                conn = self.cp.getConnForNode(cell)
                if conn is None:
                    continue

                self.local_var.txn_info = None
                try:
                    msg_id = conn.getNextId()
                    p = Packet()
                    p.askTransactionInformation(msg_id, serial)
                    conn.addPacket(p)
                    conn.expectMessage(msg_id)
                    self.dispatcher.register(conn, msg_id, self.getQueue())
                finally:
                    conn.unlock()

                # Wait for answer
                self._waitMessage(conn, msg_id)
                if isinstance(self.local_var.txn_info, dict):
                    txn_info = self.local_var.txn_info
                    break
                # TID not found (-1) or no answer: try the next node

            if txn_info is None:
                raise NEOStorageError('history failed')

            # Turn the transaction information into a history entry
            txn_info.pop('id', None)
            txn_info.pop('oids', None)
            txn_info['serial'] = serial
            txn_info['version'] = None
            txn_info['size'] = size
            if filter is None or filter(txn_info):
                history_list.append(txn_info)

        return history_list
Aurel's avatar
Aurel committed
841

842 843 844 845 846 847 848
    def __del__(self):
        """Close every connection known to the event manager.

        ZODB does not always call close() when Zope shuts down, so this
        cleanup is also hooked on __del__; ``close`` below is an alias for
        this very method.
        """
        event_manager = self.em
        open_connections = event_manager.getConnectionList()
        for connection in open_connections:
            connection.close()
    close = __del__