# Copyright (C) 2006-2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
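
"""NEO client application.

This module implements the client side of a NEO cluster: a pool of
connections to storage nodes (ConnectionPool), a per-thread execution
context (ThreadContext), and the Application class, which provides a
ZODB-storage-like interface (load, store, tpc_begin/tpc_vote/tpc_finish,
undo, history, ...) on top of NEO master and storage nodes.
"""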

import logging
from thread import get_ident
from cPickle import dumps
from zlib import compress, decompress
from Queue import Queue, Empty
from random import shuffle
from time import sleep

from neo.client.mq import MQ
from neo.node import NodeManager, MasterNode, StorageNode
from neo.connection import MTClientConnection
from neo import protocol
from neo.protocol import INVALID_UUID, INVALID_TID, INVALID_PARTITION, \
        INVALID_PTID, CLIENT_NODE_TYPE, INVALID_SERIAL, \
        DOWN_STATE, HIDDEN_STATE
from neo.client.handlers.master import PrimaryBootstrapHandler, \
        PrimaryNotificationsHandler, PrimaryAnswersHandler
from neo.client.handlers.storage import StorageBootstrapHandler, \
        StorageAnswersHandler, StorageEventHandler
from neo.client.exception import NEOStorageError, NEOStorageConflictError, \
     NEOStorageNotFoundError
from neo.exception import NeoException
from neo.util import makeChecksum, dump
from neo.connector import getConnectorHandler
from neo.client.dispatcher import Dispatcher
from neo.client.poll import ThreadedPoll
from neo.event import EventManager
from neo.locking import RLock, Lock

from ZODB.POSException import UndoError, StorageTransactionError, ConflictError

class ConnectionClosed(Exception): pass

class ConnectionPool(object):
    """This class manages a pool of connections to storage nodes."""

    def __init__(self, app, max_pool_size = 25):
        self.app = app
        self.max_pool_size = max_pool_size
        self.connection_dict = {}
        # Define a lock so that only one connection to a storage node is
        # created at a time, to avoid multiple connections to the same node.
        l = RLock()
        self.connection_lock_acquire = l.acquire
        self.connection_lock_release = l.release

    def _initNodeConnection(self, node):
        """Init a connection to a given storage node."""
        addr = node.getServer()
        if addr is None:
            return None

        app = self.app

        # Loop until a connection is obtained.
        while True:
            logging.info('trying to connect to %s - %s', node, node.getState())
            app.setNodeReady()
            conn = MTClientConnection(self.app.local_var, app.em, app.storage_event_handler,
                    addr, connector_handler=app.connector_handler, dispatcher=app.dispatcher)
            conn.lock()
            try:
                if conn.getConnector() is None:
                    # This happens if a connection could not be established.
                    logging.error('Connection to storage node %s failed', node)
                    return None

                p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE,
                            app.uuid, '0.0.0.0', 0, app.name)
                msg_id = conn.ask(p)
            finally:
                conn.unlock()

            try:
                app._waitMessage(conn, msg_id, handler=app.storage_bootstrap_handler)
            except ConnectionClosed:
                logging.error('Connection to storage node %s failed', node)
                return None

            if app.isNodeReady():
                logging.info('connected to storage node %s', node)
                return conn
            else:
                logging.info('Storage node %s not ready', node)
                return None

    def _dropConnections(self):
        """Drop unused connections."""
        for node_uuid, conn in self.connection_dict.items():
            # Drop the first connections that do not look used.
            conn.lock()
            try:
                if not conn.pending() and \
                        not self.app.dispatcher.registered(conn):
                    del self.connection_dict[conn.getUUID()]
                    conn.close()
                    logging.debug('_dropConnections: connection to storage node %s:%d closed',
                                 *(conn.getAddress()))
                    if len(self.connection_dict) <= self.max_pool_size:
                        break
            finally:
                conn.unlock()

    def _createNodeConnection(self, node):
        """Create a connection to a given storage node."""
        if len(self.connection_dict) > self.max_pool_size:
            # must drop some unused connections
            self._dropConnections()

        # Release the pool lock while connecting, so that other threads can
        # still get existing connections; it is re-acquired before updating
        # connection_dict.
        self.connection_lock_release()
        try:
            conn = self._initNodeConnection(node)
        finally:
            self.connection_lock_acquire()

        if conn is None:
            return None

        # add node to node manager
        if self.app.nm.getNodeByServer(node.getServer()) is None:
            n = StorageNode(node.getServer())
            self.app.nm.add(n)
        self.connection_dict[node.getUUID()] = conn
        conn.lock()
        return conn

    def getConnForCell(self, cell):
        return self.getConnForNode(cell.getNode())

    def getConnForNode(self, node):
        """Return a locked connection object to a given node.
        If no connection exists, create a new one."""
        if node.getState() in (DOWN_STATE, HIDDEN_STATE):
            return None
        uuid = node.getUUID()
        self.connection_lock_acquire()
        try:
            try:
                conn = self.connection_dict[uuid]
                # Already connected to node
                conn.lock()
                return conn
            except KeyError:
                # Create new connection to node
                return self._createNodeConnection(node)
        finally:
            self.connection_lock_release()

    def removeConnection(self, node):
        """Explicitly remove connection when a node is broken."""
        self.connection_lock_acquire()
        try:
            try:
                del self.connection_dict[node.getUUID()]
            except KeyError:
                pass
        finally:
            self.connection_lock_release()
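
# Typical use of the pool (a sketch, based on the call sites below):
# getConnForNode() and getConnForCell() return a *locked* connection, and
# the caller is responsible for unlocking it once done:
#
#     conn = app.cp.getConnForCell(cell)
#     if conn is not None:
#         try:
#             conn.notify(...)  # or conn.ask(...)
#         finally:
#             conn.unlock()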


class ThreadContext(object):

    _threads_dict = {}

    def __getThreadData(self):
        id = get_ident()
        try:
            result = self._threads_dict[id]
        except KeyError:
            self.clear(id)
            result = self._threads_dict[id]
        return result

    def __getattr__(self, name):
        thread_data = self.__getThreadData()
        try:
            return thread_data[name]
        except KeyError:
            raise AttributeError, name
        
    def __setattr__(self, name, value):
        thread_data = self.__getThreadData()
        thread_data[name] = value

    def clear(self, id=None):
        if id is None:
            id = get_ident()
        self._threads_dict[id] = {
            'tid': None,
            'txn': None,
            'data_dict': {},
            'object_stored': 0,
            'txn_voted': False,
            'txn_finished': False,
            'queue': Queue(5),
        }
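
# Attributes stored on a ThreadContext are thread-local: __getattr__ and
# __setattr__ go through a dict keyed on thread.get_ident(), so two threads
# using the same context object each see their own 'tid', 'txn', 'queue',
# and so on.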


class Application(object):
    """The client node application."""

    def __init__(self, master_nodes, name, connector, **kw):
        # XXX: use a configuration entry
        from neo import buildFormatString
        format = buildFormatString('CLIENT')
        logging.basicConfig(level=logging.DEBUG, format=format)
        em = EventManager()
        # Start polling thread
        self.poll_thread = ThreadedPoll(em)
        # Internal attributes common to all threads
        self.name = name
        self.em = em
        self.connector_handler = getConnectorHandler(connector)
        self.dispatcher = Dispatcher()
        self.nm = NodeManager()
        self.cp = ConnectionPool(self)
        self.pt = None
        self.master_conn = None
        self.primary_master_node = None
        self.trying_master_node = None
        # XXX: this code duplicates neo.config.ConfigurationManager.getMasterNodeList
        logging.debug('master node addresses are %s', master_nodes)
        self.master_node_list = master_node_list = []
        for node in master_nodes.split():
            if not node:
                continue
            if ':' in node:
                ip_address, port = node.split(':')
                port = int(port)
            else:
                ip_address = node
                port = 10100 # XXX: default_master_port
            server = (ip_address, port)
            master_node_list.append(server)
            self.nm.add(MasterNode(server=server))
        # no self-assigned UUID, the primary master will supply us one
        self.uuid = INVALID_UUID
        self.mq_cache = MQ()
        self.new_oid_list = []
        self.ptid = INVALID_PTID
        self.storage_event_handler = StorageEventHandler(self, self.dispatcher)
        self.storage_bootstrap_handler = StorageBootstrapHandler(self)
        self.storage_handler = StorageAnswersHandler(self)
        self.primary_handler = PrimaryAnswersHandler(self)
        self.primary_bootstrap_handler = PrimaryBootstrapHandler(self)
        self.notifications_handler = PrimaryNotificationsHandler(self, self.dispatcher)
        # Internal attributes distinct for each thread
        self.local_var = ThreadContext()
        # Lock definitions:
        # _load_lock is used to make loading and storing atomic
        lock = Lock()
        self._load_lock_acquire = lock.acquire
        self._load_lock_release = lock.release
        # _oid_lock is used to avoid asking the master for new oids
        # from several threads at the same time
        lock = Lock()
        self._oid_lock_acquire = lock.acquire
        self._oid_lock_release = lock.release
        lock = Lock()
        # _cache_lock is used for the client cache
        self._cache_lock_acquire = lock.acquire
        self._cache_lock_release = lock.release
        lock = Lock()
        # _connecting_to_master_node is used to prevent simultaneous master
        # node connection attempts
        self._connecting_to_master_node_acquire = lock.acquire
        self._connecting_to_master_node_release = lock.release
        # _nm ensures exclusive access to the node manager
        lock = Lock()
        self._nm_acquire = lock.acquire
        self._nm_release = lock.release

    def notifyDeadNode(self, conn):
        """Notify the primary master of a storage node failure."""
        s_node = self.nm.getNodeByServer(conn.getAddress())
        if s_node is None or s_node.getNodeType() != protocol.STORAGE_NODE_TYPE:
            return
        s_uuid = s_node.getUUID()
        ip_address, port = s_node.getServer()
        m_conn = self._getMasterConnection()
        m_conn.lock()
        try:
            node_list = [(protocol.STORAGE_NODE_TYPE, ip_address, port, s_uuid, s_node.getState())]
            m_conn.notify(protocol.notifyNodeInformation(node_list))
        finally:
            m_conn.unlock()

    def _waitMessage(self, target_conn = None, msg_id = None, handler=None):
        """Wait for a message returned by the dispatcher in queues."""
        local_queue = self.local_var.queue
        while 1:
            if msg_id is None:
                try:
                    conn, packet = local_queue.get_nowait()
                except Empty:
                    break
            else:
                conn, packet = local_queue.get()
            # check fake packet
            if packet is None:
                if conn.getUUID() == target_conn.getUUID():
                    raise ConnectionClosed
                else:
                    continue
            # Guess the handler to use based on the type of node on the
            # connection
            if handler is None:
                node = self.nm.getNodeByServer(conn.getAddress())
                if node is None:
                    raise ValueError, 'Expecting an answer from a node ' \
                        'whose type is not known... Is this right?'
                else:
                    node_type = node.getType()
                    if node_type == protocol.STORAGE_NODE_TYPE:
                        handler = self.storage_handler
                    elif node_type == protocol.MASTER_NODE_TYPE:
                        handler = self.primary_handler
                    else:
                        raise ValueError, 'Unknown node type: %r' % (
                            node_type, )
            handler.dispatch(conn, packet)
            if target_conn is conn and msg_id == packet.getId() \
                    and packet.getType() & 0x8000:
                break

    def _askStorage(self, conn, packet, timeout=5, additional_timeout=30):
        """ Send a request to a storage node and process its answer """
        try:
            msg_id = conn.ask(packet, timeout, additional_timeout)
        finally:
            # assume that the connection was already locked
            conn.unlock()
        self._waitMessage(conn, msg_id, self.storage_handler)

    def _askPrimary(self, packet, timeout=5, additional_timeout=30):
        """ Send a request to the primary master and process its answer """
        conn = self._getMasterConnection()
        conn.lock()
        try:
            msg_id = conn.ask(packet, timeout, additional_timeout)
        finally:
            conn.unlock()
        self._waitMessage(conn, msg_id, self.primary_handler)
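
    # Request/answer pattern used by both helpers above (sketch):
    #     conn.lock(); msg_id = conn.ask(packet); conn.unlock()
    #     self._waitMessage(conn, msg_id, handler)  # blocks until the answer
    # _waitMessage() pulls (conn, packet) pairs from the per-thread queue and
    # stops once the packet answering msg_id arrives on the right connection
    # (presumably what the 0x8000 bit of the packet type signals).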

    def _getMasterConnection(self):
        """ Connect to the primary master node on demand """
        # acquire the lock to allow only one thread to connect to the primary
        self._connecting_to_master_node_acquire()
        try:
            if self.master_conn is None:
                self.master_conn = self._connectToPrimaryMasterNode()
            return self.master_conn
        finally:
            self._connecting_to_master_node_release()

    def _getPartitionTable(self):
        """ Return the partition table manager, reconnecting to the primary
        master node if needed """
        # this ensures the master connection is established and the partition
        # table is up to date.
        self._getMasterConnection()
        return self.pt

    def _getCellListForID(self, id, readable=False, writable=False):
        """ Return the cells available for the specified (O|T)ID """
        pt = self._getPartitionTable()
        return pt.getCellListForID(id, readable, writable)

    def _connectToPrimaryMasterNode(self):
        logging.debug('connecting to primary master...')
        ready = False
        nm = self.nm
        while not ready:
            # Get network connection to primary master
            index = 0
            connected = False
            while not connected:
                if self.primary_master_node is not None:
                    # If I know a primary master node, pinpoint it.
                    self.trying_master_node = self.primary_master_node
                    self.primary_master_node = None
                else:
                    # Otherwise, check one by one.
                    master_list = nm.getMasterNodeList()
                    try:
                        self.trying_master_node = master_list[index]
                    except IndexError:
                        index = 0
                        self.trying_master_node = master_list[0]
                    index += 1
                # Connect to master
                conn = MTClientConnection(self.local_var, self.em, self.notifications_handler,
                                          addr=self.trying_master_node.getServer(),
                                          connector_handler=self.connector_handler,
                                          dispatcher=self.dispatcher)
                # Query for primary master node
                conn.lock()
                try:
                    if conn.getConnector() is None:
                        # This happens if a connection could not be established.
                        logging.error('Connection to master node %s failed',
                                      self.trying_master_node)
                        continue
                    msg_id = conn.ask(protocol.askPrimaryMaster())
                finally:
                    conn.unlock()
                try:
                    self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    continue
                # If we reached the primary master node, mark as connected
                connected = self.primary_master_node is not None \
                            and self.primary_master_node is self.trying_master_node

            logging.info('connected to a primary master node')
            # Identify to primary master and request initial data
            while conn.getUUID() is None:
                conn.lock()
                try:
                    if conn.getConnector() is None:
                        logging.error('Connection to master node %s lost',
                                      self.trying_master_node)
                        self.primary_master_node = None
                        break
                    p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE,
                            self.uuid, '0.0.0.0', 0, self.name)
                    msg_id = conn.ask(p)
                finally:
                    conn.unlock()
                try:
                    self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    self.primary_master_node = None
                    break
                if conn.getUUID() is None:
                    # Node identification was refused by the master.
                    # Sleep a bit and retry.
                    # XXX: This should be replaced by:
                    # - queuing requestNodeIdentification at master side
                    # - sending the acceptance from master when it becomes
                    #   ready
                    # Thus removing the need to:
                    # - needlessly bother the primary master every 5 seconds
                    #   (...per client)
                    # - have a sleep in the code (yuck !)
                    sleep(5)
            if self.uuid != INVALID_UUID:
                # TODO: pipeline those 2 requests
                # This is currently impossible because _waitMessage can only
                # wait on one message at a time
                conn.lock()
                try:
                    msg_id = conn.ask(protocol.askPartitionTable([]))
                finally:
                    conn.unlock()
                self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
                conn.lock()
                try:
                    msg_id = conn.ask(protocol.askNodeInformation())
                finally:
                    conn.unlock()
                self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
            ready = self.uuid != INVALID_UUID and self.pt is not None \
                                 and self.pt.operational()
        logging.info('connected to primary master node %s', self.primary_master_node)
        return conn
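
    # Note: during the bootstrap above, self.primary_master_node, self.uuid
    # and self.pt are presumably filled in by primary_bootstrap_handler as
    # answers arrive, which is why the loops re-test those attributes after
    # each _waitMessage() call.
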
    def registerDB(self, db, limit):
        self._db = db

    def getDB(self):
        return self._db

    def new_oid(self):
        """Get a new OID."""
        self._oid_lock_acquire()
        try:
            if len(self.new_oid_list) == 0:
                # Get a new batch of oids from the master node, so that
                # we do not have to ask for a new oid one by one every
                # time.
                self._askPrimary(protocol.askNewOIDs(100))
                if len(self.new_oid_list) <= 0:
                    raise NEOStorageError('new_oid failed')
            return self.new_oid_list.pop()
        finally:
            self._oid_lock_release()

    def getSerial(self, oid):
        # Try in cache first
        self._cache_lock_acquire()
        try:
            if oid in self.mq_cache:
                return self.mq_cache[oid][0]
        finally:
            self._cache_lock_release()
        # history() returns the serial, so use it
        hist = self.history(oid, length = 1, object_only = 1)
        if len(hist) == 0:
            raise NEOStorageNotFoundError()
        if hist[0] != oid:
            raise NEOStorageError('getSerial failed')
        return hist[1][0][0]

    def _load(self, oid, serial = INVALID_TID, tid = INVALID_TID, cache = 0):
        """Internal method which manages load, loadSerial and loadBefore."""
        cell_list = self._getCellListForID(oid, readable=True)
        if len(cell_list) == 0:
            # No cells available, so why are we running ?
            logging.error('oid %s not found because no storage is available for it', dump(oid))
            raise NEOStorageNotFoundError()

        shuffle(cell_list)
        self.local_var.asked_object = 0
        for cell in cell_list:
            logging.debug('trying to load %s from %s',
                          dump(oid), dump(cell.getUUID()))
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            try:
                self._askStorage(conn, protocol.askObject(oid, serial, tid))
            except ConnectionClosed:
                continue

            if self.local_var.asked_object == -1:
                # OID not found
                break

            # Check data
            noid, start_serial, end_serial, compression, checksum, data \
                = self.local_var.asked_object
            if noid != oid:
                # Oops, try with next node
                logging.error('got wrong oid %s instead of %s from node %s',
                              noid, dump(oid), cell.getServer())
                self.local_var.asked_object = -1
                continue
            elif checksum != makeChecksum(data):
                # Check checksum.
                logging.error('wrong checksum from node %s for oid %s',
                              cell.getServer(), dump(oid))
                self.local_var.asked_object = -1
                continue
            else:
                # Everything looks alright.
                break

        if self.local_var.asked_object == 0:
            # We could not get any object from any storage node because of
            # connection errors
            logging.warning('oid %s not found because of connection failure', dump(oid))
            raise NEOStorageNotFoundError()

        if self.local_var.asked_object == -1:
            # We did not get any object from any storage node
            logging.info('oid %s not found', dump(oid))
            raise NEOStorageNotFoundError()

        # Uncompress data
        if compression:
            data = decompress(data)

        # Put in cache only when using load
        if cache:
            self._cache_lock_acquire()
            try:
                self.mq_cache[oid] = start_serial, data
            finally:
                self._cache_lock_release()
        if end_serial == INVALID_SERIAL:
            end_serial = None
        return data, start_serial, end_serial

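    # Sentinel values of local_var.asked_object in _load() above: 0 means no
    # storage node could be reached, -1 means the object was not found (or
    # failed the checks), and a tuple is an actual answer, presumably filled
    # in by the storage answer handler.
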
    def load(self, oid, version=None):
        """Load an object for a given oid."""
        # First try from cache
        self._load_lock_acquire()
        try:
            self._cache_lock_acquire()
            try:
                if oid in self.mq_cache:
                    logging.debug('load oid %s is cached', dump(oid))
                    return self.mq_cache[oid][1], self.mq_cache[oid][0]
            finally:
                self._cache_lock_release()
            # Otherwise get it from a storage node
            return self._load(oid, cache=1)[:2]
        finally:
            self._load_lock_release()

    def loadSerial(self, oid, serial):
        """Load an object for a given oid and serial."""
        # Do not try the cache, as it manages only up-to-date objects
        logging.debug('loading %s at %s', dump(oid), dump(serial))
        return self._load(oid, serial=serial)[0]

    def loadBefore(self, oid, tid):
        """Load an object for a given oid, as it was before tid was committed."""
        # Do not try the cache, as it manages only up-to-date objects
        if tid is None:
            tid = INVALID_TID
        logging.debug('loading %s before %s', dump(oid), dump(tid))
        data, start, end = self._load(oid, tid=tid)
        if end is None:
            # No previous version
            return None
        else:
            return data, start, end

    def tpc_begin(self, transaction, tid=None, status=' '):
        """Begin a new transaction."""
        # First get a transaction, only one is allowed at a time
        if self.local_var.txn is transaction:
            # We already began the same transaction
            return
        # Get a new transaction id if necessary
        if tid is None:
            self.local_var.tid = None
            self._askPrimary(protocol.askNewTID())
            if self.local_var.tid is None:
                raise NEOStorageError('tpc_begin failed')
        else:
            self.local_var.tid = tid
        self.local_var.txn = transaction

    def store(self, oid, serial, data, version, transaction):
        """Store object."""
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        if serial is None:
            serial = INVALID_SERIAL
        logging.debug('storing oid %s serial %s',
                     dump(oid), dump(serial))
        # Find which storage nodes to use
        cell_list = self._getCellListForID(oid, writable=True)
        if len(cell_list) == 0:
            # FIXME must wait for cluster to be ready
            raise NEOStorageError
        # Store data on each node
        compressed_data = compress(data)
        checksum = makeChecksum(compressed_data)
        self.local_var.object_stored_counter = 0
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.object_stored = 0
            p = protocol.askStoreObject(oid, serial, 1,
                     checksum, compressed_data, self.local_var.tid)
            try:
                self._askStorage(conn, p)
            except ConnectionClosed:
                continue

            # Check we don't get any conflict
            if self.local_var.object_stored[0] == -1:
                if self.local_var.data_dict.has_key(oid):
                    # One storage node already accepted the object; is that
                    # normal ??  Remove it from the dict and raise a conflict.
                    # Don't worry about the nodes which already stored the
                    # data: it will be sent again if the conflict is resolved,
                    # or the txn will be aborted.
                    del self.local_var.data_dict[oid]
                self.conflict_serial = self.local_var.object_stored[1]
                raise NEOStorageConflictError
            # increase counter so that we know if a node has stored the object or not
            self.local_var.object_stored_counter += 1

        if self.local_var.object_stored_counter == 0:
            # no storage node was available
            raise NEOStorageError('tpc_store failed')

        # Store object in temporary cache
        self.local_var.data_dict[oid] = data

        return self.local_var.tid

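    # On NEOStorageConflictError, the caller is expected to fetch the
    # conflicting serial via getConflictSerial() and either resolve the
    # conflict and store again (see undo() below) or abort the transaction.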

    def tpc_vote(self, transaction):
        """Store current transaction."""
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        user = transaction.user
        desc = transaction.description
        ext = dumps(transaction._extension)
        oid_list = self.local_var.data_dict.keys()
        # Store data on each node
        pt = self._getPartitionTable()
        cell_list = self._getCellListForID(self.local_var.tid, writable=True)
        self.local_var.voted_counter = 0
        for cell in cell_list:
            logging.info('voting object %s %s', cell.getServer(), cell.getState())
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.txn_voted = False
            p = protocol.askStoreTransaction(self.local_var.tid,
                    user, desc, ext, oid_list)
            try:
                self._askStorage(conn, p)
            except ConnectionClosed:
                continue

            if not self.isTransactionVoted():
                raise NEOStorageError('tpc_vote failed')
            self.local_var.voted_counter += 1

        # check that at least one storage node accepted
        if self.local_var.voted_counter == 0:
            raise NEOStorageError('tpc_vote failed')

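    # ZODB two-phase commit, as implemented above and below (sketch):
    # tpc_begin() gets a TID from the master, store() sends each object to
    # every writable cell, tpc_vote() stores the transaction metadata, and
    # tpc_finish() asks the master to commit and then updates the local
    # cache.
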
    def tpc_abort(self, transaction):
        """Abort current transaction."""
        if transaction is not self.local_var.txn:
            return

        cell_set = set()
        # select nodes where objects were stored
        for oid in self.local_var.data_dict.iterkeys():
            cell_set |= set(self._getCellListForID(oid, writable=True))
        # select nodes where the transaction was stored
        cell_set |= set(self._getCellListForID(self.local_var.tid, writable=True))

        # cancel the transaction on all those nodes
        for cell in cell_set:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue
            try:
                conn.notify(protocol.abortTransaction(self.local_var.tid))
            finally:
                conn.unlock()

        # Abort the transaction in the primary master node.
        conn = self._getMasterConnection()
        conn.lock()
        try:
            conn.notify(protocol.abortTransaction(self.local_var.tid))
        finally:
            conn.unlock()
        self.local_var.clear()
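
    # Unlike store() and tpc_vote(), aborting only uses notify(): no answer
    # is awaited from the storage nodes or from the master.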

    def tpc_finish(self, transaction, f=None):
        """Finish current transaction."""
        if self.local_var.txn is not transaction:
            return
        self._load_lock_acquire()
        try:
            # Call the callback given by ZODB
            if f is not None:
                f(self.local_var.tid)

            # Call finish on master
            oid_list = self.local_var.data_dict.keys()
            p = protocol.finishTransaction(oid_list, self.local_var.tid)
            self._askPrimary(p)

            if not self.isTransactionFinished():
                raise NEOStorageError('tpc_finish failed')

            # Update cache
            self._cache_lock_acquire()
            try:
                for oid in self.local_var.data_dict.iterkeys():
                    data = self.local_var.data_dict[oid]
                    # Now the serial is the same as the tid
                    self.mq_cache[oid] = self.local_var.tid, data
            finally:
                self._cache_lock_release()
            # Save the tid before clearing the per-thread context, as
            # clear() resets local_var.tid to None.
            tid = self.local_var.tid
            self.local_var.clear()
            return tid
        finally:
            self._load_lock_release()

    def undo(self, transaction_id, txn, wrapper):
        if txn is not self.local_var.txn:
            raise StorageTransactionError(self, transaction_id)

        # First get transaction information from a storage node.
        cell_list = self._getCellListForID(transaction_id, writable=True)
        shuffle(cell_list)
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.txn_info = 0
            try:
                self._askStorage(conn, protocol.askTransactionInformation(transaction_id))
            except ConnectionClosed:
                continue

            if self.local_var.txn_info == -1:
                # Tid not found, try with next node
                continue
            elif isinstance(self.local_var.txn_info, dict):
                break
            else:
                raise NEOStorageError('undo failed')

        if self.local_var.txn_info in (-1, 0):
            raise NEOStorageError('undo failed')

        oid_list = self.local_var.txn_info['oids']
        # Second, get object data from a storage node using loadBefore
        data_dict = {}
        for oid in oid_list:
            try:
                result = self.loadBefore(oid, transaction_id)
            except NEOStorageNotFoundError:
                # no previous revision, can't undo (as in FileStorage)
                raise UndoError("no previous record", oid)
            data, start, end = result
            # end must be the TID we are about to undo, otherwise it means
            # a later transaction modified the object
            if end != transaction_id:
                raise UndoError("non-undoable transaction", oid)
            data_dict[oid] = data

        # Third, store the transaction with the old data
        oid_list = data_dict.keys()
        for oid in oid_list:
            data = data_dict[oid]
            try:
                self.store(oid, transaction_id, data, None, txn)
            except NEOStorageConflictError:
                # the conflicting serial was saved by store()
                serial = self.getConflictSerial()
                if serial <= self.local_var.tid:
                    new_data = wrapper.tryToResolveConflict(oid, self.local_var.tid,
                                                            serial, data)
                    if new_data is not None:
                        self.store(oid, self.local_var.tid, new_data, None, txn)
                        continue
                raise ConflictError(oid = oid, serials = (self.local_var.tid, serial),
                                    data = data)
        return self.local_var.tid, oid_list

    def undoLog(self, first, last, filter=None, block=0):
        if last < 0:
            # See FileStorage.py for explanation
            last = first - last

        # First get a list of transactions from all storage nodes.
        # Each storage node will return TIDs only for UP_TO_DATE_STATE and
        # FEEDING_STATE cells
        pt = self._getPartitionTable()
        storage_node_list = pt.getNodeList()

        self.local_var.node_tids = {}
        for storage_node in storage_node_list:
            conn = self.cp.getConnForNode(storage_node)
            if conn is None:
                continue

            try:
                conn.ask(protocol.askTIDs(first, last, INVALID_PARTITION))
            finally:
                conn.unlock()

        # Wait for answers from all storages.
        # FIXME this is a busy loop.
        while len(self.local_var.node_tids) != len(storage_node_list):
            try:
                self._waitMessage(handler=self.storage_handler)
            except ConnectionClosed:
                continue

        # Reorder tids
        ordered_tids = set()
        update = ordered_tids.update
        for tid_list in self.local_var.node_tids.itervalues():
            update(tid_list)
        ordered_tids = list(ordered_tids)
        # XXX do we need a special cmp function here ?
        ordered_tids.sort(reverse=True)
        logging.debug("UndoLog, tids %s", ordered_tids)
        # For each transaction, get info
        undo_info = []
        append = undo_info.append
        for tid in ordered_tids:
            cell_list = self._getCellListForID(tid, readable=True)
            shuffle(cell_list)
            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is not None:
                    self.local_var.txn_info = 0
                    try:
                        self._askStorage(conn, protocol.askTransactionInformation(tid))
                    except ConnectionClosed:
                        continue
                    if isinstance(self.local_var.txn_info, dict):
                        break

            if self.local_var.txn_info in (-1, 0):
                # TID not found at all
                raise NeoException, 'Data inconsistency detected: ' \
                                    'transaction info for TID %r could not ' \
                                    'be found' % (tid, )

            if filter is None or filter(self.local_var.txn_info):
                self.local_var.txn_info.pop("oids")
                append(self.local_var.txn_info)
                if len(undo_info) >= last - first:
                    break
        # Check that we return at least one element; otherwise, call
        # again with an extended offset
        if len(undo_info) == 0 and not block:
            undo_info = self.undoLog(first=first, last=last*5, filter=filter, block=1)
        return undo_info

    # FIXME: the filter function isn't used
    def history(self, oid, version=None, length=1, filter=None, object_only=0):
        # Get history information for the object first
        cell_list = self._getCellListForID(oid, readable=True)
        shuffle(cell_list)

        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.history = None
            try:
                self._askStorage(conn, protocol.askObjectHistory(oid, 0, length))
            except ConnectionClosed:
                continue

            if self.local_var.history == -1:
                # Not found, go on with next node
                continue
            if self.local_var.history[0] != oid:
                # Got history for wrong oid
                raise NEOStorageError('inconsistency in storage: asked oid ' \
                                      '%r, got %r' % (
                                      oid, self.local_var.history[0]))

        if not isinstance(self.local_var.history, tuple):
            raise NEOStorageError('history failed')
        if object_only:
            # Used by getSerial
            return self.local_var.history

        # Now that we have object information, get transaction information
        history_list = []
        for serial, size in self.local_var.history[1]:
            cell_list = self._getCellListForID(serial, readable=True)
            shuffle(cell_list)

            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is None:
                    continue

                # ask transaction information
                self.local_var.txn_info = None
                try:
                    self._askStorage(conn, protocol.askTransactionInformation(serial))
                except ConnectionClosed:
                    continue

                if self.local_var.txn_info == -1:
                    # TID not found
                    continue
                if isinstance(self.local_var.txn_info, dict):
                    break

            # create history dict
            self.local_var.txn_info.pop('id')
            self.local_var.txn_info.pop('oids')
            self.local_var.txn_info['tid'] = serial
            self.local_var.txn_info['version'] = None
            self.local_var.txn_info['size'] = size
            history_list.append(self.local_var.txn_info)

        return history_list

    def __del__(self):
        """Clear all connections."""
        # Due to a bug in ZODB, close is not always called when shutting
        # down Zope, so use __del__ to close connections
        for conn in self.em.getConnectionList():
            conn.lock()
            try:
                conn.close()
            finally:
                conn.release()
        # Stop polling thread
        self.poll_thread.stop()
    close = __del__

    def sync(self):
        self._waitMessage()

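    # sync() drains whatever the dispatcher has already queued for this
    # thread: with no msg_id, _waitMessage() only calls get_nowait() and
    # returns as soon as the queue is empty.
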
    def setNodeReady(self):
        self.local_var.node_ready = True

    def setNodeNotReady(self):
        self.local_var.node_ready = False

    def isNodeReady(self):
        return self.local_var.node_ready

    def setTID(self, value):
        self.local_var.tid = value

    def getTID(self):
        return self.local_var.tid

    def getConflictSerial(self):
        return self.conflict_serial

    def setTransactionFinished(self):
        self.local_var.txn_finished = True

    def isTransactionFinished(self):
        return self.local_var.txn_finished

    def setTransactionVoted(self):
        self.local_var.txn_voted = True

    def isTransactionVoted(self):
        return self.local_var.txn_voted