Storage.py 9.86 KB
Newer Older
Aurel's avatar
Aurel committed
1
#
Grégory Wisniewski's avatar
Grégory Wisniewski committed
2
# Copyright (C) 2006-2010  Nexedi SA
3
#
Aurel's avatar
Aurel committed
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
Aurel's avatar
Aurel committed
9 10 11 12 13 14 15
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
Aurel's avatar
Aurel committed
17

18
from ZODB import BaseStorage, ConflictResolution, POSException
Vincent Pelletier's avatar
Vincent Pelletier committed
19 20
from zope.interface import implements
import ZODB.interfaces
21

22
from neo import setupLog
Vincent Pelletier's avatar
Vincent Pelletier committed
23 24
from neo.util import add64
from neo.protocol import ZERO_TID
25
from neo.client.app import Application
26
from neo.client.exception import NEOStorageNotFoundError
Vincent Pelletier's avatar
Vincent Pelletier committed
27
from neo.client.exception import NEOStorageDoesNotExistError
28

29 30 31 32 33 34 35
def check_read_only(func):
    def wrapped(self, *args, **kw):
        if self._is_read_only:
            raise POSException.ReadOnlyError()
        return func(self, *args, **kw)
    return wrapped

36 37 38 39 40 41 42
class DummyCache(object):
    def __init__(self, app):
        self.app = app

    def clear(self):
        self.app.mq_cache.clear()

Aurel's avatar
Aurel committed
43 44
class Storage(BaseStorage.BaseStorage,
              ConflictResolution.ConflictResolvingStorage):
45
    """Wrapper class for neoclient."""
46

Vincent Pelletier's avatar
Vincent Pelletier committed
47 48
    _snapshot_tid = None

Vincent Pelletier's avatar
Vincent Pelletier committed
49 50
    implements(
        ZODB.interfaces.IStorage,
51 52 53
        # "restore" missing for the moment, but "store" implements this
        # interface.
        # ZODB.interfaces.IStorageRestoreable,
54 55 56 57
        # XXX: imperfect iterator implementation:
        # - start & stop are not handled (raises if either is not None)
        # - transaction isolation is not done
        # ZODB.interfaces.IStorageIteration,
Vincent Pelletier's avatar
Vincent Pelletier committed
58
        ZODB.interfaces.IStorageUndoable,
Vincent Pelletier's avatar
Vincent Pelletier committed
59
        ZODB.interfaces.IExternalGC,
60
        ZODB.interfaces.ReadVerifyingStorage,
Vincent Pelletier's avatar
Vincent Pelletier committed
61
        ZODB.interfaces.IMVCCStorage,
Vincent Pelletier's avatar
Vincent Pelletier committed
62
    )
Aurel's avatar
Aurel committed
63

64
    def __init__(self, master_nodes, name, connector=None, read_only=False,
Vincent Pelletier's avatar
Vincent Pelletier committed
65 66 67 68 69 70 71 72
            compress=None, logfile=None, verbose=False,
            _app=None, _cache=None,
            **kw):
        """
        Do not pass those parameters (used internally):
        _app
        _cache
        """
73 74
        if compress is None:
            compress = True
75
        setupLog('CLIENT', filename=logfile, verbose=verbose)
Vincent Pelletier's avatar
Vincent Pelletier committed
76 77
        BaseStorage.BaseStorage.__init__(self, 'NEOStorage(%s)' % (name, ))
        # Warning: _is_read_only is used in BaseStorage, do not rename it.
78
        self._is_read_only = read_only
Vincent Pelletier's avatar
Vincent Pelletier committed
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
        if _app is None:
            _app = Application(master_nodes, name, connector,
                compress=compress)
            assert _cache is None
            _cache = DummyCache(_app)
        self.app = _app
        assert _cache is not None
        self._cache = _cache
        # Used to clone self (see new_instance & IMVCCStorage definition).
        self._init_args = (master_nodes, name)
        self._init_kw = {
            'connector': connector,
            'read_only': read_only,
            'compress': compress,
            'logfile': logfile,
            'verbose': verbose,
            '_app': _app,
            '_cache': _cache,
        }

    def _getSnapshotTID(self):
        """
        Get the highest TID visible for current transaction.
        First call sets this snapshot by asking master node most recent
        committed TID.
        As a (positive) side-effect, this forces us to handle all pending
        invalidations, so we get a very recent view of the database (which is
        good when multiple databases are used in the same program with some
        amount of referential integrity).
        """
        tid = self._snapshot_tid
        if tid is None:
            tid = self.lastTransaction()
            if tid is ZERO_TID:
                raise NEOStorageDoesNotExistError('No transaction in storage')
            # Increment by one, as we will use this as an excluded upper
            # bound (loadBefore).
            tid = add64(tid, 1)
            self._snapshot_tid = tid
        return tid

    def _load(self, *args, **kw):
        return self.app.load(self._getSnapshotTID(), *args, **kw)
122

123 124 125 126
    def load(self, oid, version=''):
        # XXX: interface deifinition states that version parameter is
        # mandatory, while some ZODB tests do not provide it. For now, make
        # it optional.
Vincent Pelletier's avatar
Vincent Pelletier committed
127
        assert version == '', 'Versions are not supported'
128
        try:
Vincent Pelletier's avatar
Vincent Pelletier committed
129
            return self._load(oid)[:2]
130 131
        except NEOStorageNotFoundError:
            raise POSException.POSKeyError(oid)
132

133
    @check_read_only
134
    def new_oid(self):
Aurel's avatar
Aurel committed
135
        return self.app.new_oid()
136

137
    @check_read_only
138
    def tpc_begin(self, transaction, tid=None, status=' '):
Vincent Pelletier's avatar
Vincent Pelletier committed
139 140 141
        """
        Note: never blocks in NEO.
        """
142
        return self.app.tpc_begin(transaction=transaction, tid=tid,
143
                status=status)
144

145
    @check_read_only
146
    def tpc_vote(self, transaction):
147 148
        return self.app.tpc_vote(transaction=transaction,
            tryToResolveConflict=self.tryToResolveConflict)
149

150
    @check_read_only
151
    def tpc_abort(self, transaction):
Vincent Pelletier's avatar
Vincent Pelletier committed
152
        self.sync()
153
        return self.app.tpc_abort(transaction=transaction)
154

155
    def tpc_finish(self, transaction, f=None):
Vincent Pelletier's avatar
Vincent Pelletier committed
156
        result = self.app.tpc_finish(transaction=transaction,
157
            tryToResolveConflict=self.tryToResolveConflict, f=f)
Vincent Pelletier's avatar
Vincent Pelletier committed
158 159
        self.sync()
        return result
160

161
    @check_read_only
162
    def store(self, oid, serial, data, version, transaction):
Vincent Pelletier's avatar
Vincent Pelletier committed
163
        assert version == '', 'Versions are not supported'
164
        return self.app.store(oid=oid, serial=serial,
Vincent Pelletier's avatar
Vincent Pelletier committed
165
            data=data, version=version, transaction=transaction)
166

167
    @check_read_only
Vincent Pelletier's avatar
Vincent Pelletier committed
168
    def deleteObject(self, oid, serial, transaction):
169 170 171
        self.app.store(oid=oid, serial=serial, data='', version=None,
            transaction=transaction)

172 173
    # mutliple revisions
    def loadSerial(self, oid, serial):
174
        try:
Vincent Pelletier's avatar
Vincent Pelletier committed
175
            return self._load(oid, serial=serial)[0]
176
        except NEOStorageNotFoundError:
177
            raise POSException.POSKeyError(oid)
178 179

    def loadBefore(self, oid, tid):
180
        try:
Vincent Pelletier's avatar
Vincent Pelletier committed
181
            return self._load(oid, tid=tid)
Vincent Pelletier's avatar
Vincent Pelletier committed
182 183
        except NEOStorageDoesNotExistError:
            raise POSException.POSKeyError(oid)
184
        except NEOStorageNotFoundError:
185
            return None
186

187
    def iterator(self, start=None, stop=None):
188 189 190 191 192 193
        # Iterator lives in its own transaction, so get a fresh snapshot.
        snapshot_tid = self.lastTransaction()
        if stop is None:
            stop = snapshot_tid
        else:
            stop = min(snapshot_tid, stop)
194
        return self.app.iterator(start, stop)
195

196
    # undo
197
    @check_read_only
198
    def undo(self, transaction_id, txn):
Vincent Pelletier's avatar
Vincent Pelletier committed
199 200
        return self.app.undo(self._getSnapshotTID(), undone_tid=transaction_id,
            txn=txn, tryToResolveConflict=self.tryToResolveConflict)
201

202

203
    @check_read_only
Vincent Pelletier's avatar
Vincent Pelletier committed
204
    def undoLog(self, first=0, last=-20, filter=None):
Aurel's avatar
Aurel committed
205
        return self.app.undoLog(first, last, filter)
206 207

    def supportsUndo(self):
208 209 210 211
        return True

    def supportsTransactionalUndo(self):
        return True
212

213
    @check_read_only
214
    def abortVersion(self, src, transaction):
215
        return self.app.abortVersion(src, transaction)
216

217
    @check_read_only
218
    def commitVersion(self, src, dest, transaction):
219
        return self.app.commitVersion(src, dest, transaction)
220

Grégory Wisniewski's avatar
Grégory Wisniewski committed
221 222
    def loadEx(self, oid, version):
        try:
Vincent Pelletier's avatar
Vincent Pelletier committed
223
            data, serial, _ = self._load(oid)
Grégory Wisniewski's avatar
Grégory Wisniewski committed
224 225
        except NEOStorageNotFoundError:
            raise POSException.POSKeyError(oid)
Vincent Pelletier's avatar
Vincent Pelletier committed
226
        return data, serial, ''
Grégory Wisniewski's avatar
Grégory Wisniewski committed
227

228
    def __len__(self):
229
        return self.app.getStorageSize()
230

Vincent Pelletier's avatar
Vincent Pelletier committed
231
    def registerDB(self, db, limit=None):
232
        self.app.registerDB(db, limit)
Aurel's avatar
Aurel committed
233

Grégory Wisniewski's avatar
Grégory Wisniewski committed
234 235
    def history(self, oid, version=None, size=1, filter=None):
        return self.app.history(oid, version, size, filter)
Aurel's avatar
Aurel committed
236

Vincent Pelletier's avatar
Vincent Pelletier committed
237 238
    def sync(self, force=True):
        self._snapshot_tid = None
Aurel's avatar
Aurel committed
239

240
    def copyTransactionsFrom(self, source, verbose=False):
241 242 243 244 245 246 247 248
        """ Zope compliant API """
        return self.app.importFrom(source, None, None,
                self.tryToResolveConflict)

    def importFrom(self, source, start=None, stop=None):
        """ Allow import only a part of the source storage """
        return self.app.importFrom(source, start, stop,
                self.tryToResolveConflict)
249

250 251
    def restore(self, oid, serial, data, version, prev_txn, transaction):
        raise NotImplementedError
252

253 254
    def pack(self, t, referencesf, gc=False):
        if gc:
255
            neo.logging.warning('Garbage Collection is not available in NEO, '
256 257
                'please use an external tool. Packing without GC.')
        self.app.pack(t)
258 259 260 261 262 263

    def lastSerial(self):
        # seems unused
        raise NotImplementedError

    def lastTransaction(self):
264 265
        # Used in ZODB unit tests
        return self.app.lastTransaction()
266 267 268 269 270 271 272

    def _clear_temp(self):
        raise NotImplementedError

    def set_max_oid(self, possible_new_max_oid):
        # seems used only by FileStorage
        raise NotImplementedError
273 274 275 276 277 278 279

    def cleanup(self):
        # Used in unit tests to remove local database files.
        # We have no such thing, so make this method a no-op.
        pass

    def close(self):
280
        self.app.close()
281

Vincent Pelletier's avatar
Vincent Pelletier committed
282 283 284 285 286
    def getTid(self, oid):
        try:
            return self.app.getLastTID(oid)
        except NEOStorageNotFoundError:
            raise KeyError
287 288 289 290

    def checkCurrentSerialInTransaction(self, oid, serial, transaction):
        self.app.checkCurrentSerialInTransaction(oid, serial, transaction)

Vincent Pelletier's avatar
Vincent Pelletier committed
291 292 293 294 295 296 297 298 299 300 301
    def new_instance(self):
        return Storage(*self._init_args, **self._init_kw)

    def poll_invalidations(self):
        """
        Nothing to do, NEO doesn't need any polling.
        """
        pass

    release = sync