Commit f419f974 authored by Julien Muchembled's avatar Julien Muchembled

storage: define interface for backends and check they implement it

parent c6b80f7b
#
# Copyright (C) 2015 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import inspect
from functools import wraps
def implements(obj, ignore=()):
tobj = type(obj)
ignore = set(ignore)
not_implemented = []
for name in dir(obj):
method = getattr(obj if issubclass(tobj, type) or name in obj.__dict__
else tobj, name)
if inspect.ismethod(method):
try:
method.__abstract__
ignore.remove(name)
except KeyError:
not_implemented.append(name)
except AttributeError:
not_implemented.extend(func.__name__
for func in getattr(method, "__requires__", ())
if not hasattr(obj, func.__name__))
assert not ignore, ignore
assert not not_implemented, not_implemented
return obj
def _set_code(func):
args, varargs, varkw, _ = inspect.getargspec(func)
if varargs:
args.append("*" + varargs)
if varkw:
args.append("**" + varkw)
exec "def %s(%s): raise NotImplementedError\nf = %s" % (
func.__name__, ",".join(args), func.__name__)
func.func_code = f.func_code
def abstract(func):
_set_code(func)
func.__abstract__ = 1
return func
def requires(*args):
for func in args:
_set_code(func)
def decorator(func):
func.__requires__ = args
return func
return decorator
......@@ -17,7 +17,6 @@
LOG_QUERIES = False
from neo.lib.exception import DatabaseFailure
from .manager import DatabaseManager
DATABASE_MANAGER_DICT = {
'Importer': 'importer.ImporterDatabaseManager',
......
......@@ -23,10 +23,13 @@ from ConfigParser import SafeConfigParser
from ZODB.config import storageFromString
from ZODB.POSException import POSKeyError
from . import buildDatabaseManager, DatabaseManager
from . import buildDatabaseManager
from .manager import DatabaseManager
from neo.lib import logging, patch, util
from neo.lib.exception import DatabaseFailure
from neo.lib.protocol import CellStates, ZERO_OID, ZERO_TID, ZERO_HASH, MAX_TID
from neo.lib.interfaces import implements
from neo.lib.protocol import BackendNotImplemented, CellStates, \
MAX_TID, ZERO_HASH, ZERO_OID, ZERO_TID
patch.speedupFileStorageTxnLookup()
......@@ -280,6 +283,9 @@ class ImporterDatabaseManager(DatabaseManager):
def __init__(self, *args, **kw):
super(ImporterDatabaseManager, self).__init__(*args, **kw)
self.db._connect()
implements(self, """_getNextTID checkSerialRange checkTIDRange
deleteObject deleteTransaction dropPartitions getLastTID
getReplicationObjectList getTIDList nonempty""".split())
_uncommitted_data = property(
lambda self: self.db._uncommitted_data,
......@@ -549,3 +555,8 @@ class ImporterDatabaseManager(DatabaseManager):
length, partition)
return r
def getObjectHistory(self, *args, **kw):
raise BackendNotImplemented(self.getObjectHistory)
def pack(self, *args, **kw):
raise BackendNotImplemented(self.pack)
......@@ -18,7 +18,8 @@ from collections import defaultdict
from functools import wraps
from neo.lib import logging, util
from neo.lib.exception import DatabaseFailure
from neo.lib.protocol import ZERO_TID, BackendNotImplemented
from neo.lib.interfaces import abstract, requires
from neo.lib.protocol import ZERO_TID
def lazymethod(func):
def getter(self):
......@@ -72,9 +73,9 @@ class DatabaseManager(object):
setattr(self, attr, value)
return value
@abstract
def _parse(self, database):
"""Called during instanciation, to process database parameter."""
pass
def setup(self, reset=0):
"""Set up a database, discarding existing data first if reset is True
......@@ -84,6 +85,11 @@ class DatabaseManager(object):
self._uncommitted_data = defaultdict(int)
self._setup()
@abstract
def erase(self):
""""""
@abstract
def _setup(self):
"""To be overriden by the backend to set up a database
......@@ -93,11 +99,10 @@ class DatabaseManager(object):
where the refcount is increased later, when the object is read-locked.
Keys are data ids and values are number of references.
"""
raise NotImplementedError
@abstract
def nonempty(self, table):
"""Check whether table is empty or return None if it does not exist"""
raise NotImplementedError
def _checkNoUnfinishedTransactions(self, *hint):
if self.nonempty('ttrans') or self.nonempty('tobj'):
......@@ -117,11 +122,11 @@ class DatabaseManager(object):
def commit(self):
pass
@abstract
def getConfiguration(self, key):
"""
Return a configuration value, returns None if not found or not set
"""
raise NotImplementedError
def setConfiguration(self, key, value):
"""
......@@ -130,8 +135,9 @@ class DatabaseManager(object):
self._setConfiguration(key, value)
self.commit()
@abstract
def _setConfiguration(self, key, value):
raise NotImplementedError
""""""
def getUUID(self):
"""
......@@ -233,23 +239,24 @@ class DatabaseManager(object):
except TypeError:
return -1
@abstract
def getPartitionTable(self):
"""Return a whole partition table as a sequence of rows. Each row
is again a tuple of an offset (row ID), the NID of a storage
node, and a cell state."""
raise NotImplementedError
@abstract
def getLastTID(self, max_tid):
"""Return greatest tid in trans table that is <= given 'max_tid'
Required only to import a DB using Importer backend.
max_tid must be in unpacked format.
"""
raise NotImplementedError
def _getLastIDs(self):
raise NotImplementedError
""""""
@requires(_getLastIDs)
def getLastIDs(self):
trans, obj, oid = self._getLastIDs()
if trans:
......@@ -267,8 +274,9 @@ class DatabaseManager(object):
return tid, trans, obj, oid
def _getUnfinishedTIDDict(self):
raise NotImplementedError
""""""
@requires(_getUnfinishedTIDDict)
def getUnfinishedTIDDict(self):
trans, obj = self._getUnfinishedTIDDict()
obj = dict.fromkeys(obj)
......@@ -283,6 +291,7 @@ class DatabaseManager(object):
r = self.getObject(oid)
return r and r[0]
@abstract
def _getNextTID(self, partition, oid, tid):
"""
partition (int)
......@@ -294,7 +303,6 @@ class DatabaseManager(object):
If tid is the last revision of oid, None is returned.
"""
raise NotImplementedError
def _getObject(self, oid, tid=None, before_tid=None):
"""
......@@ -306,8 +314,8 @@ class DatabaseManager(object):
Serial to retrieve is the highest existing one strictly below this
value.
"""
raise NotImplementedError
@requires(_getObject)
def getObject(self, oid, tid=None, before_tid=None):
"""
oid (packed)
......@@ -342,22 +350,23 @@ class DatabaseManager(object):
compression, checksum, data,
None if data_serial is None else util.p64(data_serial))
@abstract
def changePartitionTable(self, ptid, cell_list, reset=False):
"""Change a part of a partition table. The list of cells is
a tuple of tuples, each of which consists of an offset (row ID),
the NID of a storage node, and a cell state. The Partition
Table ID must be stored as well. If reset is True, existing data
is first thrown away."""
raise NotImplementedError
@abstract
def dropPartitions(self, offset_list):
"""Delete all data for specified partitions"""
raise NotImplementedError
@abstract
def dropUnfinishedData(self):
"""Drop any unfinished data from a database."""
raise NotImplementedError
@abstract
def storeTransaction(self, tid, object_list, transaction, temporary = True):
"""Store a transaction temporarily, if temporary is true. Note
that this transaction is not finished yet. The list of objects
......@@ -366,8 +375,8 @@ class DatabaseManager(object):
The transaction is either None or a tuple of the list of OIDs,
user information, a description, extension information and transaction
pack state (True for packed)."""
raise NotImplementedError
@abstract
def _pruneData(self, data_id_list):
"""To be overriden by the backend to delete any unreferenced data
......@@ -376,15 +385,14 @@ class DatabaseManager(object):
- and not referenced by a fully-committed object (storage should have
an index or a refcount of all data ids of all objects)
"""
raise NotImplementedError
@abstract
def storeData(self, checksum, data, compression):
"""To be overriden by the backend to store object raw data
If same data was already stored, the storage only has to check there's
no hash collision.
"""
raise NotImplementedError
def holdData(self, checksum_or_id, *args):
"""Store raw data of temporary object
......@@ -503,29 +511,31 @@ class DatabaseManager(object):
data_tid = p64(data_tid)
return p64(current_tid), data_tid, is_current
@abstract
def lockTransaction(self, tid, ttid):
"""Mark voted transaction 'ttid' as committed with given 'tid'"""
raise NotImplementedError
@abstract
def unlockTransaction(self, tid, ttid):
"""Finalize a transaction by moving data to a finished area."""
raise NotImplementedError
@abstract
def abortTransaction(self, ttid):
raise NotImplementedError
""""""
@abstract
def deleteTransaction(self, tid):
raise NotImplementedError
""""""
@abstract
def deleteObject(self, oid, serial=None):
"""Delete given object. If serial is given, only delete that serial for
given oid."""
raise NotImplementedError
@abstract
def _deleteRange(self, partition, min_tid=None, max_tid=None):
"""Delete all objects and transactions between given min_tid (excluded)
and max_tid (included)"""
raise NotImplementedError
def truncate(self):
tid = self.getTruncateTID()
......@@ -536,40 +546,41 @@ class DatabaseManager(object):
self._setTruncateTID(None)
self.commit()
@abstract
def getTransaction(self, tid, all = False):
"""Return a tuple of the list of OIDs, user information,
a description, and extension information, for a given transaction
ID. If there is no such transaction ID in a database, return None.
If all is true, the transaction must be searched from a temporary
area as well."""
raise NotImplementedError
@abstract
def getObjectHistory(self, oid, offset, length):
"""Return a list of serials and sizes for a given object ID.
The length specifies the maximum size of such a list. Result starts
with latest serial, and the list must be sorted in descending order.
If there is no such object ID in a database, return None."""
raise BackendNotImplemented(self.getObjectHistory)
@abstract
def getReplicationObjectList(self, min_tid, max_tid, length, partition,
min_oid):
"""Return a dict of length oids grouped by serial at (or above)
min_tid and min_oid and below max_tid, for given partition,
sorted in ascending order."""
raise NotImplementedError
@abstract
def getTIDList(self, offset, length, partition_list):
"""Return a list of TIDs in ascending order from an offset,
at most the specified length. The list of partitions are passed
to filter out non-applicable TIDs."""
raise BackendNotImplemented(self.getTIDList)
@abstract
def getReplicationTIDList(self, min_tid, max_tid, length, partition):
"""Return a list of TIDs in ascending order from an initial tid value,
at most the specified length up to max_tid. The partition number is
passed to filter out non-applicable TIDs."""
raise NotImplementedError
@abstract
def pack(self, tid, updateObjectDataForPack):
"""Prune all non-current object revisions at given tid.
updateObjectDataForPack is a function called for each deleted object
......@@ -585,8 +596,8 @@ class DatabaseManager(object):
Takes no parameter, returns a 3-tuple: compression, data_id,
value
"""
raise NotImplementedError
@abstract
def checkTIDRange(self, partition, length, min_tid, max_tid):
"""
Generate a diggest from transaction list.
......@@ -602,8 +613,8 @@ class DatabaseManager(object):
- biggest TID found (ie, TID of last record read)
ZERO_TID if not record found
"""
raise NotImplementedError
@abstract
def checkSerialRange(self, partition, length, min_tid, max_tid, min_oid):
"""
Generate a diggest from object list.
......@@ -626,5 +637,3 @@ class DatabaseManager(object):
record read)
ZERO_TID if no record found
"""
raise NotImplementedError
......@@ -28,10 +28,11 @@ import string
import struct
import time
from . import DatabaseManager, LOG_QUERIES
from .manager import CreationUndone, splitOIDField
from . import LOG_QUERIES
from .manager import CreationUndone, DatabaseManager, splitOIDField
from neo.lib import logging, util
from neo.lib.exception import DatabaseFailure
from neo.lib.interfaces import implements
from neo.lib.protocol import CellStates, ZERO_OID, ZERO_TID, ZERO_HASH
......@@ -40,6 +41,7 @@ def getPrintableQuery(query, max=70):
else '\\x%02x' % ord(c) for c in query)
@implements
class MySQLDatabaseManager(DatabaseManager):
"""This class manages a database on MySQL."""
......
......@@ -20,10 +20,11 @@ from hashlib import sha1
import string
import traceback
from . import DatabaseManager, LOG_QUERIES
from .manager import CreationUndone, splitOIDField
from . import LOG_QUERIES
from .manager import CreationUndone, DatabaseManager, splitOIDField
from neo.lib import logging, util
from neo.lib.exception import DatabaseFailure
from neo.lib.interfaces import implements
from neo.lib.protocol import CellStates, ZERO_OID, ZERO_TID, ZERO_HASH
def unique_constraint_message(table, *columns):
......@@ -57,6 +58,7 @@ def retry_if_locked(f, *args):
raise
@implements
class SQLiteDatabaseManager(DatabaseManager):
"""This class manages a database on SQLite.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment