pax_global_header 0000666 0000000 0000000 00000000064 13313233730 0014510 g ustar 00root root 0000000 0000000 52 comment=97af23cc3740cc4b2eee09125c9ce95abb6fbe2c
neoppod-97af23cc3740cc4b2eee09125c9ce95abb6fbe2c-neo-storage/ 0000775 0000000 0000000 00000000000 13313233730 0022753 5 ustar 00root root 0000000 0000000 neoppod-97af23cc3740cc4b2eee09125c9ce95abb6fbe2c-neo-storage/neo/ 0000775 0000000 0000000 00000000000 13313233730 0023534 5 ustar 00root root 0000000 0000000 neoppod-97af23cc3740cc4b2eee09125c9ce95abb6fbe2c-neo-storage/neo/storage/ 0000775 0000000 0000000 00000000000 13313233730 0025200 5 ustar 00root root 0000000 0000000 neoppod-97af23cc3740cc4b2eee09125c9ce95abb6fbe2c-neo-storage/neo/storage/__init__.py 0000664 0000000 0000000 00000000000 13313233730 0027277 0 ustar 00root root 0000000 0000000 neoppod-97af23cc3740cc4b2eee09125c9ce95abb6fbe2c-neo-storage/neo/storage/app.py 0000664 0000000 0000000 00000024576 13313233730 0026350 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
from collections import deque
from neo.lib import logging
from neo.lib.app import BaseApplication
from neo.lib.protocol import uuid_str, \
CellStates, ClusterStates, NodeTypes, Packets
from neo.lib.connection import ListeningConnection
from neo.lib.exception import StoppedOperation, PrimaryFailure
from neo.lib.pt import PartitionTable
from neo.lib.util import dump
from neo.lib.bootstrap import BootstrapManager
from .checker import Checker
from .database import buildDatabaseManager
from .handlers import identification, initialization, master
from .replicator import Replicator
from .transactions import TransactionManager
from neo.lib.debug import register as registerLiveDebugger
class Application(BaseApplication):
    """The storage node application.

    Owns the database manager (self.dm), the partition table (self.pt) and
    the connection to the primary master. _run() loops forever through
    connectToPrimary() -> initialize() -> doOperation(), starting a new
    cycle whenever the operation is stopped or the primary master fails.
    """
    # Class-level defaults: _run() creates per-cycle instances and deletes
    # them at the end of every cycle, which makes these None values visible
    # again (log() relies on it with `if self.tm`).
    checker = replicator = tm = None
    def __init__(self, config):
        """Build the storage application from ``config``.

        Instantiates and sets up the database manager, registers the
        configured master nodes, loads the persistent configuration (see
        loadConfiguration) and registers self.log with the live debugger.
        """
        super(Application, self).__init__(
            config.getSSL(), config.getDynamicMasterList())
        # set the cluster name
        self.name = config.getCluster()
        self.dm = buildDatabaseManager(config.getAdapter(),
            (config.getDatabase(), config.getEngine(), config.getWait()),
        )
        self.disable_drop_partitions = config.getDisableDropPartitions()
        # load master nodes
        for master_address in config.getMasters():
            self.nm.createMaster(address=master_address)
        # set the bind address (replaced by the actual listening address in
        # _run(), e.g. when an ephemeral port is requested)
        self.server = config.getBind()
        logging.debug('IP address is %s, port is %d', *self.server)
        # The partition table is initialized after getting the number of
        # partitions.
        self.pt = None
        self.listening_conn = None
        self.master_conn = None
        self.master_node = None
        # operation related data
        self.operational = False
        self.dm.setup(reset=config.getReset(), dedup=config.getDedup())
        self.loadConfiguration()
        self.devpath = self.dm.getTopologyPath()
        # force node uuid from command line argument, for testing purpose only
        if config.getUUID() is not None:
            self.uuid = config.getUUID()
        registerLiveDebugger(on_log=self.log)
    def close(self):
        """Drop the listening connection, close the database, then let
        BaseApplication release the rest."""
        self.listening_conn = None
        self.dm.close()
        super(Application, self).close()
    def _poll(self):
        """Process pending network events once (argument 1 is presumably a
        timeout in seconds -- see EventManager.poll)."""
        self.em.poll(1)
    def log(self):
        """Dump the state of the event manager, node manager, transaction
        manager and partition table (whichever exist) to the log."""
        self.em.log()
        self.nm.log()
        if self.tm:
            self.tm.log()
        if self.pt is not None:
            self.pt.log()
    def loadConfiguration(self):
        """Load persistent configuration data from the database.
        If data is not present, generate it.

        Stores the cluster name on first start, loads the UUID and, if both
        the number of partitions and replicas are known, creates an empty
        partition table of that size.

        Raises RuntimeError if the stored cluster name differs from the
        configured one, or if the stored number of partitions is not
        positive."""
        dm = self.dm
        # check cluster name
        name = dm.getName()
        if name is None:
            dm.setName(self.name)
        elif name != self.name:
            raise RuntimeError('name %r does not match with the database: %r'
                               % (self.name, name))
        # load configuration
        self.uuid = dm.getUUID()
        num_partitions = dm.getNumPartitions()
        num_replicas = dm.getNumReplicas()
        ptid = dm.getPTID()
        # check partition table configuration
        if num_partitions is not None and num_replicas is not None:
            if num_partitions <= 0:
                raise RuntimeError, 'partitions must be more than zero'
            # create a partition table
            self.pt = PartitionTable(num_partitions, num_replicas)
        logging.info('Configuration loaded:')
        logging.info('UUID      : %s', uuid_str(self.uuid))
        logging.info('PTID      : %s', dump(ptid))
        logging.info('Name      : %s', self.name)
        logging.info('Partitions: %s', num_partitions)
        logging.info('Replicas  : %s', num_replicas)
    def loadPartitionTable(self):
        """Load a partition table from the database.

        self.pt is cleared first and left empty if no PTID is stored.
        Storage nodes referenced by cells but unknown to the node manager
        are registered on the fly."""
        self.pt.clear()
        ptid = self.dm.getPTID()
        if ptid is None:
            return
        cell_list = []
        for offset, uuid, state in self.dm.getPartitionTable():
            # register unknown nodes
            if self.nm.getByUUID(uuid) is None:
                self.nm.createStorage(uuid=uuid)
            cell_list.append((offset, uuid, CellStates[state]))
        self.pt.update(ptid, cell_list, self.nm)
    def run(self):
        """Run the node; on any uncaught exception, log it together with a
        state dump, flush the logs and re-raise."""
        try:
            self._run()
        except Exception:
            logging.exception('Pre-mortem data:')
            self.log()
            logging.flush()
            raise
    def _run(self):
        """Make sure that the status is sane and start a loop.

        Raises RuntimeError if the cluster name is empty. Otherwise this
        never returns normally: each cycle ends with StoppedOperation or
        PrimaryFailure (logged) and a new cycle starts."""
        if len(self.name) == 0:
            raise RuntimeError, 'cluster name must be non-empty'
        # Make a listening port
        handler = identification.IdentificationHandler(self)
        self.listening_conn = ListeningConnection(self, handler, self.server)
        self.server = self.listening_conn.getAddress()
        # Connect to a primary master node, verify data, and
        # start the operation. This cycle will be executed permanently,
        # until the user explicitly requests a shutdown.
        self.operational = False
        while True:
            self.cluster_state = None
            if self.master_node is None:
                # look for the primary master
                self.connectToPrimary()
            # Fresh per-cycle helpers; deleted in the finally clause below.
            self.checker = Checker(self)
            self.replicator = Replicator(self)
            self.tm = TransactionManager(self)
            try:
                self.initialize()
                self.doOperation()
                raise RuntimeError, 'should not reach here'
            except StoppedOperation, msg:
                logging.error('operation stopped: %s', msg)
            except PrimaryFailure, msg:
                logging.error('primary master is down: %s', msg)
            finally:
                self.operational = False
                # When not ready, we reject any incoming connection so for
                # consistency, we also close any connection except that to the
                # master. This includes connections to other storage nodes and any
                # replication is aborted, whether we are feeding or out-of-date.
                for conn in self.em.getConnectionList():
                    if conn not in (self.listening_conn, self.master_conn):
                        conn.close()
                # Fall back to the class-level None defaults.
                del self.checker, self.replicator, self.tm
    def connectToPrimary(self):
        """Find a primary master node, and connect to it.
        If a primary master node is not elected or ready, repeat
        the attempt of a connection periodically.
        Note that I do not accept any connection from non-master nodes
        at this stage.

        Also persists the UUID obtained during identification, and makes
        sure self.pt matches the dimensions announced by the master.
        Raises RuntimeError if the master's number of partitions differs
        from the one of the existing partition table."""
        pt = self.pt
        # search, find, connect and identify to the primary master
        bootstrap = BootstrapManager(self, NodeTypes.STORAGE, self.server,
            self.devpath)
        self.master_node, self.master_conn, num_partitions, num_replicas = \
            bootstrap.getPrimaryConnection()
        uuid = self.uuid
        logging.info('I am %s', uuid_str(uuid))
        self.dm.setUUID(uuid)
        # Reload a partition table from the database. This is necessary
        # when a previous primary master died while sending a partition
        # table, because the table might be incomplete.
        if pt is not None:
            self.loadPartitionTable()
            if num_partitions != pt.getPartitions():
                raise RuntimeError('the number of partitions is inconsistent')
        if pt is None or pt.getReplicas() != num_replicas:
            # changing number of replicas is not an issue
            self.dm.setNumPartitions(num_partitions)
            self.dm.setNumReplicas(num_replicas)
            self.pt = PartitionTable(num_partitions, num_replicas)
            self.loadPartitionTable()
    def initialize(self):
        """Install the initialization handler on the master connection and
        poll until self.operational becomes true (presumably set by that
        handler), then tell the master we are ready."""
        logging.debug('initializing...')
        _poll = self._poll
        self.master_conn.setHandler(initialization.InitializationHandler(self))
        while not self.operational:
            _poll()
        self.master_conn.send(Packets.NotifyReady())
    def doOperation(self):
        """Handle everything, including replications and transactions.

        Background tasks registered with newTask() are generators; they are
        only advanced while the event manager is idle. A task that yields a
        false value lets the queue rotate to the next task; a true value
        keeps the same task running. Exhausted tasks are removed. This
        method only exits through an exception."""
        logging.info('doing operation')
        poll = self._poll
        _poll = self.em._poll
        isIdle = self.em.isIdle
        self.master_conn.setHandler(master.MasterOperationHandler(self))
        self.replicator.populate()
        # Forget all unfinished data.
        self.dm.dropUnfinishedData()
        self.task_queue = task_queue = deque()
        try:
            self.dm.doOperation(self)
            while True:
                while task_queue:
                    try:
                        while isIdle():
                            # The current task is task_queue[-1] (newTask
                            # appends on the left).
                            next(task_queue[-1]) or task_queue.rotate()
                            _poll(0)
                        break
                    except StopIteration:
                        task_queue.pop()
                poll()
        finally:
            del self.task_queue
    def changeClusterState(self, state):
        """Record the new cluster state; stop replication when the cluster
        enters STOPPING_BACKUP."""
        self.cluster_state = state
        if state == ClusterStates.STOPPING_BACKUP:
            self.replicator.stop()
    def newTask(self, iterator):
        """Queue a background task (an iterator) run by doOperation() when
        the node is idle. Only valid while doOperation() runs, because
        self.task_queue only exists then."""
        self.task_queue.appendleft(iterator)
    def closeClient(self, connection):
        """Close the client side of ``connection`` unless it is still used
        by the replicator or the checker."""
        if connection is not self.replicator.getCurrentConnection() and \
           connection not in self.checker.conn_dict:
            connection.closeClient()
    def shutdown(self, erase=False):
        """Close all connections and exit

        If ``erase`` is true, the database is erased before exiting.
        PrimaryFailure raised while closing a connection is ignored.
        Never returns (sys.exit)."""
        for c in self.em.getConnectionList():
            try:
                c.close()
            except PrimaryFailure:
                pass
        # clear database to avoid polluting the cluster at restart
        if erase:
            self.dm.erase()
        logging.info("Application has been asked to shut down")
        sys.exit()
neoppod-97af23cc3740cc4b2eee09125c9ce95abb6fbe2c-neo-storage/neo/storage/checker.py 0000664 0000000 0000000 00000021512 13313233730 0027157 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2012-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from collections import deque
from neo.lib import logging
from neo.lib.connection import ClientConnection, ConnectionClosed
from neo.lib.protocol import NodeTypes, Packets, ZERO_OID
from neo.lib.util import add64, dump
from .handlers.storage import StorageOperationHandler
# TODO: Use a dynamic value such that each chunk takes a few seconds to compute,
#       because a too small value wastes network bandwidth. However, a too big
#       one prevents the storage from replying quickly to other requests, so
#       checkRange() must also be changed to process a chunk in several times,
#       with a total time that must not cause timeouts.
# Maximum number of rows requested per AskCheckTIDRange/AskCheckSerialRange
# chunk. An answer whose count is lower than this means that the requested
# range has been fully covered (see Checker.checkRange).
CHECK_COUNT = 40000
class Checker(object):
    """Compare the data of one partition between its readable replicas.

    Check requests are queued by __call__ and processed one partition at a
    time. For the partition being checked, conn_dict maps each involved
    connection (None standing for the local database) to:
    - while connecting: whether the peer node was already identified;
    - then None (waiting for identification, see connected()), the id
      returned by conn.ask() for a pending request, or the answer tuple.
    The TID range is checked first by chunks of CHECK_COUNT, then object
    serials; replicas whose answers differ from the source (if any) are
    reported to the master as corrupted.
    """
    def __init__(self, app):
        self.app = app
        self.queue = deque()
        self.conn_dict = {}
    def __call__(self, partition, source, min_tid, max_tid):
        """Queue a check of ``partition`` between ``min_tid`` and ``max_tid``.

        ``source`` is a ``(name, address)`` pair (see _nextPartition). The
        check starts immediately if no other check is in progress."""
        self.queue.append((partition, source, min_tid, max_tid))
        if not self.conn_dict:
            self._nextPartition()
    def _nextPartition(self):
        """Start checking the next queued partition that can be checked.

        Partitions that can not be checked (cell discarded or out-of-date,
        unavailable replicas, connection closed) are logged and skipped.
        Connections of the previous check that are not reused are closed
        (via app.closeClient). Returns without starting anything when the
        queue is empty."""
        app = self.app
        def connect(node, uuid=app.uuid, name=app.name):
            # Register (and open/identify if needed) a connection to
            # ``node``, unless it is ourselves.
            if node.getUUID() == app.uuid:
                return
            if node.isConnected(connecting=True):
                conn = node.getConnection()
                conn.asClient()
            else:
                conn = ClientConnection(app, StorageOperationHandler(app), node)
                conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE,
                    uuid, app.server, name, (), app.id_timestamp))
            self.conn_dict[conn] = node.isIdentified()
        # Connections used by the previous check (minus the local database).
        conn_set = set(self.conn_dict)
        conn_set.discard(None)
        try:
            self.conn_dict.clear()
            while True:
                try:
                    partition, (name, source), min_tid, max_tid = \
                        self.queue.popleft()
                except IndexError:
                    return
                cell = app.pt.getCell(partition, app.uuid)
                if cell is None or cell.isOutOfDate():
                    msg = "discarded or out-of-date"
                else:
                    try:
                        for cell in app.pt.getCellList(partition):
                            # XXX: Ignore corrupted cells for the moment
                            #      because we're still unable to fix them
                            #      (see also AdministrationHandler of master)
                            if cell.isReadable(): #if not cell.isOutOfDate():
                                connect(cell.getNode())
                        if source:
                            node = app.nm.getByAddress(source)
                            if name:
                                # source from another cluster (by name)
                                source = app.nm.createStorage(address=source) \
                                         if node is None else node
                                connect(source, None, name)
                            elif (node.getUUID() == app.uuid or
                                  node.isConnected(connecting=True) and
                                  node.getConnection() in self.conn_dict):
                                source = node
                            else:
                                # NOTE(review): if other replicas are
                                # connected, the `break` below still starts
                                # the check with ``source`` left as an
                                # address (not a node), and this msg is
                                # lost; checkRange/connectionLost would then
                                # call getUUID/getConnection on it -- confirm
                                # this is intended.
                                msg = "unavailable source"
                        if self.conn_dict:
                            break
                        msg = "no replica"
                    except ConnectionClosed:
                        msg = "connection closed"
                    finally:
                        conn_set.update(self.conn_dict)
                    self.conn_dict.clear()
                logging.error("Failed to start checking partition %u (%s)",
                              partition, msg)
            # Do not close connections that are reused for this check.
            conn_set.difference_update(self.conn_dict)
        finally:
            for conn in conn_set:
                app.closeClient(conn)
        logging.debug("start checking partition %u from %s to %s",
                      partition, dump(min_tid), dump(max_tid))
        self.min_tid = self.next_tid = min_tid
        self.max_tid = max_tid
        self.next_oid = None
        self.partition = partition
        self.source = source
        def start():
            # Postpone the first request while max_tid is locked by the
            # transaction manager.
            if app.tm.isLockedTid(max_tid):
                app.tm.read_queue.queueEvent(start)
                return
            args = partition, CHECK_COUNT, min_tid, max_tid
            p = Packets.AskCheckTIDRange(*args)
            for conn, identified in self.conn_dict.items():
                # Not-yet-identified peers get None: connected() will ask.
                self.conn_dict[conn] = conn.ask(p) if identified else None
            self.conn_dict[None] = app.dm.checkTIDRange(*args)
        start()
    def connected(self, node):
        """Send the first request to ``node`` once it is identified, if it
        is part of the current check and still waiting (value None)."""
        conn = node.getConnection()
        # ``self`` is a sentinel distinct from None and from any value.
        if self.conn_dict.get(conn, self) is None:
            self.conn_dict[conn] = conn.ask(Packets.AskCheckTIDRange(
                self.partition, CHECK_COUNT, self.next_tid, self.max_tid))
    def connectionLost(self, conn):
        """Handle the loss of a connection involved in the current check.

        The check goes on if the lost node is not the source and more than
        one participant remains; otherwise it is aborted and the next queued
        partition is started."""
        try:
            del self.conn_dict[conn]
        except KeyError:
            return
        if self.source is not None and self.source.getConnection() is conn:
            del self.source
        elif len(self.conn_dict) > 1:
            logging.warning("node lost but keep up checking partition %u",
                            self.partition)
            return
        logging.warning("check of partition %u aborted", self.partition)
        self._nextPartition()
    def _nextRange(self):
        """Ask every participant (and the local database, key None) for the
        next chunk: serials if next_oid is set, TIDs otherwise."""
        if self.next_oid:
            args = self.partition, CHECK_COUNT, self.next_tid, self.max_tid, \
                self.next_oid
            p = Packets.AskCheckSerialRange(*args)
            check = self.app.dm.checkSerialRange
        else:
            args = self.partition, CHECK_COUNT, self.next_tid, self.max_tid
            p = Packets.AskCheckTIDRange(*args)
            check = self.app.dm.checkTIDRange
        for conn in self.conn_dict.keys():
            self.conn_dict[conn] = check(*args) if conn is None else conn.ask(p)
    def checkRange(self, conn, *args):
        """Process an AnswerCheckTIDRange/AnswerCheckSerialRange from ``conn``.

        Once all answers for the current chunk are received, mismatching
        participants are reported as corrupted (compared to the source's
        answer; all are reported if there is no source). Then the check
        moves on to the next chunk, from the TID phase to the serial phase,
        or to the next partition."""
        if self.conn_dict.get(conn, self) != conn.getPeerId():
            # Ignore answers to old requests,
            # because we did nothing to cancel them.
            logging.info("ignored AnswerCheck*Range%r", args)
            return
        self.conn_dict[conn] = args
        answer_set = set(self.conn_dict.itervalues())
        if len(answer_set) > 1:
            # Wait until every participant has answered (answers are tuples,
            # pending requests are ids).
            for answer in answer_set:
                if type(answer) is not tuple:
                    return
            # TODO: Automatically tell corrupted cells to fix their data
            #       if we know a good source.
            #       For the moment, tell master to put them in CORRUPTED state
            #       and keep up checking if useful.
            uuid = self.app.uuid
            # Reference answer: the source's one (None if no source, in
            # which case every participant is reported).
            args = None if self.source is None else self.conn_dict[
                None if self.source.getUUID() == uuid
                     else self.source.getConnection()]
            uuid_list = []
            for conn, answer in self.conn_dict.items():
                if answer != args:
                    del self.conn_dict[conn]
                    if conn is None:
                        uuid_list.append(uuid)
                    else:
                        uuid_list.append(conn.getUUID())
                        self.app.closeClient(conn)
            p = Packets.NotifyPartitionCorrupted(self.partition, uuid_list)
            self.app.master_conn.send(p)
            if len(self.conn_dict) <= 1:
                logging.warning("check of partition %u aborted", self.partition)
                self.queue.clear()
                self._nextPartition()
                return
        try:
            count, _, max_tid = args
        except ValueError: # AnswerCheckSerialRange
            count, _, self.next_tid, _, max_oid = args
            if count < CHECK_COUNT:
                logging.debug("partition %u checked from %s to %s",
                    self.partition, dump(self.min_tid), dump(self.max_tid))
                self._nextPartition()
                return
            self.next_oid = add64(max_oid, 1)
        else: # AnswerCheckTIDRange
            if count < CHECK_COUNT:
                # TID range done: restart from min_tid, checking serials.
                self.next_tid = self.min_tid
                self.next_oid = ZERO_OID
            else:
                self.next_tid = add64(max_tid, 1)
        self._nextRange()
neoppod-97af23cc3740cc4b2eee09125c9ce95abb6fbe2c-neo-storage/neo/storage/database/ 0000775 0000000 0000000 00000000000 13313233730 0026744 5 ustar 00root root 0000000 0000000 neoppod-97af23cc3740cc4b2eee09125c9ce95abb6fbe2c-neo-storage/neo/storage/database/__init__.py 0000664 0000000 0000000 00000002377 13313233730 0031066 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# Debug switch; not referenced in this module (presumably read by the
# adapter implementations to decide whether to log the queries they run).
LOG_QUERIES = False
# Registered backends: adapter name -> "module.ClassName", the module being
# imported relatively to this package by getAdapterKlass().
DATABASE_MANAGER_DICT = {
    'Importer': 'importer.ImporterDatabaseManager',
    'MySQL': 'mysqldb.MySQLDatabaseManager',
    'SQLite': 'sqlite.SQLiteDatabaseManager',
}
def getAdapterKlass(name):
    """Return the database manager class registered under ``name``.

    A false ``name`` (None, '') selects the default 'MySQL' adapter. The
    adapter module is only imported when requested.

    Raises DatabaseFailure if no adapter is registered under ``name``.
    """
    try:
        module, name = DATABASE_MANAGER_DICT[name or 'MySQL'].split('.')
    except KeyError:
        raise DatabaseFailure('Cannot find a database adapter <%s>' % name)
    return getattr(__import__(module, globals(), level=1), name)
def buildDatabaseManager(name, args=(), kw=None):
    """Instantiate the adapter ``name``: ``Klass(*args, **kw)``.

    ``kw`` defaults to None rather than a shared mutable ``{}`` default;
    passing a dict works as before.

    Raises DatabaseFailure if no adapter is registered under ``name``.
    """
    return getAdapterKlass(name)(*args, **(kw or {}))
class DatabaseFailure(Exception):
    """Error raised by the storage database layer (e.g. an unknown adapter
    name in getAdapterKlass)."""
neoppod-97af23cc3740cc4b2eee09125c9ce95abb6fbe2c-neo-storage/neo/storage/database/importer.py 0000664 0000000 0000000 00000070635 13313233730 0031172 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2014-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import cPickle, pickle, sys, time
from bisect import bisect, insort
from collections import deque
from cStringIO import StringIO
from ConfigParser import SafeConfigParser
from ZConfig import loadConfigFile
from ZODB import BaseStorage
from ZODB.config import getStorageSchema, storageFromString
from ZODB.POSException import POSKeyError
try:
from ZODB._compat import dumps, loads, _protocol
except ImportError:
from cPickle import dumps, loads
_protocol = 1
from ZODB.FileStorage import FileStorage
from . import buildDatabaseManager, DatabaseFailure
from .manager import DatabaseManager
from neo.lib import compress, logging, patch, util
from neo.lib.interfaces import implements
from neo.lib.protocol import BackendNotImplemented, MAX_TID
# Install NEO's FileStorage transaction-lookup patch (neo.lib.patch); per
# the comment in ZODBIterator, it is what builds the FileStorage tid index.
patch.speedupFileStorageTxnLookup()
# Whether the import may run in a forked child process (see
# ImporterDatabaseManager._import and ZODBIterator); fork() is not
# available on Windows.
FORK = sys.platform != 'win32'
def transactionAsTuple(txn):
    """Convert a ZODB transaction record into the tuple used by the import.

    Returns ``(user, description, extension, packed, tid)`` where
    ``extension`` is the pickled extension dict ('' when empty) and
    ``packed`` is True when the ZODB status is 'p'.
    """
    extension = txn.extension
    if extension:
        packed_extension = dumps(extension, _protocol)
    else:
        packed_extension = ''
    is_packed = txn.status == 'p'
    return txn.user, txn.description, packed_extension, is_packed, txn.tid
class Reference(object):
    """Box around a (possibly remapped) persistent reference.

    Repickler wraps every persistent id in a Reference so that the
    pickler's persistent_id hook can recognize it and emit ``value``.
    """
    __slots__ = ('value',)
    def __init__(self, value):
        self.value = value
class Repickler(pickle.Unpickler):
    """Rewrite the persistent references of a ZODB data record.

    A ZODB record is two concatenated pickles (class metadata, then state).
    Calling an instance with such a record unpickles it with the pure-Python
    Unpickler, mapping every persistent reference through ``persistent_map``.
    Objects are never really instantiated: globals and the REDUCE and
    INST/OBJ opcodes produce _noload placeholders that know how to emit
    equivalent opcodes again. If any reference changed, both pickles are
    dumped again with cPickle (protocol 1) and the new record is returned;
    otherwise the input is returned unchanged.
    """
    def __init__(self, persistent_map):
        self._f = StringIO()
        # Use python implementation for unpickling because loading can not
        # be customized enough with cPickle.
        pickle.Unpickler.__init__(self, self._f)
        # For pickling, it is possible to use the fastest implementation,
        # which also generates fewer useless PUT opcodes.
        self._p = cPickle.Pickler(self._f, 1)
        self.memo = self._p.memo # just a tiny optimization
        def persistent_id(obj):
            # Emit the (mapped) value of Reference boxes as persistent ids.
            if isinstance(obj, Reference):
                r = obj.value
                del obj.value # minimize refcnt like for deque+popleft
                return r
        self._p.inst_persistent_id = persistent_id
        def persistent_load(obj):
            # Map the reference and remember whether anything changed.
            new_obj = persistent_map(obj)
            if new_obj is not obj:
                self._changed = True
            return Reference(new_obj)
        self.persistent_load = persistent_load
    def _save(self, data):
        """Pickle the next item of the ``data`` deque into the buffer,
        without the trailing STOP opcode."""
        self._p.dump(data.popleft())
        # remove STOP (no need to truncate since it will always be overridden)
        self._f.seek(-1, 1)
    def __call__(self, data):
        """Return ``data`` with its persistent references remapped (``data``
        itself if no reference changed)."""
        f = self._f
        f.truncate(0)
        f.write(data)
        f.reset()
        self._changed = False
        try:
            classmeta = self.load()
            state = self.load()
        finally:
            self.memo.clear()
        if self._changed:
            f.truncate(0)
            try:
                self._p.dump(classmeta).dump(state)
            finally:
                self.memo.clear()
            return f.getvalue()
        return data
    dispatch = pickle.Unpickler.dispatch.copy()
    class _noload(object):
        """Decorator turning an opcode writer into a placeholder factory.

        ``_noload(dump)`` returns ``load(*args)``, which creates a placeholder
        remembering ``dump`` and ``args`` (``args[0]`` being the Repickler).
        When cPickle pickles the placeholder, __reduce__ makes it write
        NONE + EMPTY_TUPLE + REDUCE; when the pickler then iterates list(),
        those 3 bytes are replaced by the output of
        ``dump(repickler, remaining_args)`` (keeping any memo PUT written
        after them). Items appended/set on the placeholder while loading
        are replayed through list()/dict(), and ``state`` (set by
        load_build) is emitted for BUILD.
        """
        state = None
        def __new__(cls, dump):
            def load(*args):
                self = object.__new__(cls)
                self.dump = dump
                # We use deque+popleft everywhere to minimize the number of
                # references at the moment cPickle considers memoizing an
                # object. This reduces the number of useless PUT opcodes and
                # usually produces smaller pickles than ZODB. Without this,
                # they would, on the contrary, increase in size.
                # We could also use optimize from pickletools module.
                self.args = deque(args)
                self._list = deque()
                self.append = self._list.append
                self.extend = self._list.extend
                self._dict = deque()
                return self
            return load
        def __setitem__(self, *args):
            self._dict.append(args)
        def dict(self):
            while self._dict:
                yield self._dict.popleft()
        def list(self, pos):
            # ``pos``: buffer offset where the placeholder's REDUCE starts.
            pt = self.args.popleft()
            f = pt._f
            f.seek(pos + 3) # NONE + EMPTY_TUPLE + REDUCE
            put = f.read() # preserve memo if any
            f.truncate(pos)
            f.write(self.dump(pt, self.args) + put)
            while self._list:
                yield self._list.popleft()
        def __reduce__(self):
            # None as callable: its opcodes are overwritten by list().
            return None, (), self.state, \
                self.list(self.args[0]._f.tell()), self.dict()
    @_noload
    def _obj(self, args):
        """Emit MARK, the class and the constructor arguments, then OBJ."""
        self._f.write(pickle.MARK)
        while args:
            self._save(args)
        return pickle.OBJ
    def _instantiate(self, klass, k):
        # Unpickler hook for INST/OBJ: replace the marked stack slice by a
        # placeholder instead of creating the instance.
        args = self.stack[k+1:]
        self.stack[k:] = self._obj(klass, *args),
    del dispatch[pickle.NEWOBJ] # ZODB < 5 has never used protocol 2
    @_noload
    def find_class(self, args):
        """Emit the GLOBAL opcode for ``module``/``name`` (no import)."""
        module, name = args
        return pickle.GLOBAL + module + '\n' + name + '\n'
    @_noload
    def _reduce(self, args):
        """Emit the callable, its argument tuple, then REDUCE."""
        self._save(args)
        self._save(args)
        return pickle.REDUCE
    def load_reduce(self):
        # REDUCE handler: build a placeholder instead of calling anything.
        stack = self.stack
        args = stack.pop()
        stack[-1] = self._reduce(stack[-1], args)
    dispatch[pickle.REDUCE] = load_reduce
    def load_build(self):
        # BUILD handler: keep the state on the placeholder for re-emission.
        stack = self.stack
        state = stack.pop()
        inst = stack[-1]
        assert inst.state is None
        inst.state = state
    dispatch[pickle.BUILD] = load_build
class ZODB(object):
    """A source ZODB storage to import, possibly mounted in another one.

    Unknown attributes are delegated to the opened storage (self.storage).
    After setup(), ``shift_oid`` is the offset added to this storage's oids
    in NEO, ``next_oid`` the first unused oid of this storage, and
    ``mapping`` maps oids of mount points in this storage to the oid they
    are replaced with.
    """
    def __init__(self, storage, oid=0, **kw):
        """``storage``: ZConfig text describing the storage to open.
        ``oid``: oid of this storage's object that gets referenced from
        its mount point(s); 0 identifies the root storage (see
        ImporterDatabaseManager.setup).
        ``kw``: mount points, mapping the name of another ZODB to the oid
        (in this storage) where it is mounted.

        Raises DatabaseFailure if the storage has no transaction."""
        self.oid = int(oid)
        self.mountpoints = {k: int(v) for k, v in kw.iteritems()}
        self.connect(storage)
        self.ltid = util.u64(self.lastTransaction())
        if not self.ltid:
            raise DatabaseFailure("Can not import empty storage: %s" % storage)
        self.mapping = {}
    def __getstate__(self):
        # Drop the opening closure, the getDataTid cache and the opened
        # storage, so that the object can be pickled into the NEO
        # configuration; connect() must be called again after unpickling.
        state = self.__dict__.copy()
        del state["_connect"], state["data_tid"], state["storage"]
        return state
    def connect(self, storage):
        """Open the storage described by the ZConfig text ``storage``.

        If the storage supports 'read_only', it is opened read-only once the
        next oid is known (i.e. after setup(), e.g. when restored from the
        NEO configuration). Otherwise it is first opened read-write and a
        one-shot new_oid() is installed, which fetches the next oid then
        reopens the storage read-only."""
        self.data_tid = {}
        config, _ = loadConfigFile(getStorageSchema(), StringIO(storage))
        section = config.storage
        def _connect():
            self.storage = section.open()
        self._connect = _connect
        config = section.config
        if 'read_only' in config.getSectionAttributes():
            has_next_oid = config.read_only = hasattr(self, 'next_oid')
            if not has_next_oid:
                import gc
                # This will reopen read-only as soon as we know the last oid.
                def new_oid():
                    del self.new_oid
                    new_oid = self.storage.new_oid()
                    self.storage.close()
                    # A FileStorage index can be huge, and close() does not
                    # delete it. Stop reference it before loading it again,
                    # to avoid having it twice in memory.
                    del self.storage
                    gc.collect() # to be sure (maybe only required for PyPy,
                                 # if one day we support it)
                    config.read_only = True
                    self._connect()
                    return new_oid
                self.new_oid = new_oid
        self._connect()
    def setup(self, zodb_dict, shift_oid=0):
        """Assign oid ranges to this ZODB and, recursively, to the ZODBs
        mounted in it.

        ``zodb_dict`` maps names to ZODB objects; ``shift_oid`` is the first
        free NEO oid. A ZODB mounted several times is only set up once.
        Returns the first NEO oid not used by this subtree."""
        self.shift_oid = shift_oid
        self.next_oid = util.u64(self.new_oid())
        shift_oid += self.next_oid
        for mp, oid in self.mountpoints.iteritems():
            mp = zodb_dict[mp]
            new_oid = mp.oid
            try:
                new_oid += mp.shift_oid
            except AttributeError:
                # Not set up yet (attribute lookup falls through to the
                # storage, which has no shift_oid).
                new_oid += shift_oid
                shift_oid = mp.setup(zodb_dict, shift_oid)
            self.mapping[oid] = new_oid
        del self.mountpoints
        return shift_oid
    def repickle(self, data):
        """Return the data record ``data`` with oids rewritten for NEO.

        On first call, replaces itself (instance attribute) by either the
        identity function, when there is nothing to rewrite, or a
        Repickler."""
        if not (self.shift_oid or self.mapping):
            self.repickle = lambda x: x
            return data
        u64 = util.u64
        p64 = util.p64
        def map_oid(obj):
            # Accept persistent ids of the form (oid, klass) or oid.
            if isinstance(obj, tuple) and len(obj) == 2:
                oid = u64(obj[0])
                # If this oid pointed to a mount point, drop 2nd item because
                # it's probably different than the real class of the new oid.
                # (done by the self.mapping lookup below, which returns a
                # bare oid)
            elif isinstance(obj, bytes):
                oid = u64(obj)
            else:
                raise NotImplementedError(
                    "Unsupported external reference: %r" % obj)
            try:
                return p64(self.mapping[oid])
            except KeyError:
                if not self.shift_oid:
                    return obj # common case for root db
                oid = p64(oid + self.shift_oid)
                return oid if isinstance(obj, bytes) else (oid, obj[1])
        self.repickle = Repickler(map_oid)
        return self.repickle(data)
    def __getattr__(self, attr):
        # Without this guard, looking up __setstate__ while unpickling (when
        # self.storage does not exist yet) would recurse through
        # self.storage; object.__getattribute__ raises AttributeError.
        if attr == '__setstate__':
            return object.__getattribute__(self, attr)
        return getattr(self.storage, attr)
    def getDataTid(self, oid, tid):
        """Return the data_txn backpointer of object ``oid`` in transaction
        ``tid`` (both u64), or None if it has none.

        Backpointers of a transaction are cached on first access. Re-raises
        KeyError if the storage has no transaction ``tid``."""
        try:
            return self.data_tid[tid].get(oid)
        except KeyError:
            assert tid not in self.data_tid, (oid, tid)
            p_tid = util.p64(tid)
            txn = next(self.storage.iterator(p_tid))
            if txn.tid != p_tid:
                raise
            u64 = util.u64
            txn = self.data_tid[tid] = {
                u64(x.oid): x.data_txn
                for x in txn if x.data_txn}
            return txn.get(oid)
class ZODBIterator(object):
    """Cursor over the transactions of one source ZODB.

    Instantiating the class does NOT return a ZODBIterator: __new__ returns
    a generator yielding one ZODBIterator per ZODB of ``zodb_list`` that has
    at least one transaction in the range given by ``*args`` (forwarded to
    ``zodb.iterator``), each already positioned on its first transaction
    (``self.transaction``). ``.next()`` advances and raises StopIteration at
    the end. Instances order by (tid, shift_oid), so that a list of them can
    be sorted to merge the ZODBs in tid order.
    """
    def __new__(cls, zodb_list, *args):
        def _init(zodb):
            self = object.__new__(cls)
            iterator = zodb.iterator(*args)
            def _next():
                self.transaction = next(iterator)
            self.zodb = zodb
            self.next = _next
            return self
        def init(zodb):
            # FileStorage is fork-safe and in case we don't start iteration
            # from the beginning, we want the tid index built at most once
            # (by speedupFileStorageTxnLookup).
            if FORK and not isinstance(zodb.storage, FileStorage):
                # Defer: reconnect and create the iterator only when the
                # generator below is consumed (presumably in the forked
                # import process, to not share the parent's connection).
                def init():
                    zodb._connect()
                    return _init(zodb)
                return init
            return _init(zodb)
        def result(zodb_list):
            for self in zodb_list:
                if callable(self):
                    self = self()
                try:
                    self.next()
                    yield self
                except StopIteration:
                    # empty range: skip this ZODB
                    pass
        # map() is eager (Python 2): non-deferred iterators are created now.
        return result(map(init, zodb_list))
    tid = property(lambda self: self.transaction.tid)
    def __lt__(self, other):
        return self.tid < other.tid or self.tid == other.tid \
            and self.zodb.shift_oid < other.zodb.shift_oid
def is_true(value):
    """Parse a boolean option: 'false' -> 0, 'true' -> 1.

    Raises ValueError for any other value."""
    return ('false', 'true').index(value)
class ImporterDatabaseManager(DatabaseManager):
"""Proxy that transparently imports data from a ZODB storage
"""
_writeback = None
_last_commit = 0
def __init__(self, *args, **kw):
super(ImporterDatabaseManager, self).__init__(*args, **kw)
implements(self, """_getNextTID checkSerialRange checkTIDRange
deleteObject deleteTransaction dropPartitions _getLastTID
getReplicationObjectList _getTIDList nonempty""".split())
_getPartition = property(lambda self: self.db._getPartition)
_getReadablePartition = property(lambda self: self.db._getReadablePartition)
_uncommitted_data = property(lambda self: self.db._uncommitted_data)
def _parse(self, database):
config = SafeConfigParser()
config.read(os.path.expanduser(database))
sections = config.sections()
# XXX: defaults copy & pasted from elsewhere - refactoring needed
main = self._conf = {'adapter': 'MySQL', 'wait': 0}
main.update(config.items(sections.pop(0)))
self.zodb = [(x, dict(config.items(x))) for x in sections]
x = main.get('compress', 'true')
try:
self.compress = bool(is_true(x))
except ValueError:
self.compress = compress.parseOption(x)
if is_true(main.get('writeback', 'false')):
if len(self.zodb) > 1:
raise Exception(
"Can not forward new transactions to splitted DB.")
self._writeback = self.zodb[0][1]['storage']
    def _connect(self):
        """Create the backend manager (self.db) and wire this proxy to it.

        The listed methods are bound directly to the backend's, the
        write-back helper is created if configured, and commit() is wrapped
        to record the time of the last commit and notify the write-back
        helper (both on this proxy and on the backend)."""
        conf = self._conf
        db = self.db = buildDatabaseManager(conf['adapter'],
            (conf['database'], conf.get('engine'), conf['wait']))
        for x in """getConfiguration _setConfiguration setNumPartitions
                    query erase getPartitionTable _iterAssignedCells
                    updateCellTID getUnfinishedTIDDict dropUnfinishedData
                    abortTransaction storeTransaction lockTransaction
                    loadData storeData getOrphanList _pruneData deferCommit
                    _getDevPath dropPartitionsTemporary
                    """.split():
            setattr(self, x, getattr(db, x))
        if self._writeback:
            # _parse() stored the ZODB storage configuration here.
            self._writeback = WriteBack(db, self._writeback)
        db_commit = db.commit
        def commit():
            db_commit()
            # Used by _import() to commit at most about once per second.
            self._last_commit = time.time()
            if self._writeback:
                self._writeback.committed()
        self.commit = db.commit = commit
def _updateReadable(self):
raise AssertionError
def changePartitionTable(self, *args, **kw):
self.db.changePartitionTable(*args, **kw)
if self._writeback:
self._writeback.changed()
def unlockTransaction(self, *args):
self.db.unlockTransaction(*args)
if self._writeback:
self._writeback.changed()
def close(self):
if self._writeback:
self._writeback.close()
self.db.close()
if isinstance(self.zodb, list): # _setup called
for zodb in self.zodb:
zodb.close()
    def setup(self, reset=False, dedup=False):
        """Set up the backend, then the source ZODBs and the import state.

        On first run, ZODB objects are created from the configuration, oid
        ranges are assigned starting from the root ZODB (the one with
        oid 0) and the result is pickled into the backend configuration
        ('zodb'). On later runs, that pickled state is reloaded instead of
        the configuration file, and each ZODB is reconnected.

        Afterwards: self.zodb is a tuple of ZODB objects sorted by
        shift_oid (self.zodb_index holds their shift_oid), zodb_ltid and
        zodb_loid are the last tid/oid to import, zodb_tid the last tid
        already imported (0 if none), and self._import is turned into its
        generator (see doOperation).
        """
        self.db.setup(reset, dedup)
        zodb_state = self.getConfiguration("zodb")
        if zodb_state:
            logging.warning("Ignoring configuration file for oid mapping."
                            " Reloading it from NEO storage.")
            # NOTE: unpickles data previously written by this method into
            # our own backend (see the else branch).
            zodb = cPickle.loads(zodb_state)
            for k, v in self.zodb:
                zodb[k].connect(v["storage"])
        else:
            zodb = {k: ZODB(**v) for k, v in self.zodb}
            # Exactly one root ZODB (oid 0) is required (ValueError otherwise).
            x, = (x for x in zodb.itervalues() if not x.oid)
            x.setup(zodb)
            self.setConfiguration("zodb", cPickle.dumps(zodb))
        self.zodb_index, self.zodb = zip(*sorted(
            (x.shift_oid, x) for x in zodb.itervalues()))
        self.zodb_ltid = max(x.ltid for x in self.zodb)
        zodb = self.zodb[-1]
        self.zodb_loid = zodb.shift_oid + zodb.next_oid - 1
        # presumably the last backend tid <= zodb_ltid, i.e. how far a
        # previous run got -- confirm getLastTID semantics
        self.zodb_tid = self.db.getLastTID(self.zodb_ltid) or 0
        if callable(self._import):
            self._import = self._import()
def doOperation(self, app):
if self._import:
app.newTask(self._import)
    def _import(self):
        """Generator task importing the remaining ZODB transactions.

        setup() replaces this method by the generator it returns, and
        doOperation() schedules it with app.newTask(). It yields after each
        record so that the storage node keeps serving requests. Records are
        produced by _iter_zodb(), in a daemon child process through a shared
        queue when FORK is true, otherwise in this process. When done,
        self._import is reset to None and getObject, getReplicationTIDList
        and getReplicationObjectList are bound directly to the backend.
        """
        p64 = util.p64
        u64 = util.u64
        # Resume after the last imported transaction, if any.
        tid = p64(self.zodb_tid + 1) if self.zodb_tid else None
        zodb_list = ZODBIterator(self.zodb, tid, p64(self.zodb_ltid))
        if FORK:
            from multiprocessing import Process
            from ..shared_queue import Queue
            queue = Queue(1<<24)
            process = self._import_process = Process(
                target=lambda zodb_list: queue(self._iter_zodb(zodb_list)),
                args=(zodb_list,))
            process.daemon = True
            process.start()
        else:
            queue = self._iter_zodb(zodb_list)
            process = None
        del zodb_list
        object_list = []
        data_id_list = []
        for txn in queue:
            if txn is None:
                # end marker from _iter_zodb
                break
            if len(txn) == 3:
                # object record: (oid, data or None, data_tid)
                oid, data_id, data_tid = txn
                if data_id is not None:
                    checksum, data, compression = data_id
                    data_id = self.holdData(checksum, oid, data, compression)
                    data_id_list.append(data_id)
                object_list.append((oid, data_id, data_tid))
                # Give the main loop the opportunity to process requests
                # from other nodes. In particular, clients may commit. If the
                # storage node exits after such commit, and before we actually
                # update 'obj' with 'object_list', some rows in 'data' may be
                # unreferenced. This is not a problem because the leak is
                # solved when resuming the migration.
                # XXX: The leak was solved by the deduplication,
                #      but it was disabled by default.
            else:
                # transaction record (see transactionAsTuple), following
                # all its objects
                tid = txn[-1]
                self.storeTransaction(tid, object_list,
                    ((x[0] for x in object_list),) + txn,
                    False)
                self.releaseData(data_id_list)
                logging.debug("TXN %s imported (user=%r, desc=%r, len(oid)=%s)",
                    util.dump(tid), txn[0], txn[1], len(object_list))
                del object_list[:], data_id_list[:]
                if self._last_commit + 1 < time.time():
                    self.commit()
                self.zodb_tid = u64(tid)
            # Let the main loop run (see the comment above).
            yield
        if process:
            process.join()
        self.commit()
        logging.warning("All data are imported. You should change"
            " your configuration to use the native backend and restart.")
        self._import = None
        for x in """getObject getReplicationTIDList getReplicationObjectList
                """.split():
            setattr(self, x, getattr(self.db, x))
def _iter_zodb(self, zodb_list):
    """Yield the content of the source ZODBs, transaction by transaction.

    For each transaction, one (oid, data, data_tid) tuple is yielded per
    record, followed by the transaction tuple (transactionAsTuple). 'data'
    is None or a (checksum, data, compression) tuple, where 'data' went
    through the configured compression. Oids are shifted by the
    'shift_oid' of their ZODB. Transactions with the same tid in several
    ZODBs are merged. A final None marks the end of the stream.
    The iterators are sorted at each step, presumably so that the one
    with the smallest tid comes first — TODO confirm against ZODBIterator.
    """
    util.setproctitle('neostorage: import')
    p64 = util.p64
    u64 = util.u64
    zodb_list = list(zodb_list)
    if zodb_list:
        tid = None
        _compress = compress.getCompress(self.compress)
        while 1:
            zodb_list.sort()
            z = zodb_list[0]
            # Merge transactions with same tid. Only
            # user/desc/ext from first ZODB are kept.
            if tid != z.tid:
                if tid:
                    # Records of the previous transaction are all sent.
                    yield txn
                txn = transactionAsTuple(z.transaction)
                tid = txn[-1]
            zodb = z.zodb
            for r in z.transaction:
                oid = p64(u64(r.oid) + zodb.shift_oid)
                data_tid = r.data_txn
                if data_tid or r.data is None:
                    # Back-pointer or deletion: no data to store.
                    data = None
                else:
                    _, compression, data = _compress(zodb.repickle(r.data))
                    data = util.makeChecksum(data), data, compression
                yield oid, data, data_tid
            try:
                z.next()
            except StopIteration:
                del zodb_list[0]
                if not zodb_list:
                    break
        yield txn
    # End-of-stream marker for the consumer (_import).
    yield
def inZodb(self, oid, tid=None, before_tid=None):
    """Tell whether the requested revision of 'oid' may come from the ZODBs.

    All arguments are unpacked integers. 'oid' must not exceed the last
    imported oid. If 'before_tid' is given, it must be above the last
    imported tid; otherwise 'tid' must be None or lie in
    (zodb_tid, zodb_ltid].
    """
    if oid > self.zodb_loid:
        return False
    if before_tid:
        return self.zodb_tid < before_tid
    return tid is None or self.zodb_tid < tid <= self.zodb_ltid
def zodbFromOid(self, oid):
    """Return (zodb, oid relative to that zodb) for the unpacked 'oid'.

    self.zodb_index is the sorted list of first oids of each ZODB.
    """
    position = bisect(self.zodb_index, oid) - 1
    zodb = self.zodb[position]
    local_oid = oid - zodb.shift_oid
    return zodb, local_oid
def getLastIDs(self):
    """Return the packed (last tid, last oid), including the ZODB ranges."""
    last_tid, last_oid = self.db.getLastIDs()
    p64 = util.p64
    newest_tid = max(last_tid, p64(self.zodb_ltid))
    newest_oid = max(last_oid, p64(self.zodb_loid))
    return newest_tid, newest_oid
def getObject(self, oid, tid=None, before_tid=None):
    """Same contract as DatabaseManager.getObject, merging db and ZODBs.

    Revisions whose serial may lie in the not-yet-imported range
    (zodb_tid, zodb_ltid] are read from the source ZODB (with
    loadBefore); otherwise self.db is queried first and the ZODB is
    consulted only when inZodb() says it may hold the revision.
    """
    u64 = util.u64
    u_oid = u64(oid)
    u_tid = tid and u64(tid)
    u_before_tid = before_tid and u64(before_tid)
    db = self.db
    if self.zodb_tid < (u_before_tid - 1 if before_tid else
                        u_tid or 0) <= self.zodb_ltid:
        # The requested serial is in the range that is still being
        # imported: self.db is not queried.
        o = None
    else:
        o = db.getObject(oid, tid, before_tid)
        if o and self.zodb_ltid < u64(o[0]) or \
           not self.inZodb(u_oid, u_tid, u_before_tid):
            return o
    p64 = util.p64
    zodb, z_oid = self.zodbFromOid(u_oid)
    try:
        value, serial, next_serial = zodb.loadBefore(p64(z_oid),
            before_tid or (util.p64(u_tid + 1) if tid else MAX_TID))
    except TypeError: # loadBefore returned None
        return False
    except POSKeyError:
        assert not o, o
        return o
    if serial != tid:
        if tid:
            # An exact serial was requested but the ZODB has another one.
            return False
        u_tid = u64(serial)
    # Prefer the record from self.db if it is already imported.
    if u_tid <= self.zodb_tid and o:
        return o
    if value:
        value = zodb.repickle(value)
        checksum = util.makeChecksum(value)
    else:
        # CAVEAT: Although we think loadBefore should not return an empty
        #         value for a deleted object (BBB: fixed in ZODB4),
        #         there's no need to distinguish this case in the above
        #         except clause because it would be crazy to import a
        #         NEO DB using this backend.
        checksum = None
    if not next_serial:
        # The next revision, if any, may already be in self.db.
        next_serial = db._getNextTID(db._getPartition(u_oid), u_oid, u_tid)
        if next_serial:
            next_serial = p64(next_serial)
    return (serial, next_serial,
        0, checksum, value, zodb.getDataTid(z_oid, u_tid))
def getTransaction(self, tid, all=False):
    """Return transaction metadata for packed 'tid', like self.db would.

    Transactions in the not-yet-imported range are read from the source
    ZODBs (oids shifted by each ZODB's shift_oid); None is returned if no
    ZODB has it. Other transactions are delegated to self.db.
    """
    u64 = util.u64
    if not self.zodb_tid < u64(tid) <= self.zodb_ltid:
        return self.db.getTransaction(tid, all)
    for zodb in self.zodb:
        for txn in zodb.iterator(tid, tid):
            p64 = util.p64
            shift = zodb.shift_oid
            oid_list = [p64(u64(record.oid) + shift) for record in txn]
            return (oid_list,) + transactionAsTuple(txn)
    return None
def getFinalTID(self, ttid):
    """Delegate to self.db, except for ttids in a range still being imported."""
    in_imported_range = util.u64(ttid) <= self.zodb_ltid
    if in_imported_range and self._import:
        raise NotImplementedError
    return self.db.getFinalTID(ttid)
def _deleteRange(self, partition, min_tid=None, max_tid=None):
# Even if everything is imported, we can't truncate below
# because it would import again if we restart with this backend.
if min_tid < self.zodb_ltid:
raise NotImplementedError
self.db._deleteRange(partition, min_tid, max_tid)
def getReplicationTIDList(self, min_tid, max_tid, length, partition):
    """Disabled: always raises BackendNotImplemented.

    The code after the 'raise' is unreachable; it is kept as a draft that
    merges tids from self.db (up to zodb_tid), from the source ZODBs
    (zodb_tid, zodb_ltid] and again from self.db above zodb_ltid.
    """
    # This method is not tested and it is anyway
    # useless without getReplicationObjectList.
    raise BackendNotImplemented(self.getReplicationTIDList)
    p64 = util.p64
    tid = p64(self.zodb_tid)
    if min_tid <= tid:
        r = self.db.getReplicationTIDList(min_tid, min(max_tid, tid),
                                          length, partition)
        if max_tid <= tid:
            return r
        length -= len(r)
        min_tid = p64(self.zodb_tid + 1)
    else:
        r = []
    if length:
        tid = p64(self.zodb_ltid)
        if min_tid <= tid:
            u64 = util.u64
            def next_tid(i):
                # Push the next tid of iterator 'i' that belongs to
                # 'partition' (negated, so that pop() returns the smallest).
                for txn in i:
                    tid = u64(txn.tid)
                    if self._getPartition(tid) == partition:
                        insort(z, (-tid, i))
                        break
            z = []
            for zodb in self.zodb:
                next_tid(zodb.iterator(min_tid, min(max_tid, tid)))
            while z:
                t, i = z.pop()
                r.append(p64(-t))
                length -= 1
                if not length:
                    return r
                next_tid(i)
        if tid < max_tid:
            r += self.db.getReplicationTIDList(max(min_tid, tid), max_tid,
                                               length, partition)
    return r
def getObjectHistory(self, *args, **kw):
    """Not supported by this backend: always raises BackendNotImplemented."""
    method = self.getObjectHistory
    raise BackendNotImplemented(method)
def pack(self, *args, **kw):
    """Not supported by this backend: always raises BackendNotImplemented."""
    method = self.pack
    raise BackendNotImplemented(method)
class WriteBack(object):
    """Copy transactions committed in the NEO database back to a ZODB.

    'db' is the database manager and 'storage' the ZODB configuration
    string (opened with storageFromString). The copy is done by a
    background worker (a process if FORK is set, a thread otherwise),
    started by the first committed() call that follows changed().
    """

    # Set by changed(), reset by committed().
    _changed = False
    # Background worker, created by committed().
    _process = None

    def __init__(self, db, storage):
        self._db = db
        self._storage = storage

    def close(self):
        """Stop the background worker, if any, and wait for it to exit."""
        if self._process:
            self._stop.set()
            # Wake up the worker in case it is waiting for new commits.
            self._event.set()
            self._process.join()

    def changed(self):
        """Record that new data must be copied back at next committed()."""
        self._changed = True

    def committed(self):
        """Notify the worker of recorded changes, starting it on first use."""
        if self._changed:
            self._changed = False
            if self._process:
                self._event.set()
            else:
                if FORK:
                    from multiprocessing import Process, Event
                else:
                    from threading import Thread as Process, Event
                self._event = Event()
                self._idle = Event()
                self._stop = Event()
                self._np = self._db.getNumPartitions()
                # The worker receives a pickled database manager and
                # reopens it on its side (see _run).
                self._db = cPickle.dumps(self._db, 2)
                self._process = Process(target=self._run)
                self._process.daemon = True
                self._process.start()

    @property
    def wait(self):
        # For unit tests.
        return self._idle.wait

    def _run(self):
        """Worker entry point: copy transactions until close() is called."""
        util.setproctitle('neostorage: write back')
        self._db = cPickle.loads(self._db)
        try:
            @self._db.autoReconnect
            def _():
                # Unfortunately, copyTransactionsFrom does not abort in case
                # of failure, so we have to reopen.
                zodb = storageFromString(self._storage)
                try:
                    # Resume after the last transaction of the ZODB.
                    self.min_tid = util.add64(zodb.lastTransaction(), 1)
                    zodb.copyTransactionsFrom(self)
                finally:
                    zodb.close()
        finally:
            self._idle.set()
            self._db.close()

    def iterator(self):
        """Yield a TransactionRecord for every tid >= self.min_tid.

        Presumably consumed by the ZODB copyTransactionsFrom() call in
        _run. Once caught up, wait for the next committed() notification;
        stop when close() is called.
        """
        db = self._db
        np = self._np
        # Fetch tids of all partitions by chunks of ~1000 tids in total.
        chunk_size = max(2, 1000 // np)
        offset_list = xrange(np)
        while 1:
            with db:
                # Check the partition table at the beginning of every
                # transaction. Once the import is finished and at least one
                # cell is replicated, it is possible that some of this node
                # get outdated. In this case, wait for the next PT change.
                if np == len(db._readable_set):
                    while 1:
                        tid_list = []
                        loop = False
                        for offset in offset_list:
                            x = db.getReplicationTIDList(
                                self.min_tid, MAX_TID, chunk_size, offset)
                            tid_list += x
                            # A full chunk means there may be more tids.
                            if len(x) == chunk_size:
                                loop = True
                        if tid_list:
                            tid_list.sort()
                            for tid in tid_list:
                                if self._stop.is_set():
                                    return
                                yield TransactionRecord(db, tid)
                            self.min_tid = util.add64(tid, 1)
                        if loop:
                            continue
                        break
            if not self._event.is_set():
                # Caught up: tell waiters (unit tests) and sleep until the
                # next committed() or close().
                self._idle.set()
                self._event.wait()
                self._idle.clear()
            self._event.clear()
            if self._stop.is_set():
                break
class TransactionRecord(BaseStorage.TransactionRecord):
    """Transaction read from a NEO database, as a ZODB transaction record.

    Instances are yielded by WriteBack.iterator. Iterating over one yields
    a BaseStorage.DataRecord per oid of the transaction, with data
    decompressed.
    """

    def __init__(self, db, tid):
        (self._oid_list,
         user, desc, ext, _, _) = db.getTransaction(tid)
        extension = loads(ext) if ext else {}
        super(TransactionRecord, self).__init__(
            tid, ' ', user, desc, extension)
        self._db = db

    def __iter__(self):
        tid = self.tid
        for oid in self._oid_list:
            record = self._db.fetchObject(oid, tid)
            _, compression, _, data, data_tid = record
            if data is not None:
                decompress = compress.decompress_list[compression]
                data = decompress(data)
            yield BaseStorage.DataRecord(oid, tid, data, data_tid)
neoppod-97af23cc3740cc4b2eee09125c9ce95abb6fbe2c-neo-storage/neo/storage/database/manager.py 0000664 0000000 0000000 00000106370 13313233730 0030737 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os, errno, socket, struct, sys, threading
from collections import defaultdict
from contextlib import contextmanager
from copy import copy
from functools import wraps
from neo.lib import logging, util
from neo.lib.interfaces import abstract, requires
from neo.lib.protocol import CellStates, NonReadableCell, MAX_TID, ZERO_TID
from . import DatabaseFailure
# Cell states considered readable (see _updateReadable and
# changePartitionTable, which compare -stored_value against it).
READABLE = (CellStates.UP_TO_DATE, CellStates.FEEDING)
def lazymethod(func):
    """Return a property that replaces itself by func(self) on first access.

    On first lookup through an instance, func(instance) is computed and
    stored on the instance's class under the name of 'func', so later
    lookups bypass this property. The attribute is then looked up again
    normally. The instance's class must not define that name itself (the
    property is meant to live on a base class).
    """
    name = func.__name__
    def getter(self):
        owner = self.__class__
        assert name not in owner.__dict__
        setattr(owner, name, func(self))
        return getattr(self, name)
    return property(getter, doc=func.__doc__)
def fallback(func):
    """Mark 'func' as a generic implementation that backends should override.

    On first use per class, an info message is logged and 'func' is
    installed on that class (see lazymethod).
    """
    @wraps(func)
    def warn(self):
        logging.info("Fallback to generic/slow implementation of %s."
            " It should be overridden by backend storage (%s).",
            func.__name__, self.__class__.__name__)
        return func
    return lazymethod(warn)
def splitOIDField(tid, oids):
    """Split a concatenation of 8-byte oids into a list of oids.

    Raise DatabaseFailure if the length is not a multiple of 8; 'tid' is
    only used in the error message.
    """
    size = len(oids)
    if size % 8:
        raise DatabaseFailure('invalid oids length for tid %s: %s'
                              % (tid, size))
    return [oids[start:start + 8] for start in xrange(0, size, 8)]
class CreationUndone(Exception):
    """Exception about undoing an object creation.

    NOTE(review): not raised in this part of the module; meaning inferred
    from the name only.
    """
class DatabaseManager(object):
"""This class only describes an interface for database managers."""
ENGINES = ()
UNSAFE = False
__lock = None
LOCK = "neostorage"
LOCKED = "error: database is locked"
_deferred = 0
_repairing = None
def __init__(self, database, engine=None, wait=None):
"""
Initialize the object.
"""
if engine:
if engine not in self.ENGINES:
raise ValueError("Unsupported engine: %r not in %r"
% (engine, self.ENGINES))
self._engine = engine
# XXX: Maybe the default should be to retry indefinitely.
# But for unit tests, we really want to never retry.
self._wait = wait or 0
self._parse(database)
self._init_attrs = tuple(self.__dict__)
self._connect()
def __getstate__(self):
state = {x: getattr(self, x) for x in self._init_attrs}
assert state # otherwise, __setstate__ is not called
return state
def __setstate__(self, state):
self.__dict__.update(state)
# For the moment, no need to duplicate secondary connections.
#self._init_attrs = tuple(self.__dict__)
# Secondary connections don't lock.
self.LOCK = None
self._connect()
@contextmanager
def _duplicate(self):
db = copy(self)
try:
yield db
finally:
db.close()
def __getattr__(self, attr):
    """Compute the partition-related attributes lazily on first access.

    For any other missing attribute, normal lookup is retried, which
    raises AttributeError.
    """
    lazy = attr in ('_readable_set', '_getPartition', '_getReadablePartition')
    if lazy:
        self._updateReadable()
    return self.__getattribute__(attr)
def _partitionTableChanged(self):
try:
del (self._readable_set,
self._getPartition,
self._getReadablePartition)
except AttributeError:
pass
def __enter__(self):
assert not self.LOCK, "not a secondary connection"
# XXX: All config caching should be done in this class,
# rather than in backend classes.
self._config.clear()
self._partitionTableChanged()
def __exit__(self, t, v, tb):
if v is None:
# Deferring commits make no sense for secondary connections.
assert not self._deferred
self._commit()
@abstract
def _parse(self, database):
    """Process the 'database' parameter given to __init__ (backend-specific).
    """
@abstract
def _connect(self):
    """Open the connection to the underlying database."""
def autoReconnect(self, f):
    """Run 'f', retrying on transient connection loss where supported.

    This is a placeholder for backends that may lose their connection to
    the underlying database. A primary connection is reestablished
    transparently when possible, but secondary connections use
    transactions and must restart them from the beginning.

    Other backends expect no transient failure, so this default just
    calls 'f' exactly once.
    """
    f()
def lock(self, db_path):
    """Prevent two primary connections from using the same database.

    The lock is an abstract UNIX socket named after self.LOCK and the
    device/inode of 'db_path'. Nothing is done for secondary connections
    (LOCK unset) or when 'db_path' does not exist. The process exits with
    self.LOCKED if the lock is already held.
    """
    if not self.LOCK:
        return
    assert self.__lock is None, self.__lock
    # For platforms that don't support anonymous sockets,
    # we can either use zc.lockfile or an empty SQLite db
    # (with BEGIN EXCLUSIVE).
    try:
        stat = os.stat(db_path)
    except OSError as e:
        if e.errno == errno.ENOENT:
            return # in-memory or temporary database
        raise
    s = self.__lock = socket.socket(socket.AF_UNIX)
    try:
        s.bind('\0%s:%s:%s' % (self.LOCK, stat.st_dev, stat.st_ino))
    except socket.error as e:
        if e.errno == errno.EADDRINUSE:
            sys.exit(self.LOCKED)
        raise
def _getDevPath(self):
"""
"""
@requires(_getDevPath)
def getTopologyPath(self):
    """Return (hostname, device id as a string) locating the database."""
    hostname = socket.gethostname()
    # On Windows, st_dev only exists since Python 3.4
    device = os.stat(self._getDevPath()).st_dev
    return hostname, str(device)
@abstract
def erase(self):
    """Discard all existing data (called by setup when 'reset' is true)."""
def _setup(self, dedup=False):
"""To be overridden by the backend to set up a database
It must recover self._uncommitted_data from temporary object table.
_uncommitted_data is already instantiated and must be updated with
refcounts to data of write-locked objects, except in case of undo,
where the refcount is increased later, when the object is read-locked.
Keys are data ids and values are number of references.
"""
@requires(_setup)
def setup(self, reset=False, dedup=False):
    """Set up the database, discarding existing data first if 'reset'.

    The volatile data refcounts are reinitialized before the backend's
    _setup() recovers them.
    """
    if reset:
        self.erase()
    refcounts = defaultdict(int)
    self._uncommitted_data = refcounts
    self._setup(dedup)
@abstract
def nonempty(self, table):
    """Return whether 'table' has rows, or None if it does not exist."""
def _checkNoUnfinishedTransactions(self, *hint):
if self.nonempty('ttrans') or self.nonempty('tobj'):
raise DatabaseFailure(
"The database can not be upgraded because you have unfinished"
" transactions. Use an older version of NEO to verify them.")
def migrate(self, *args, **kw):
    """Upgrade the schema step by step up to self.VERSION.

    Each step calls self._migrate<N>(*args, **kw) and records version N
    (committed by setConfiguration). Downgrading is refused.
    """
    version = int(self.getConfiguration("version") or 0)
    if version > self.VERSION:
        raise DatabaseFailure("The database can not be downgraded.")
    while self.VERSION > version:
        version += 1
        step = getattr(self, '_migrate%s' % version)
        step(*args, **kw)
        self.setConfiguration("version", version)
def doOperation(self, app):
    """Hook for backends with background work; does nothing by default."""
def _close(self):
"""Backend-specific code to close the database"""
@requires(_close)
def close(self):
    """Flush any deferred commit, close the database and release the lock."""
    self._deferredCommit()
    self._close()
    lock_socket = self.__lock
    if lock_socket:
        lock_socket.close()
        del self.__lock
def _commit(self):
"""Backend-specific code to commit the pending changes"""
@requires(_commit)
def commit(self):
    """Commit pending changes now and cancel any deferred commit."""
    logging.debug('committing...')
    self._commit()
    # Instead of cancelling a timeout that would be set to defer a commit,
    # we simply use a boolean so that _deferredCommit() does nothing.
    # IOW, epoll may wake up for nothing but that should be rare,
    # because most immediate commits are usually quickly followed by
    # deferred commits.
    self._deferred = 0
def deferCommit(self):
    """Request a later commit; return the callable that performs it."""
    self._deferred = 1
    callback = self._deferredCommit
    return callback
def _deferredCommit(self):
if self._deferred:
self.commit()
@abstract
def getConfiguration(self, key):
    """Return the configuration value of 'key', or None if not set."""
def setConfiguration(self, key, value):
    """Store a configuration value and commit immediately."""
    self._setConfiguration(key, value)
    self.commit()
@abstract
def _setConfiguration(self, key, value):
    """Store a configuration value without committing."""
def getUUID(self):
    """Return the stored node id (NID) as an int, or None if not set."""
    nid = self.getConfiguration('nid')
    return None if nid is None else int(nid)
def setUUID(self, nid):
    """Store the node id (NID), as a string, and commit."""
    value = str(nid)
    self.setConfiguration('nid', value)
def getNumPartitions(self):
    """Return the stored number of partitions as an int, or None."""
    value = self.getConfiguration('partitions')
    return None if value is None else int(value)
def setNumPartitions(self, num_partitions):
    """Store the number of partitions, commit and drop cached PT data."""
    self.setConfiguration('partitions', num_partitions)
    # Partition functions depend on the number of partitions.
    self._partitionTableChanged()
def getNumReplicas(self):
    """Return the stored number of replicas as an int, or None."""
    value = self.getConfiguration('replicas')
    return None if value is None else int(value)
def setNumReplicas(self, num_replicas):
    """Store the number of replicas and commit."""
    key = 'replicas'
    self.setConfiguration(key, num_replicas)
def getName(self):
    """Return the stored cluster name, or None if not set."""
    key = 'name'
    return self.getConfiguration(key)
def setName(self, name):
    """Store the cluster name and commit."""
    key = 'name'
    self.setConfiguration(key, name)
def getPTID(self):
    """Return the stored partition table id as an int, or None."""
    ptid = self.getConfiguration('ptid')
    return None if ptid is None else int(ptid)
def getBackupTID(self):
    """Return the 'backup_tid' configuration value decoded by util.bin."""
    dumped = self.getConfiguration('backup_tid')
    return util.bin(dumped)
def _setBackupTID(self, tid):
    """Store 'tid' (encoded by util.dump) as 'backup_tid', without commit."""
    dumped = util.dump(tid)
    logging.debug('backup_tid = %s', dumped)
    return self._setConfiguration('backup_tid', dumped)
def getTruncateTID(self):
    """Return the 'truncate_tid' configuration value decoded by util.bin."""
    dumped = self.getConfiguration('truncate_tid')
    return util.bin(dumped)
def _setTruncateTID(self, tid):
    """Store 'tid' (encoded by util.dump) as 'truncate_tid', without commit."""
    dumped = util.dump(tid)
    logging.debug('truncate_tid = %s', dumped)
    return self._setConfiguration('truncate_tid', dumped)
def _setPackTID(self, tid):
self._setConfiguration('_pack_tid', tid)
def _getPackTID(self):
try:
return int(self.getConfiguration('_pack_tid'))
except TypeError:
return -1
# XXX: Consider splitting getLastIDs/_getLastIDs because
# sometimes the last oid is not wanted.
def _getLastTID(self, partition, max_tid=None):
"""Return tid of last transaction <= 'max_tid' in given 'partition'
tids are in unpacked format.
"""
@requires(_getLastTID)
def getLastTID(self, max_tid=None):
    """Return the tid of the last transaction <= 'max_tid'.

    tids are in unpacked format. Only readable partitions are considered,
    and 'max_tid' is forwarded to _getLastTID for each of them.

    Return None if the number of partitions is not known yet or if no
    partition is readable.
    """
    if self.getNumPartitions():
        # 'max_tid' must be passed explicitly: map(self._getLastTID, ...)
        # would query each partition without any upper bound.
        tid_list = [self._getLastTID(partition, max_tid)
                    for partition in self._readable_set]
        # max() of an empty sequence raises ValueError.
        if tid_list:
            return max(tid_list)
def _getLastIDs(self, partition):
"""Return max(tid) & max(oid) for objects of given partition
Results are in unpacked format
"""
@requires(_getLastIDs)
def getLastIDs(self):
    """Return packed (max tid, max oid) for readable data.

    Ignoring unassigned partitions matters: some data may remain from
    cells that were discarded, either because of the
    --disable-drop-partitions option, or in the future when dropping
    partitions is done in the background (it is an expensive operation).
    (None, None) is returned when no partition is readable.
    """
    readable = self._readable_set
    if not readable:
        return None, None
    tid_list, oid_list = zip(*map(self._getLastIDs, readable))
    tid = max(self.getLastTID(None), max(tid_list))
    oid = max(oid_list)
    p64 = util.p64
    return (None if tid is None else p64(tid),
            None if oid is None else p64(oid))
def _getUnfinishedTIDDict(self):
""""""
@requires(_getUnfinishedTIDDict)
def getUnfinishedTIDDict(self):
    """Return {packed ttid: packed tid or None} for unfinished transactions.

    ttids that only appear among temporary objects map to None.
    """
    trans, obj = self._getUnfinishedTIDDict()
    merged = dict.fromkeys(obj)
    merged.update(trans)
    p64 = util.p64
    return dict((p64(ttid), None if tid is None else p64(tid))
                for ttid, tid in merged.iteritems())
@fallback
def getLastObjectTID(self, oid):
    """Return the latest tid of given oid, or None if it does not exist."""
    r = self.getObject(oid)
    if r:
        return r[0]
    return r
@abstract
def _getNextTID(self, partition, oid, tid):
    """Return the serial of the revision of 'oid' that follows 'tid'.

    partition (int)
        Partition of the oid, i.e. self._getPartition(oid).
    oid (int)
        Identifier of object to retrieve.
    tid (int)
        Exact serial to retrieve.

    If tid is the last revision of oid, None is returned.
    """
def _getObject(self, oid, tid=None, before_tid=None):
"""
oid (int)
Identifier of object to retrieve.
tid (int, None)
Exact serial to retrieve.
before_tid (int, None)
Serial to retrieve is the highest existing one strictly below this
value.
Return value:
None: oid doesn't exist at requested tid/before_tid (getObject
takes care of checking if the oid exists at other serial)
6-tuple: Record content.
- record serial (int)
- serial or next record modifying object (int, None)
- compression (boolean-ish, None)
- checksum (binary string, None)
- data (binary string, None)
- data_serial (int, None)
"""
@requires(_getObject)
def getObject(self, oid, tid=None, before_tid=None):
    """Packed-interface wrapper around _getObject.

    oid (packed)
        Identifier of object to retrieve.
    tid (packed, None)
        Exact serial to retrieve.
    before_tid (packed, None)
        Serial to retrieve is the highest existing one strictly below this
        value.

    Return value:
        None: Given oid doesn't exist in database.
        False: No record found, but another one exists for given oid.
        6-tuple: Record content.
            - record serial (packed)
            - serial or next record modifying object (packed, None)
            - compression (boolean-ish, None)
            - checksum (binary string, None)
            - data (binary string, None)
            - data_serial (packed, None)
    """
    u64 = util.u64
    r = self._getObject(u64(oid), tid and u64(tid),
                        before_tid and u64(before_tid))
    try:
        serial, next_serial, compression, checksum, data, data_serial = r
    except TypeError:
        # No record: None if no specific revision was requested or if the
        # oid does not exist at all, False otherwise.
        return (tid or before_tid) and self.getLastObjectTID(oid) and False
    p64 = util.p64
    def p64_or_none(value):
        return None if value is None else p64(value)
    return (p64(serial), p64_or_none(next_serial),
            compression, checksum, data, p64_or_none(data_serial))
@fallback
def _fetchObject(self, oid, tid):
    """Specialized version of _getObject, for replication.

    The result is the _getObject record without next_serial, or None.
    """
    r = self._getObject(oid, tid)
    return r[:1] + r[2:] if r else None
def fetchObject(self, oid, tid):
    """Specialized version of getObject, for replication.

    - the oid can only be at an exact serial (parameter 'tid')
    - next_serial is not part of the result
    - if there's no result for the requested serial,
      there is no need to check if oid exists at other serial

    Return None, or (serial, compression, checksum, data, data_serial)
    with serials packed.
    """
    u64 = util.u64
    r = self._fetchObject(u64(oid), u64(tid))
    if not r:
        return None
    serial, compression, checksum, data, data_serial = r
    p64 = util.p64
    return (p64(serial), compression, checksum, data,
            None if data_serial is None else p64(data_serial))
def _getPartitionTable(self):
"""Return a whole partition table as a sequence of rows. Each row
is again a tuple of an offset (row ID), the NID of a storage
node, and a cell state."""
@requires(_getPartitionTable)
def _iterAssignedCells(self):
    """Return an iterator of (offset, stored value) for this node's cells."""
    my_nid = self.getUUID()
    return ((offset, value)
            for offset, nid, value in self._getPartitionTable()
            if nid == my_nid)
@requires(_getPartitionTable)
def getPartitionTable(self):
    """Return [(offset, nid, state)] with stored values decoded to states.

    Stored tids (>= 0) map to state 0, presumably CellStates.OUT_OF_DATE
    (TODO confirm); other values are -state.
    """
    rows = []
    for offset, nid, value in self._getPartitionTable():
        rows.append((offset, nid, max(0, -value)))
    return rows
@contextmanager
def replicated(self, offset):
    """Temporarily mark partition 'offset' as readable during the block."""
    readable = self._readable_set
    assert offset not in readable
    readable.add(offset)
    try:
        yield
    finally:
        readable.remove(offset)
def _changePartitionTable(self, cell_list, reset=False):
"""Change a part of a partition table. The list of cells is
a tuple of tuples, each of which consists of an offset (row ID),
the NID of a storage node, and a cell state. If reset is True,
existing data is first thrown away.
"""
def _getDataLastId(self, partition):
"""
"""
@requires(_getDataLastId)
def _updateReadable(self):
    """(Re)compute the set of readable partitions assigned to this node.

    On first call (nothing cached in the instance dict), also create
    _readable_set, the _getPartition/_getReadablePartition helpers and
    _data_last_ids (next data id per partition). Afterwards, the existing
    set is only refilled in place.
    """
    try:
        readable_set = self.__dict__['_readable_set']
    except KeyError:
        readable_set = self._readable_set = set()
        np = self.getNumPartitions()
        def _getPartition(x, np=np):
            return x % np
        def _getReadablePartition(x, np=np, r=readable_set):
            x %= np
            if x in r:
                return x
            raise NonReadableCell
        self._getPartition = _getPartition
        self._getReadablePartition = _getReadablePartition
        d = self._data_last_ids = []
        for p in xrange(np):
            i = self._getDataLastId(p)
            # Next data id: ids of partition p start at p << 48.
            d.append(p << 48 if i is None else i + 1)
    else:
        readable_set.clear()
    # Stored values of non-OUT_OF_DATE cells are -state.
    readable_set.update(x[0] for x in self._iterAssignedCells()
                        if -x[1] in READABLE)
@requires(_changePartitionTable, _getLastIDs, _getLastTID)
def changePartitionTable(self, ptid, cell_list, reset=False):
    """Apply partition table changes and store the new 'ptid'.

    Cell states are encoded before being stored: None for DISCARDED,
    -state in general, and for OUT_OF_DATE cells of this node, the tid
    from which replication must resume (see outofdate_tid).
    """
    my_nid = self.getUUID()
    pt = dict(self._iterAssignedCells())
    # In backup mode, the last transactions of a readable cell may be
    # incomplete.
    backup_tid = self.getBackupTID()
    if backup_tid:
        backup_tid = util.u64(backup_tid)
    def outofdate_tid(offset):
        # Already OUT_OF_DATE (or unknown): keep the stored tid.
        tid = pt.get(offset, 0)
        if tid >= 0:
            return tid
        # Previously readable: resume after the last known data
        # (backup_tid in backup mode); otherwise from 0.
        return -tid in READABLE and (backup_tid or
            max(self._getLastIDs(offset)[0],
                self._getLastTID(offset))) or 0
    cell_list = [(offset, nid, (
        None if state == CellStates.DISCARDED else
        -state if nid != my_nid or state != CellStates.OUT_OF_DATE else
        outofdate_tid(offset)))
        for offset, nid, state in cell_list]
    self._changePartitionTable(cell_list, reset)
    self._updateReadable()
    assert isinstance(ptid, (int, long)), ptid
    self._setConfiguration('ptid', str(ptid))
@requires(_changePartitionTable)
def updateCellTID(self, partition, tid):
    """Record packed 'tid' as replication progress of an OUT_OF_DATE cell.

    Nothing is done for cells that are not OUT_OF_DATE (negative stored
    value) or when the stored tid is unchanged.
    """
    current, = [t for p, t in self._iterAssignedCells() if p == partition]
    if current < 0:
        return
    tid = util.u64(tid)
    # Replicator doesn't optimize when there's no new data
    # since the node went down.
    if current == tid:
        return
    # In a backup cluster, when a storage node gets down just after
    # being the first to replicate fully new transactions from upstream,
    # we may end up in a special situation where an OUT_OF_DATE cell
    # is actually more up-to-date than an UP_TO_DATE one.
    assert current < tid or self.getBackupTID()
    self._changePartitionTable([(partition, self.getUUID(), tid)])
def iterCellNextTIDs(self):
    """Yield (offset, next tids) for the cells assigned to this node.

    - OUT_OF_DATE cells: the packed tid from which replication resumes.
    - readable cells: in backup mode, a (trans, obj) pair of packed tids
      from which to resume; otherwise None.
    Other cells are skipped.
    """
    p64 = util.p64
    backup_tid = self.getBackupTID()
    if backup_tid:
        next_tid = util.u64(backup_tid)
        if next_tid:
            next_tid += 1
    for offset, tid in self._iterAssignedCells():
        if tid >= 0: # OUT_OF_DATE
            yield offset, p64(tid and tid + 1)
        elif -tid in READABLE:
            if backup_tid:
                # An UP_TO_DATE cell does not have holes so it's fine to
                # resume from the last found records.
                tid = self._getLastTID(offset)
                yield offset, (
                    # For trans, a transaction can't be partially
                    # replicated, so replication can resume from the next
                    # possible tid.
                    p64(max(next_tid, tid + 1) if tid else next_tid),
                    # For obj, the last transaction may be partially
                    # replicated so it must be checked again (i.e. no +1).
                    p64(max(next_tid, self._getLastIDs(offset)[0])))
            else:
                yield offset, None
@abstract
def dropPartitions(self, offset_list):
    """Delete all data of the partitions listed in 'offset_list'."""
def _getUnfinishedDataIdList(self):
"""Drop any unfinished data from a database."""
@requires(_getUnfinishedDataIdList)
def dropUnfinishedData(self):
    """Drop any unfinished data from the database, then commit."""
    unfinished = self._getUnfinishedDataIdList()
    self.dropPartitionsTemporary()
    self.releaseData(unfinished, True)
    self.commit()
@abstract
def dropPartitionsTemporary(self, offset_list=None):
    """Drop partitions from temporary tables.

    Presumably all partitions when 'offset_list' is None (as called by
    dropUnfinishedData) — TODO confirm in backends.
    """
@abstract
def storeTransaction(self, tid, object_list, transaction, temporary=True):
    """Write transaction metadata.

    'object_list' contains (oid, data_id, object serial) tuples.
    'transaction' is None or a tuple of: the list of OIDs, user
    information, a description, extension information and the
    transaction pack state (True for packed).

    If 'temporary', the transaction is stored in the ttrans/tobj tables
    instead of trans/obj. The caller is in charge of committing, which
    is always the case at tpc_vote.
    """
@abstract
def getOrphanList(self):
    """Return the list of data ids not referenced by the obj table.

    This is a repair method, usually expensive. A past bug did not free
    data of transactions aborted before vote; this method lets the
    wasted space be reclaimed.
    """
@abstract
def _pruneData(self, data_id_list):
    """Delete the unreferenced data among 'data_id_list' (backend-specific).

    'unreferenced' means:
    - not in self._uncommitted_data
    - and not referenced by a fully-committed object (storage should have
      an index or a refcount of all data ids of all objects)

    Return the number of rows deleted from the data table.
    """
@abstract
def storeData(self, checksum, oid, data, compression):
    """Store object raw data and return its data id (backend-specific).

    If the same data is already stored, the storage only has to check
    that there is no hash collision.
    """
@abstract
def loadData(self, data_id):
    """Return the data stored under 'data_id' (inverse of storeData)."""
def holdData(self, checksum_or_id, *args):
    """Take a volatile reference to the data of a temporary object.

    If extra arguments are given, 'checksum_or_id' is a checksum (the
    result of makeChecksum(data)), the arguments are (data, compression)
    with 'compression' telling if 'data' is compressed, and the data is
    stored first. Otherwise 'checksum_or_id' is an existing data id.

    The reference lasts until 'releaseData' is called; return the data id.
    """
    if args:
        data_id = self.storeData(checksum_or_id, *args)
    else:
        data_id = checksum_or_id
    self._uncommitted_data[data_id] += 1
    return data_id
def releaseData(self, data_id_list, prune=False):
    """Drop one volatile reference for each data id of 'data_id_list'.

    If 'prune' is true, data that is no longer referenced (neither by a
    volatile reference nor by a fully-committed object) is deleted, and
    the result of _pruneData is returned.
    """
    refcount = self._uncommitted_data
    for data_id in data_id_list:
        remaining = refcount[data_id] - 1
        if not remaining:
            del refcount[data_id]
        else:
            refcount[data_id] = remaining
    return self._pruneData(data_id_list) if prune else None
@fallback
def _getDataTID(self, oid, tid=None, before_tid=None):
    """Return (record serial, data serial) of the matching revision.

    Parameters are those of _getObject (unpacked). (None, None) is
    returned if the requested object and transaction could not be found.

    This method only exists for performance reasons, since it does not
    return data: _getObject already returns these values but is slower.
    """
    r = self._getObject(oid, tid, before_tid)
    if not r:
        return None, None
    return r[0], r[-1]
def findUndoTID(self, oid, tid, ltid, undone_tid, transaction_object):
    """
    oid
        Object OID
    tid
        Transaction doing the undo
    ltid
        Upper (excluded) bound of transactions visible to transaction doing
        the undo.
    undone_tid
        Transaction to undo
    transaction_object
        Object data from memory, if it was modified by running
        transaction.
        None if is was not modified by running transaction.

    Returns a 3-tuple:
    current_tid (p64)
        TID of most recent version of the object client's transaction can
        see. This is used later to detect current conflicts (eg, another
        client modifying the same object in parallel)
    data_tid (p64, None)
        TID containing (without indirection) the data prior to undone
        transaction.
        None if object doesn't exist prior to transaction being undone
          (its creation is being undone).
    is_current (bool)
        False if object was modified by later transaction (ie, data_tid is
        not current), True otherwise.

    (None, None, False) is returned if the object is not visible at all
    below 'ltid'.
    """
    u64 = util.u64
    p64 = util.p64
    oid = u64(oid)
    tid = u64(tid)
    if ltid:
        ltid = u64(ltid)
    undone_tid = u64(undone_tid)
    def getDataTID(tid=None, before_tid=None):
        # Return (serial of the matching record, tid that actually holds
        # its data), following data back-pointers.
        tid, data_tid = self._getDataTID(oid, tid, before_tid)
        current_tid = tid
        while data_tid:
            if data_tid < tid:
                tid, data_tid = self._getDataTID(oid, data_tid)
                if tid is not None:
                    continue
            # Back-pointer to a later or missing record: inconsistent DB.
            logging.error("Incorrect data serial for oid %s at tid %s",
                          oid, current_tid)
            return current_tid, current_tid
        return current_tid, tid
    if transaction_object:
        try:
            current_tid = current_data_tid = u64(transaction_object[2])
        except struct.error:
            current_tid = current_data_tid = tid
    else:
        current_tid, current_data_tid = getDataTID(before_tid=ltid)
        if current_tid is None:
            return None, None, False
    found_undone_tid, undone_data_tid = getDataTID(tid=undone_tid)
    assert found_undone_tid is not None, (oid, undone_tid)
    is_current = undone_data_tid in (current_data_tid, tid)
    # Load object data as it was before given transaction.
    # It can be None, in which case it means we are undoing object
    # creation.
    _, data_tid = getDataTID(before_tid=undone_tid)
    if data_tid is not None:
        data_tid = p64(data_tid)
    return p64(current_tid), data_tid, is_current
@abstract
def lockTransaction(self, tid, ttid):
    """Mark the voted transaction 'ttid' as committed with 'tid'.

    All pending changes are committed just before returning to the caller.
    """
@abstract
def unlockTransaction(self, tid, ttid, trans, obj):
    """Finalize a transaction by moving its data to the finished area."""
@abstract
def abortTransaction(self, ttid):
    """Abort the unfinished transaction 'ttid'.

    Presumably drops its temporary records — TODO confirm in backends.
    """
@abstract
def deleteTransaction(self, tid):
    """Delete the transaction 'tid' (backend-specific)."""
@abstract
def deleteObject(self, oid, serial=None):
    """Delete the given object, or only its 'serial' revision if given."""
@abstract
def _deleteRange(self, partition, min_tid=None, max_tid=None):
    """Delete all objects and transactions of 'partition' in the range
    (min_tid, max_tid] (min_tid excluded, max_tid included)."""
def truncate(self):
    """Apply a pending truncation, if any (see getTruncateTID).

    For each partition assigned to this node, delete everything after the
    truncate tid. OUT_OF_DATE cells whose stored tid is above it are set
    back to it. Then clear truncate_tid and commit.
    """
    tid = self.getTruncateTID()
    if tid:
        tid = util.u64(tid)
        assert tid, tid
        cell_list = []
        my_nid = self.getUUID()
        for partition, state in self._iterAssignedCells():
            # Only stored tids (>= 0) can exceed 'tid'; other cells store
            # a negative state.
            if state > tid:
                cell_list.append((partition, my_nid, tid))
            self._deleteRange(partition, tid)
        if cell_list:
            self._changePartitionTable(cell_list)
        self._setTruncateTID(None)
        self.commit()
def repair(self, weak_app, dry_run):
    """Look for orphan data in a background thread and delete it.

    Orphans are searched with getOrphanList on a duplicated connection.
    Unless 'dry_run', they are deleted by 'finalize', which is scheduled
    on the application's event manager (weak_app().em.wakeup); the worker
    thread waits on a lock until finalize has run. Only one repair can
    run at a time.
    """
    t = self._repairing
    if t and t.is_alive():
        logging.error('already repairing')
        return
    def repair():
        # Released by finalize() once it has run.
        l = threading.Lock()
        l.acquire()
        def finalize():
            try:
                if data_id_list and not dry_run:
                    self.commit()
                    logging.info("repair: deleted %s orphan records",
                                 self._pruneData(data_id_list))
                    self.commit()
            finally:
                l.release()
        try:
            with self._duplicate() as db:
                data_id_list = db.getOrphanList()
            logging.info("repair: found %s records that may be orphan",
                         len(data_id_list))
            weak_app().em.wakeup(finalize)
            l.acquire()
        finally:
            del self._repairing
        logging.info("repair: done")
    t = self._repairing = threading.Thread(target=repair)
    t.daemon = 1
    t.start()
@abstract
def getTransaction(self, tid, all=False):
    """Return transaction metadata for 'tid', or None if not found.

    The result is a tuple of the list of OIDs, user information, a
    description and extension information. If 'all' is true, the
    temporary area is searched as well.
    """
@abstract
def getObjectHistory(self, oid, offset, length):
    """Return a list of serials and sizes for a given object ID.
    The length specifies the maximum size of such a list. Result starts
    with latest serial, and the list must be sorted in descending order.
    If there is no such object ID in a database, return None.

    offset -- number of most recent revisions to skip
    Items are (serial, size) tuples, serial being packed. The MySQL backend
    ignores revisions older than the last pack TID.
    """
@abstract
def getReplicationObjectList(self, min_tid, max_tid, length, partition,
                             min_oid):
    """Return at most 'length' (serial, oid) tuples for given partition,
    sorted in ascending (serial, oid) order, starting at (min_tid, min_oid)
    (included) and up to max_tid (included).

    All TIDs and OIDs, given and returned, are packed.
    (This describes the MySQL backend, which returns a list of tuples, not
    a dict grouped by serial as an older version of this docstring said.)
    """
def _getTIDList(self, offset, length, partition_list):
    """Return a list of TIDs in descending order (most recent first) from
    an offset, at most the specified length. The list of partitions are
    passed to filter out non-applicable TIDs.

    TIDs are returned as integers; getTIDList packs them. The MySQL backend
    sorts with ORDER BY tid DESC.
    """
@requires(_getTIDList)
def getTIDList(self, offset, length, partition_list):
    """Return the packed TIDs that _getTIDList gives for these arguments.

    An empty partition_list gives an empty tuple. NonReadableCell is raised
    if any of the requested partitions is not readable on this node.
    """
    if not partition_list:
        return ()
    if not self._readable_set.issuperset(partition_list):
        raise NonReadableCell
    tid_list = self._getTIDList(offset, length, partition_list)
    return map(util.p64, tid_list)
@abstract
def getReplicationTIDList(self, min_tid, max_tid, length, partition):
    """Return a list of TIDs in ascending order from an initial tid value,
    at most the specified length up to max_tid. The partition number is
    passed to filter out non-applicable TIDs.

    min_tid and max_tid are both included; TIDs, given and returned, are
    packed (as in the MySQL backend).
    """
@abstract
def pack(self, tid, updateObjectDataForPack):
    """Prune all non-current object revisions at given tid.
    updateObjectDataForPack is a function called for each deleted object
    and revision with:
    - OID
    - packed TID
    - new value_serial
        If object data was moved to an after-pack-tid revision, this
        parameter contains the TID of that revision, allowing to backlink
        to it.
    - getObjectData function
        To call if value_serial is None and an object needs to be updated.
        Takes no parameter, returns a 3-tuple: compression, data_id,
        value

    NOTE(review): the MySQL backend calls
    updateObjectDataForPack(oid, serial, new_serial, data_id), where serial
    is the (packed) serial of the deleted revision, new_serial is None when
    no later revision referenced its data, and the 4th argument is the
    deleted revision's data_id rather than a getObjectData function.
    Confirm and update the description above.
    """
@abstract
def checkTIDRange(self, partition, length, min_tid, max_tid):
    """
    Generate a digest from transaction list.
    partition (int)
        Partition to check.
    min_tid (packed)
        TID at which verification starts (included).
    max_tid (packed)
        TID at which verification stops (included).
    length (int)
        Maximum number of records to include in result
        (the MySQL backend also accepts None, meaning no limit).
    Returns a 3-tuple:
        - number of records actually found
        - a SHA1 computed from record's TID
          ZERO_HASH if no record found
        - biggest TID found (ie, TID of last record read)
          ZERO_TID if no record found
    """
@abstract
def checkSerialRange(self, partition, length, min_tid, max_tid, min_oid):
    """
    Generate a digest from object list.
    partition (int)
        Partition to check.
    min_oid (packed)
        OID at which verification starts.
    min_tid (packed)
        Serial of min_oid object at which search should start.
    max_tid (packed)
        Records with a greater serial are ignored.
    length
        Maximum number of records to include in result
        (the MySQL backend also accepts None, meaning no limit).
    Records are (serial, oid) pairs read in ascending (serial, oid) order,
    starting at (min_tid, min_oid) included.
    Returns a 5-tuple, in the order produced by the MySQL backend:
        - number of records actually found
        - a SHA1 computed from record's serial
          ZERO_HASH if no record found
        - serial of last record read
          ZERO_TID if no record found
        - a SHA1 computed from record's OID
          ZERO_HASH if no record found
        - OID of last record read
          ZERO_OID if no record found
    """
neoppod-97af23cc3740cc4b2eee09125c9ce95abb6fbe2c-neo-storage/neo/storage/database/mysqldb.py 0000664 0000000 0000000 00000113427 13313233730 0031001 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
from binascii import a2b_hex
from collections import OrderedDict
from functools import wraps
import MySQLdb
from MySQLdb import DataError, IntegrityError, \
OperationalError, ProgrammingError
from MySQLdb.constants.CR import SERVER_GONE_ERROR, SERVER_LOST
from MySQLdb.constants.ER import DATA_TOO_LONG, DUP_ENTRY, NO_SUCH_TABLE
# BBB: the following 2 constants were added to mysqlclient 1.3.8
# (MySQLdb.constants.ER); they are hard-coded here so that older
# mysqlclient versions can still be used. They are the MySQL error codes
# that dropPartitions and _changePartitionTable deliberately ignore.
DROP_LAST_PARTITION = 1508
SAME_NAME_PARTITION = 1517
from array import array
from hashlib import sha1
import os
import re
import string
import struct
import sys
import time
from . import LOG_QUERIES, DatabaseFailure
from .manager import DatabaseManager, splitOIDField
from neo.lib import logging, util
from neo.lib.interfaces import implements
from neo.lib.protocol import ZERO_OID, ZERO_TID, ZERO_HASH
class MysqlError(DatabaseFailure):
    """DatabaseFailure wrapping a MySQLdb exception and, optionally, the
    SQL query that triggered it."""

    def __init__(self, exc, query=None):
        self.exc = exc
        self.query = query

    @property
    def code(self):
        """MySQL error number, i.e. first argument of the wrapped error."""
        return self.exc.args[0]

    def __str__(self):
        text = 'MySQL error %s: %s' % self.exc.args
        if self.query is None:
            return text
        # Only show the beginning of the query, with escaped control chars.
        shown = getPrintableQuery(self.query[:1000])
        return '%s\nQuery: %s' % (text, shown)
def getPrintableQuery(query, max=70):
    """Return 'query' with non-printable characters written as \\xNN.

    Tab, vertical tab, form feed and carriage return are escaped as well,
    so newline is the only control character left as is.
    NOTE: 'max' is accepted but ignored; callers truncate the query
    themselves before calling.
    """
    shown = []
    for char in query:
        if char not in string.printable or char in '\t\x0b\x0c\r':
            char = '\\x%02x' % ord(char)
        shown.append(char)
    return ''.join(shown)
def auto_reconnect(wrapped):
    """Decorate a connection method so that it survives a lost MySQL server.

    If 'wrapped' raises OperationalError with SERVER_GONE_ERROR or
    SERVER_LOST while self._active is false, the connection is closed and
    the call retried. self._active is set by write queries and permanently
    for secondary connections. At most 3 attempts are made: when it fails
    too often for the same query, the disconnection is likely caused by
    this query, and we don't want an infinite loop. Any other failure is
    re-raised, wrapped in MysqlError when self.LOCK is set (main
    connection), unchanged otherwise (secondary connections, whose callers
    handle it).
    """
    def wrapper(self, *args):
        retries_left = 2
        while True:
            try:
                return wrapped(self, *args)
            except OperationalError as m:
                # IDEA: Is it safe to retry in case of DISK_FULL ?
                # XXX: However, this would another case of failure that would
                #      be unnoticed by other nodes (ADMIN & MASTER). When
                #      there are replicas, it may be preferred to not retry.
                if (self._active
                        or m.args[0] not in (SERVER_GONE_ERROR, SERVER_LOST)
                        or not retries_left):
                    if self.LOCK:
                        raise MysqlError(m, *args)
                    raise # caught upper for secondary connections
                logging.info('the MySQL server is gone; reconnecting')
                assert not self._deferred
                self.close()
                retries_left -= 1
    return wraps(wrapped)(wrapper)
@implements
class MySQLDatabaseManager(DatabaseManager):
    """This class manages a database on MySQL.

    Committed transactions and objects are stored in the 'trans' and 'obj'
    tables, object data in 'data' (plus 'bigdata' for records that do not
    fit in a MEDIUMBLOB), and uncommitted transactions/objects in
    'ttrans'/'tobj'. See _setup for the schema.
    """

    VERSION = 3
    ENGINES = "InnoDB", "RocksDB", "TokuDB"
    _engine = ENGINES[0] # default engine
    _use_partition = False

    # Minimal accepted value of the server's 'max_allowed_packet' variable;
    # replaced by the actual server value on connection (see _tryConnect).
    _max_allowed_packet = 32769 * 1024

    def _parse(self, database):
        """ Get the database credentials (username, password, database) """
        # expected pattern : [user[:password]@]database[(~|.|/)unix_socket]
        # The socket part keeps its leading '~', '.' or '/' character.
        self.user, self.passwd, self.db, self.socket = re.match(
            '(?:([^:]+)(?::(.*))?@)?([^~./]+)(.+)?$', database).groups()

    def _close(self):
        try:
            conn = self.__dict__.pop('conn')
        except KeyError:
            return
        conn.close()

    def __getattr__(self, attr):
        # Connect lazily, on first access to 'conn'.
        if attr == 'conn':
            self._tryConnect()
        return super(MySQLDatabaseManager, self).__getattr__(attr)

    def _tryConnect(self):
        """Open the MySQL connection and configure the session.

        Connection is retried every second, until self._wait seconds have
        elapsed (forever if self._wait is negative). The main connection
        (self.LOCK set) also takes a named MySQL lock and exits with
        self.LOCKED if another process already holds it.
        """
        kwd = {'db' : self.db, 'user' : self.user}
        if self.passwd is not None:
            kwd['passwd'] = self.passwd
        if self.socket:
            kwd['unix_socket'] = os.path.expanduser(self.socket)
        logging.info('connecting to MySQL on the database %s with user %s',
                     self.db, self.user)
        self._active = 0
        if self._wait < 0:
            timeout_at = None
        else:
            timeout_at = time.time() + self._wait
        last = None
        while True:
            try:
                self.conn = MySQLdb.connect(**kwd)
                break
            except Exception as e:
                if None is not timeout_at <= time.time():
                    raise
                e = str(e)
                # Only log the full traceback when the error changes.
                if last == e:
                    log = logging.debug
                else:
                    last = e
                    log = logging.exception
                log('Connection to MySQL failed, retrying.')
                time.sleep(1)
        self._config = {}
        conn = self.conn
        conn.autocommit(False)
        conn.query("SET SESSION group_concat_max_len = %u" % (2**32-1))
        conn.set_sql_mode("TRADITIONAL,NO_ENGINE_SUBSTITUTION")
        def query(sql):
            conn.query(sql)
            r = conn.store_result()
            return r.fetch_row(r.num_rows())
        if self.LOCK:
            (locked,), = query("SELECT GET_LOCK('%s.%s', 0)"
                               % (self.db, self.LOCK))
            if not locked:
                sys.exit(self.LOCKED)
        (name, value), = query(
            "SHOW VARIABLES WHERE variable_name='max_allowed_packet'")
        if int(value) < self._max_allowed_packet:
            raise DatabaseFailure("Global variable %r is too small."
                " Minimal value must be %uk."
                % (name, self._max_allowed_packet // 1024))
        self._max_allowed_packet = int(value)
        try:
            self._dedup = bool(query(
                "SHOW INDEX FROM data WHERE key_name='hash'"))
        except ProgrammingError as e:
            if e.args[0] != NO_SUCH_TABLE:
                raise
            # No 'data' table yet: _setup decides.
            self._dedup = None
        if not self.LOCK:
            # Prevent automatic reconnection for secondary connections.
            self._active = 1
            self._commit = self.conn.commit

    _connect = auto_reconnect(_tryConnect)

    def autoReconnect(self, f):
        """Run f() on this secondary connection, running it again from the
        start after reconnecting whenever the MySQL server is lost."""
        assert self._active and not self.LOCK
        @auto_reconnect
        def try_once(self):
            if self._active:
                try:
                    f()
                finally:
                    self._active = 0
                return True
        while not try_once(self):
            # Avoid reconnecting too often.
            # Since this is used to wrap an arbitrary long process and
            # not just a single query, we can't limit the number of retries.
            time.sleep(5)
            self._connect()

    def _commit(self):
        self.conn.commit()
        self._active = 0

    @auto_reconnect
    def query(self, query):
        """Query data from a database.

        SELECT queries return a tuple of row tuples (array values are
        converted to strings). Write queries (INSERT/REPLACE/DELETE/UPDATE)
        mark the connection as active until the next commit; only schema
        queries (ALTER/CREATE/DROP) are expected otherwise.
        """
        if LOG_QUERIES:
            logging.debug('querying %s...',
                getPrintableQuery(query.split('\n', 1)[0][:70]))
        conn = self.conn
        conn.query(query)
        if query.startswith("SELECT "):
            r = conn.store_result()
            return tuple([
                tuple([d.tostring() if isinstance(d, array) else d
                      for d in row])
                for row in r.fetch_row(r.num_rows())])
        r = query.split(None, 1)[0]
        if r in ("INSERT", "REPLACE", "DELETE", "UPDATE"):
            self._active = 1
        else:
            assert r in ("ALTER", "CREATE", "DROP"), query

    @property
    def escape(self):
        """Escape special characters in a string."""
        return self.conn.escape_string

    def _getDevPath(self):
        # BBB: MySQL is moving to Performance Schema.
        return self.query("SELECT * FROM information_schema.global_variables"
                          " WHERE variable_name='datadir'")[0][1]

    def erase(self):
        self.query("DROP TABLE IF EXISTS"
            " config, pt, trans, obj, data, bigdata, ttrans, tobj")

    def nonempty(self, table):
        """Return whether 'table' has rows, or None if it does not exist."""
        try:
            return bool(self.query("SELECT 1 FROM %s LIMIT 1" % table))
        except ProgrammingError as e:
            if e.args[0] != NO_SUCH_TABLE:
                raise

    def _alterTable(self, schema_dict, table, select="*"):
        """Recreate 'table' with its schema from schema_dict, copying rows.

        Data is copied into 'new_<table>', which then replaces the original
        table. If only 'new_<table>' exists (interrupted migration), the
        final rename is completed.
        """
        q = self.query
        new = 'new_' + table
        if self.nonempty(table) is None:
            if self.nonempty(new) is None:
                return
        else:
            q("DROP TABLE IF EXISTS " + new)
            q(schema_dict.pop(table) % new
              + " SELECT %s FROM %s" % (select, table))
            q("DROP TABLE " + table)
        q("ALTER TABLE %s RENAME TO %s" % (new, table))

    def _migrate1(self, _):
        self._checkNoUnfinishedTransactions()
        self.query("DROP TABLE IF EXISTS ttrans")

    def _migrate2(self, schema_dict):
        self._alterTable(schema_dict, 'obj')

    def _migrate3(self, schema_dict):
        self._alterTable(schema_dict, 'pt', "rid as `partition`, nid,"
            " CASE state"
            " WHEN 0 THEN -1"  # UP_TO_DATE
            " WHEN 2 THEN -2"  # FEEDING
            " ELSE 1-state"
            " END as tid")

    def _setup(self, dedup=False):
        """Create missing tables, after migrating an existing schema.

        dedup -- whether a newly created 'data' table gets a
                 UNIQUE (hash, compression) index; also used for
                 self._dedup when no 'data' table existed at connection
        """
        self._config.clear()
        q = self.query
        p = engine = self._engine
        schema_dict = OrderedDict()

        # The table "config" stores configuration
        # parameters which affect the persistent data.
        schema_dict['config'] = """CREATE TABLE %s (
                 name VARBINARY(255) NOT NULL PRIMARY KEY,
                 value VARBINARY(255) NULL
             ) ENGINE=""" + engine

        # The table "pt" stores a partition table.
        schema_dict['pt'] = """CREATE TABLE %s (
                 `partition` SMALLINT UNSIGNED NOT NULL,
                 nid INT NOT NULL,
                 tid BIGINT NOT NULL,
                 PRIMARY KEY (`partition`, nid)
             ) ENGINE=""" + engine

        if self._use_partition:
            p += """ PARTITION BY LIST (`partition`) (
                PARTITION dummy VALUES IN (NULL))"""

        # The table "trans" stores information on committed transactions.
        schema_dict['trans'] = """CREATE TABLE %s (
                 `partition` SMALLINT UNSIGNED NOT NULL,
                 tid BIGINT UNSIGNED NOT NULL,
                 packed BOOLEAN NOT NULL,
                 oids MEDIUMBLOB NOT NULL,
                 user BLOB NOT NULL,
                 description BLOB NOT NULL,
                 ext BLOB NOT NULL,
                 ttid BIGINT UNSIGNED NOT NULL,
                 PRIMARY KEY (`partition`, tid)
             ) ENGINE=""" + p

        # The table "obj" stores committed object metadata.
        schema_dict['obj'] = """CREATE TABLE %s (
                 `partition` SMALLINT UNSIGNED NOT NULL,
                 oid BIGINT UNSIGNED NOT NULL,
                 tid BIGINT UNSIGNED NOT NULL,
                 data_id BIGINT UNSIGNED NULL,
                 value_tid BIGINT UNSIGNED NULL,
                 PRIMARY KEY (`partition`, oid, tid),
                 KEY tid (`partition`, tid, oid),
                 KEY (data_id)
             ) ENGINE=""" + p

        if engine == "TokuDB":
            engine += " compression='tokudb_uncompressed'"

        # The table "data" stores object data.
        # We'd like to have partial index on 'hash' column (e.g. hash(4))
        # but 'UNIQUE' constraint would not work as expected.
        schema_dict['data'] = """CREATE TABLE %%s (
                 id BIGINT UNSIGNED NOT NULL PRIMARY KEY,
                 hash BINARY(20) NOT NULL,
                 compression TINYINT UNSIGNED NULL,
                 value MEDIUMBLOB NOT NULL%s
             ) ENGINE=%s""" % (""",
                 UNIQUE (hash, compression)""" if dedup else "", engine)

        schema_dict['bigdata'] = """CREATE TABLE %s (
                 id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
                 value MEDIUMBLOB NOT NULL
             ) ENGINE=""" + engine

        # The table "ttrans" stores information on uncommitted transactions.
        schema_dict['ttrans'] = """CREATE TABLE %s (
                 `partition` SMALLINT UNSIGNED NOT NULL,
                 tid BIGINT UNSIGNED,
                 packed BOOLEAN NOT NULL,
                 oids MEDIUMBLOB NOT NULL,
                 user BLOB NOT NULL,
                 description BLOB NOT NULL,
                 ext BLOB NOT NULL,
                 ttid BIGINT UNSIGNED NOT NULL
             ) ENGINE=""" + engine

        # The table "tobj" stores uncommitted object metadata.
        schema_dict['tobj'] = """CREATE TABLE %s (
                 `partition` SMALLINT UNSIGNED NOT NULL,
                 oid BIGINT UNSIGNED NOT NULL,
                 tid BIGINT UNSIGNED NOT NULL,
                 data_id BIGINT UNSIGNED NULL,
                 value_tid BIGINT UNSIGNED NULL,
                 PRIMARY KEY (tid, oid)
             ) ENGINE=""" + engine

        if self.nonempty('config') is None:
            q(schema_dict.pop('config') % 'config')
            self._setConfiguration('version', self.VERSION)
        else:
            self.migrate(schema_dict)

        for table, schema in schema_dict.iteritems():
            q(schema % ('IF NOT EXISTS ' + table))

        if self._dedup is None:
            self._dedup = dedup

        # Data referenced by uncommitted objects must not be pruned.
        self._uncommitted_data.update(q("SELECT data_id, count(*)"
            " FROM tobj WHERE data_id IS NOT NULL GROUP BY data_id"))

    def getConfiguration(self, key):
        """Return the configuration value of 'key' (None if unset), cached
        in self._config."""
        try:
            return self._config[key]
        except KeyError:
            sql_key = self.escape(str(key))
            try:
                r = self.query("SELECT value FROM config WHERE name = '%s'"
                               % sql_key)[0][0]
            except IndexError:
                r = None
            self._config[key] = r
            return r

    def _setConfiguration(self, key, value):
        """Store 'value' for 'key' (delete it if value is None).

        The 'value' column is widened if a long 'zodb' value does not fit.
        """
        q = self.query
        e = self.escape
        self._config[key] = value
        k = e(str(key))
        if value is None:
            q("DELETE FROM config WHERE name = '%s'" % k)
            return
        value = str(value)
        sql = "REPLACE INTO config VALUES ('%s', '%s')" % (k, e(value))
        try:
            q(sql)
        except DataError as err:
            if (err.args[0] != DATA_TOO_LONG or len(value) < 256
                    or key != "zodb"):
                raise
            q("ALTER TABLE config MODIFY value VARBINARY(%s) NULL" % len(value))
            q(sql)

    def _getPartitionTable(self):
        return self.query("SELECT * FROM pt")

    def _getLastTID(self, partition, max_tid=None):
        x = " WHERE `partition`=%s" % partition
        if max_tid:
            x += " AND tid<=%s" % max_tid
        (tid,), = self.query(
            "SELECT MAX(tid) as t FROM trans FORCE INDEX (PRIMARY)" + x)
        return tid

    def _getLastIDs(self, partition):
        q = self.query
        x = " WHERE `partition`=%s" % partition
        (oid,), = q("SELECT MAX(oid) FROM obj FORCE INDEX (PRIMARY)" + x)
        (tid,), = q("SELECT MAX(tid) FROM obj FORCE INDEX (tid)" + x)
        return tid, oid

    def _getDataLastId(self, partition):
        # Data ids of a partition are allocated in [partition << 48,
        # (partition + 1) << 48).
        return self.query("SELECT MAX(id) FROM data WHERE %s <= id AND id < %s"
            % (partition << 48, (partition + 1) << 48))[0][0]

    def _getUnfinishedTIDDict(self):
        q = self.query
        return q("SELECT ttid, tid FROM ttrans"), (ttid
            for ttid, in q("SELECT DISTINCT tid FROM tobj"))

    def getFinalTID(self, ttid):
        """Return the final (packed) TID of committed transaction 'ttid',
        or None if not found."""
        ttid = util.u64(ttid)
        # MariaDB is smart enough to realize that 'ttid' is constant.
        r = self.query("SELECT tid FROM trans"
            " WHERE `partition`=%s AND tid>=ttid AND ttid=%s LIMIT 1"
            % (self._getReadablePartition(ttid), ttid))
        if r:
            return util.p64(r[0][0])

    def getLastObjectTID(self, oid):
        oid = util.u64(oid)
        r = self.query("SELECT tid FROM obj FORCE INDEX(PRIMARY)"
                       " WHERE `partition`=%d AND oid=%d"
                       " ORDER BY tid DESC LIMIT 1"
                       % (self._getReadablePartition(oid), oid))
        return util.p64(r[0][0]) if r else None

    def _getNextTID(self, *args): # partition, oid, tid
        r = self.query("SELECT tid FROM obj"
                       " FORCE INDEX(PRIMARY)"
                       " WHERE `partition`=%d AND oid=%d AND tid>%d"
                       " ORDER BY tid LIMIT 1" % args)
        return r[0][0] if r else None

    def _getObject(self, oid, tid=None, before_tid=None):
        """Return (serial, next_serial, compression, checksum, data,
        value_serial) of the selected revision of 'oid', or None."""
        q = self.query
        partition = self._getReadablePartition(oid)
        sql = ('SELECT tid, compression, data.hash, value, value_tid'
               ' FROM obj FORCE INDEX(PRIMARY)'
               ' LEFT JOIN data ON (obj.data_id = data.id)'
               ' WHERE `partition` = %d AND oid = %d') % (partition, oid)
        if before_tid is not None:
            sql += ' AND tid < %d ORDER BY tid DESC LIMIT 1' % before_tid
        elif tid is not None:
            sql += ' AND tid = %d' % tid
        else:
            # XXX I want to express "HAVING tid = MAX(tid)", but
            # MySQL does not use an index for a HAVING clause!
            sql += ' ORDER BY tid DESC LIMIT 1'
        r = q(sql)
        try:
            serial, compression, checksum, data, value_serial = r[0]
        except IndexError:
            return None
        if compression and compression & 0x80:
            compression &= 0x7f
            data = ''.join(self._bigData(data))
        return (serial, self._getNextTID(partition, oid, serial),
                compression, checksum, data, value_serial)

    def _changePartitionTable(self, cell_list, reset=False):
        """Apply (offset, nid, tid) changes to 'pt'; a None tid drops the
        cell. With 'reset', the table is emptied first."""
        offset_list = []
        q = self.query
        if reset:
            q("DELETE FROM pt")
        for offset, nid, tid in cell_list:
            # TODO: this logic should move out of database manager
            # add 'dropCells(cell_list)' to API and use one query
            if tid is None:
                q("DELETE FROM pt WHERE `partition` = %d AND nid = %d"
                  % (offset, nid))
            else:
                offset_list.append(offset)
                q("INSERT INTO pt VALUES (%d, %d, %d)"
                  " ON DUPLICATE KEY UPDATE tid = %d"
                  % (offset, nid, tid, tid))
        if self._use_partition:
            for offset in offset_list:
                add = """ALTER TABLE %%s ADD PARTITION (
                    PARTITION p%u VALUES IN (%u))""" % (offset, offset)
                for table in 'trans', 'obj':
                    try:
                        self.query(add % table)
                    except MysqlError as e:
                        if e.code != SAME_NAME_PARTITION:
                            raise

    def dropPartitions(self, offset_list):
        """Delete all committed data of the given partitions, and prune
        the data records that are no longer referenced."""
        if not offset_list:
            return
        q = self.query
        # XXX: these queries are inefficient (execution time increase with
        #      row count, although we use indexes) when there are rows to
        #      delete. It should be done as an idle task, by chunks.
        pending_data_id_list = []
        for partition in offset_list:
            where = " WHERE `partition`=%d" % partition
            data_id_list = [x for x, in
                q("SELECT DISTINCT data_id FROM obj FORCE INDEX(tid)"
                  "%s AND data_id IS NOT NULL" % where)]
            if self._use_partition:
                # The 'obj' rows remain until the partition is dropped below,
                # and _pruneData skips data still referenced by 'obj':
                # pruning must wait until then.
                pending_data_id_list += data_id_list
            else:
                q("DELETE FROM obj" + where)
                q("DELETE FROM trans" + where)
                self._pruneData(data_id_list)
        if self._use_partition:
            drop = "ALTER TABLE %s DROP PARTITION" + \
                ','.join(' p%u' % i for i in offset_list)
            for table in 'trans', 'obj':
                try:
                    self.query(drop % table)
                except MysqlError as e:
                    if e.code != DROP_LAST_PARTITION:
                        raise
            self._pruneData(pending_data_id_list)

    def _getUnfinishedDataIdList(self):
        return [x for x, in self.query(
            "SELECT data_id FROM tobj WHERE data_id IS NOT NULL")]

    def dropPartitionsTemporary(self, offset_list=None):
        """Delete uncommitted data of the given partitions (all if None)."""
        where = "" if offset_list is None else \
            " WHERE `partition` IN (%s)" % ','.join(map(str, offset_list))
        q = self.query
        q("DELETE FROM tobj" + where)
        q("DELETE FROM ttrans" + where)

    def storeTransaction(self, tid, object_list, transaction, temporary = True):
        """Store object records and, optionally, transaction metadata.

        Rows go to 'tobj'/'ttrans' if 'temporary', else to 'obj'/'trans'.
        object_list items are (oid, data_id, value_serial); when
        value_serial is set, data_id is read from that committed revision.
        Object rows are sent in REPLACE statements that fit in the server's
        max_allowed_packet.
        """
        e = self.escape
        u64 = util.u64
        tid = u64(tid)
        if temporary:
            obj_table = 'tobj'
            trans_table = 'ttrans'
        else:
            obj_table = 'obj'
            trans_table = 'trans'
        q = self.query
        sql = ["REPLACE INTO %s VALUES " % obj_table]
        values_max = self._max_allowed_packet - len(sql[0])
        values_size = 0
        for oid, data_id, value_serial in object_list:
            oid = u64(oid)
            partition = self._getPartition(oid)
            if value_serial:
                value_serial = u64(value_serial)
                (data_id,), = q("SELECT data_id FROM obj"
                    " WHERE `partition`=%d AND oid=%d AND tid=%d"
                    % (partition, oid, value_serial))
                if temporary:
                    self.holdData(data_id)
            else:
                value_serial = 'NULL'
            value = "(%s,%s,%s,%s,%s)," % (
                partition, oid, tid,
                'NULL' if data_id is None else data_id,
                value_serial)
            values_size += len(value)
            # actually: max_values < values_size + EXTRA - len(final comma)
            # (test_max_allowed_packet checks that EXTRA == 2)
            if values_max <= values_size:
                sql[-1] = sql[-1][:-1] # remove final comma
                q(''.join(sql))
                del sql[1:]
                values_size = len(value)
            sql.append(value)
        if values_size:
            sql[-1] = sql[-1][:-1] # remove final comma
            q(''.join(sql))
        if transaction:
            oid_list, user, desc, ext, packed, ttid = transaction
            partition = self._getPartition(tid)
            assert packed in (0, 1)
            q("REPLACE INTO %s VALUES (%s,%s,%s,'%s','%s','%s','%s',%s)" % (
                trans_table, partition, 'NULL' if temporary else tid, packed,
                e(''.join(oid_list)), e(user), e(desc), e(ext), u64(ttid)))

    # Value of a big 'data' record: packed (bigdata_id, length).
    _structLL = struct.Struct(">LL")
    _unpackLL = _structLL.unpack

    def getOrphanList(self):
        """Return ids of 'data' records not referenced by 'obj'."""
        return [x for x, in self.query(
            "SELECT id FROM data LEFT JOIN obj ON (id=data_id)"
            " WHERE data_id IS NULL")]

    def _pruneData(self, data_id_list):
        """Delete the given data records that are neither referenced by
        'obj' nor used by uncommitted data, with their 'bigdata' chunks.
        Return the number of deleted 'data' records."""
        data_id_list = set(data_id_list).difference(self._uncommitted_data)
        if data_id_list:
            q = self.query
            id_list = []
            bigid_list = []
            for data_id, value in q(
                    "SELECT id, IF(compression < 128, NULL, value)"
                    " FROM data LEFT JOIN obj ON (id = data_id)"
                    " WHERE id IN (%s) AND data_id IS NULL"
                    % ",".join(map(str, data_id_list))):
                id_list.append(str(data_id))
                if value:
                    bigdata_id, length = self._unpackLL(value)
                    # One 'bigdata' row per 8M chunk (rounded up).
                    bigid_list += xrange(bigdata_id,
                                         bigdata_id + (length + 0x7fffff >> 23))
            if id_list:
                q("DELETE FROM data WHERE id IN (%s)" % ",".join(id_list))
                if bigid_list:
                    q("DELETE FROM bigdata WHERE id IN (%s)"
                      % ",".join(map(str, bigid_list)))
            return len(id_list)
        return 0

    def _bigData(self, value):
        """Return a generator of the 'bigdata' chunks, in order, of a big
        data record whose value is a packed (bigdata_id, length) pair."""
        bigdata_id, length = self._unpackLL(value)
        q = self.query
        return (q("SELECT value FROM bigdata WHERE id=%s" % i)[0][0]
            for i in xrange(bigdata_id,
                            bigdata_id + (length + 0x7fffff >> 23)))

    def storeData(self, checksum, oid, data, compression, _pack=_structLL.pack):
        """Store object data and return the id of its 'data' record.

        Data of 16M or more is split into 8M 'bigdata' chunks; bit 0x80 of
        'compression' is then set and the 'data' record only contains the
        packed (bigdata_id, length) pair. If a record with the same hash
        and compression already exists, its id is returned when its content
        is the same, otherwise IntegrityError(DUP_ENTRY) is raised.
        """
        e = self.escape
        checksum = e(checksum)
        if 0x1000000 <= len(data): # 16M (MEDIUMBLOB limit)
            compression |= 0x80
            q = self.query
            if self._dedup:
                for r, d in q("SELECT id, value FROM data"
                              " WHERE hash='%s' AND compression=%s"
                              % (checksum, compression)):
                    i = 0
                    for d in self._bigData(d):
                        j = i + len(d)
                        if data[i:j] != d:
                            raise IntegrityError(DUP_ENTRY)
                        i = j
                    if j != len(data):
                        raise IntegrityError(DUP_ENTRY)
                    return r
            i = 'NULL'
            length = len(data)
            for j in xrange(0, length, 0x800000): # 8M
                q("INSERT INTO bigdata VALUES (%s, '%s')"
                  % (i, e(data[j:j+0x800000])))
                if not j:
                    i = bigdata_id = self.conn.insert_id()
                i += 1
            data = _pack(bigdata_id, length)
        p = self._getPartition(util.u64(oid))
        r = self._data_last_ids[p]
        try:
            self.query("INSERT INTO data VALUES (%s, '%s', %d, '%s')" %
                       (r, checksum, compression, e(data)))
        except IntegrityError as err:
            if err.args[0] == DUP_ENTRY:
                (r, d), = self.query("SELECT id, value FROM data"
                                     " WHERE hash='%s' AND compression=%s"
                                     % (checksum, compression))
                if d == data:
                    return r
            raise
        self._data_last_ids[p] = r + 1
        return r

    def loadData(self, data_id):
        """Return (compression, hash, value) of the given data record,
        reassembling big records from their 'bigdata' chunks."""
        compression, checksum, value = self.query(
            "SELECT compression, hash, value FROM data where id=%s"
            % data_id)[0]
        if compression and compression & 0x80:
            compression &= 0x7f
            # The stored value is a packed (bigdata_id, length) pair.
            value = ''.join(self._bigData(value))
        return compression, checksum, value

    del _structLL

    def _getDataTID(self, oid, tid=None, before_tid=None):
        """Return (tid, value_tid) of the selected revision of 'oid', or
        (None, None)."""
        sql = ('SELECT tid, value_tid FROM obj FORCE INDEX(PRIMARY)'
               ' WHERE `partition` = %d AND oid = %d'
              ) % (self._getReadablePartition(oid), oid)
        if tid is not None:
            sql += ' AND tid = %d' % tid
        elif before_tid is not None:
            sql += ' AND tid < %d ORDER BY tid DESC LIMIT 1' % before_tid
        else:
            # XXX I want to express "HAVING tid = MAX(tid)", but
            # MySQL does not use an index for a HAVING clause!
            sql += ' ORDER BY tid DESC LIMIT 1'
        r = self.query(sql)
        return r[0] if r else (None, None)

    def lockTransaction(self, tid, ttid):
        u64 = util.u64
        self.query("UPDATE ttrans SET tid=%d WHERE ttid=%d LIMIT 1"
                   % (u64(tid), u64(ttid)))
        self.commit()

    def unlockTransaction(self, tid, ttid, trans, obj):
        q = self.query
        u64 = util.u64
        tid = u64(tid)
        if trans:
            q("INSERT INTO trans SELECT * FROM ttrans WHERE tid=%d" % tid)
            q("DELETE FROM ttrans WHERE tid=%d" % tid)
            if not obj:
                return
        sql = " FROM tobj WHERE tid=%d" % u64(ttid)
        data_id_list = [x for x, in q("SELECT data_id%s AND data_id IS NOT NULL"
                                      % sql)]
        q("INSERT INTO obj SELECT `partition`, oid, %d, data_id, value_tid %s"
          % (tid, sql))
        q("DELETE" + sql)
        self.releaseData(data_id_list)

    def abortTransaction(self, ttid):
        ttid = util.u64(ttid)
        q = self.query
        q("DELETE FROM tobj WHERE tid=%s" % ttid)
        q("DELETE FROM ttrans WHERE ttid=%s" % ttid)

    def deleteTransaction(self, tid):
        tid = util.u64(tid)
        self.query("DELETE FROM trans WHERE `partition`=%s AND tid=%s" %
            (self._getPartition(tid), tid))

    def deleteObject(self, oid, serial=None):
        u64 = util.u64
        oid = u64(oid)
        sql = " FROM obj WHERE `partition`=%d AND oid=%d" \
            % (self._getPartition(oid), oid)
        if serial:
            sql += ' AND tid=%d' % u64(serial)
        q = self.query
        data_id_list = [x for x, in q(
            "SELECT DISTINCT data_id%s AND data_id IS NOT NULL" % sql)]
        q("DELETE" + sql)
        self._pruneData(data_id_list)

    def _deleteRange(self, partition, min_tid=None, max_tid=None):
        sql = " WHERE `partition`=%d" % partition
        if min_tid is not None:
            sql += " AND %d < tid" % min_tid
        if max_tid is not None:
            sql += " AND tid <= %d" % max_tid
        q = self.query
        q("DELETE FROM trans" + sql)
        sql = " FROM obj" + sql
        data_id_list = [x for x, in q(
            "SELECT DISTINCT data_id%s AND data_id IS NOT NULL" % sql)]
        q("DELETE" + sql)
        self._pruneData(data_id_list)

    def getTransaction(self, tid, all = False):
        tid = util.u64(tid)
        q = self.query
        r = q("SELECT oids, user, description, ext, packed, ttid"
              " FROM trans WHERE `partition` = %d AND tid = %d"
              % (self._getReadablePartition(tid), tid))
        if not r and all:
            r = q("SELECT oids, user, description, ext, packed, ttid"
                  " FROM ttrans WHERE tid = %d" % tid)
        if r:
            oids, user, desc, ext, packed, ttid = r[0]
            oid_list = splitOIDField(tid, oids)
            return oid_list, user, desc, ext, bool(packed), util.p64(ttid)

    def getObjectHistory(self, oid, offset, length):
        # FIXME: This method doesn't take client's current transaction id as
        # parameter, which means it can return transactions in the future of
        # client's transaction.
        oid = util.u64(oid)
        p64 = util.p64
        # For big records, the size is read from the packed
        # (bigdata_id, length) value (bytes 5 to 8).
        r = self.query("SELECT tid, IF(compression < 128, LENGTH(value),"
            "  CAST(CONV(HEX(SUBSTR(value, 5, 4)), 16, 10) AS INT))"
            " FROM obj FORCE INDEX(PRIMARY)"
            " LEFT JOIN data ON (obj.data_id = data.id)"
            " WHERE `partition` = %d AND oid = %d AND tid >= %d"
            " ORDER BY tid DESC LIMIT %d, %d" %
            (self._getReadablePartition(oid), oid,
             self._getPackTID(), offset, length))
        if r:
            return [(p64(tid), size or 0) for tid, size in r]

    def _fetchObject(self, oid, tid):
        r = self.query(
            'SELECT tid, compression, data.hash, value, value_tid'
            ' FROM obj FORCE INDEX(PRIMARY)'
            ' LEFT JOIN data ON (obj.data_id = data.id)'
            ' WHERE `partition` = %d AND oid = %d AND tid = %d'
            % (self._getReadablePartition(oid), oid, tid))
        if r:
            r = r[0]
            compression = r[1]
            if compression and compression & 0x80:
                return (r[0], compression & 0x7f, r[2],
                    ''.join(self._bigData(r[3])), r[4])
            return r

    def getReplicationObjectList(self, min_tid, max_tid, length, partition,
            min_oid):
        u64 = util.u64
        p64 = util.p64
        min_tid = u64(min_tid)
        r = self.query('SELECT tid, oid FROM obj FORCE INDEX(tid)'
                       ' WHERE `partition` = %d AND tid <= %d'
                       ' AND (tid = %d AND %d <= oid OR %d < tid)'
                       ' ORDER BY tid ASC, oid ASC LIMIT %d' % (
            partition, u64(max_tid), min_tid, u64(min_oid), min_tid, length))
        return [(p64(serial), p64(oid)) for serial, oid in r]

    def _getTIDList(self, offset, length, partition_list):
        return (t[0] for t in self.query(
            "SELECT tid FROM trans WHERE `partition` in (%s)"
            " ORDER BY tid DESC LIMIT %d,%d"
            % (','.join(map(str, partition_list)), offset, length)))

    def getReplicationTIDList(self, min_tid, max_tid, length, partition):
        u64 = util.u64
        p64 = util.p64
        min_tid = u64(min_tid)
        max_tid = u64(max_tid)
        r = self.query("""SELECT tid FROM trans
                    WHERE `partition` = %(partition)d
                    AND tid >= %(min_tid)d AND tid <= %(max_tid)d
                    ORDER BY tid ASC LIMIT %(length)d""" % {
            'partition': partition,
            'min_tid': min_tid,
            'max_tid': max_tid,
            'length': length,
        })
        return [p64(t[0]) for t in r]

    def _updatePackFuture(self, oid, orig_serial, max_serial):
        q = self.query
        # Before deleting this objects revision, see if there is any
        # transaction referencing its value at max_serial or above.
        # If there is, copy value to the first future transaction. Any further
        # reference is just updated to point to the new data location.
        value_serial = None
        kw = {
          'partition': self._getReadablePartition(oid),
          'oid': oid,
          'orig_tid': orig_serial,
          'max_tid': max_serial,
          'new_tid': 'NULL',
        }
        for kw['table'] in 'obj', 'tobj':
            for kw['tid'], in q('SELECT tid FROM %(table)s'
                  ' WHERE `partition`=%(partition)d AND oid=%(oid)d'
                  ' AND tid>=%(max_tid)d AND value_tid=%(orig_tid)d'
                  ' ORDER BY tid ASC' % kw):
                q('UPDATE %(table)s SET value_tid=%(new_tid)s'
                  ' WHERE `partition`=%(partition)d AND oid=%(oid)d'
                  ' AND tid=%(tid)d' % kw)
                if value_serial is None:
                    # First found, mark its serial for future reference.
                    kw['new_tid'] = value_serial = kw['tid']
        return value_serial

    def pack(self, tid, updateObjectDataForPack):
        # TODO: unit test (along with updatePackFuture)
        p64 = util.p64
        tid = util.u64(tid)
        updatePackFuture = self._updatePackFuture
        getPartition = self._getReadablePartition
        q = self.query
        self._setPackTID(tid)
        for count, oid, max_serial in q("SELECT COUNT(*) - 1, oid, MAX(tid)"
                                        " FROM obj FORCE INDEX(PRIMARY)"
                                        " WHERE tid <= %d GROUP BY oid"
                                        % tid):
            partition = getPartition(oid)
            # A last revision without data (deletion) can be removed too.
            if q("SELECT 1 FROM obj WHERE `partition` = %d"
                 " AND oid = %d AND tid = %d AND data_id IS NULL"
                 % (partition, oid, max_serial)):
                max_serial += 1
            elif not count:
                continue
            # There are things to delete for this object
            data_id_set = set()
            sql = ' FROM obj WHERE `partition`=%d AND oid=%d' \
                ' AND tid<%d' % (partition, oid, max_serial)
            for serial, data_id in q('SELECT tid, data_id' + sql):
                data_id_set.add(data_id)
                new_serial = updatePackFuture(oid, serial, max_serial)
                if new_serial:
                    new_serial = p64(new_serial)
                updateObjectDataForPack(p64(oid), p64(serial),
                                        new_serial, data_id)
            q('DELETE' + sql)
            data_id_set.discard(None)
            self._pruneData(data_id_set)
        self.commit()

    def checkTIDRange(self, partition, length, min_tid, max_tid):
        count, tid_checksum, max_tid = self.query(
            """SELECT COUNT(*), SHA1(GROUP_CONCAT(tid SEPARATOR ",")), MAX(tid)
               FROM (SELECT tid FROM trans
                     WHERE `partition` = %(partition)s
                       AND tid >= %(min_tid)d
                       AND tid <= %(max_tid)d
                     ORDER BY tid ASC %(limit)s) AS t""" % {
            'partition': partition,
            'min_tid': util.u64(min_tid),
            'max_tid': util.u64(max_tid),
            'limit': '' if length is None else 'LIMIT %u' % length,
        })[0]
        if count:
            return count, a2b_hex(tid_checksum), util.p64(max_tid)
        return 0, ZERO_HASH, ZERO_TID

    def checkSerialRange(self, partition, length, min_tid, max_tid, min_oid):
        u64 = util.u64
        # We don't ask MySQL to compute everything (like in checkTIDRange)
        # because it's difficult to get the last serial _for the last oid_.
        # We would need a function (that could be named 'LAST') that returns the
        # last grouped value, instead of the greatest one.
        r = self.query(
            """SELECT tid, oid
               FROM obj FORCE INDEX(tid)
               WHERE `partition` = %(partition)s
                 AND tid <= %(max_tid)d
                 AND (tid > %(min_tid)d OR
                      tid = %(min_tid)d AND oid >= %(min_oid)d)
               ORDER BY tid, oid %(limit)s""" % {
            'min_oid': u64(min_oid),
            'min_tid': u64(min_tid),
            'max_tid': u64(max_tid),
            'limit': '' if length is None else 'LIMIT %u' % length,
            'partition': partition,
        })
        if r:
            p64 = util.p64
            return (len(r),
                    sha1(','.join(str(x[0]) for x in r)).digest(),
                    p64(r[-1][0]),
                    sha1(','.join(str(x[1]) for x in r)).digest(),
                    p64(r[-1][1]))
        return 0, ZERO_HASH, ZERO_TID, ZERO_HASH, ZERO_OID

    def _cmdline(self):
        """Yield mysql/mysqldump command-line arguments for this database."""
        # NOTE(review): the password is passed on the command line of the
        # spawned process, where other local users may be able to see it.
        # The socket path is expanded like in _tryConnect, because no shell
        # is involved to expand '~'.
        socket = self.socket and os.path.expanduser(self.socket)
        for x in ('u', self.user), ('p', self.passwd), ('S', socket):
            if x[1]:
                yield '-%s%s' % x
        yield self.db

    def dump(self):
        import subprocess
        cmd = ['mysqldump', '--compact', '--hex-blob']
        cmd += self._cmdline()
        return subprocess.check_output(cmd)

    def restore(self, sql):
        import subprocess
        cmd = ['mysql']
        cmd += self._cmdline()
        p = subprocess.Popen(cmd, stdin=subprocess.PIPE)
        p.communicate(sql)
        retcode = p.wait()
        if retcode:
            raise subprocess.CalledProcessError(retcode, cmd)
neoppod-97af23cc3740cc4b2eee09125c9ce95abb6fbe2c-neo-storage/neo/storage/database/sqlite.py 0000664 0000000 0000000 00000067571 13313233730 0030637 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2012-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from collections import OrderedDict
import os
import sqlite3
from hashlib import sha1
import string
import traceback
from . import LOG_QUERIES
from .manager import DatabaseManager, splitOIDField
from neo.lib import logging, util
from neo.lib.interfaces import implements
from neo.lib.protocol import ZERO_OID, ZERO_TID, ZERO_HASH
def unique_constraint_message(table, *columns):
    """Return the message SQLite gives for a UNIQUE violation on `columns`.

    The wording differs between SQLite versions, so it is obtained by
    provoking the error on a throw-away in-memory table with the same name
    and columns.  The connection is always closed.
    """
    values = '?' * len(columns)
    insert = "INSERT INTO %s VALUES(%s)" % (table, ', '.join(values))
    x = "%s (%s)" % (table, ', '.join(columns))
    c = sqlite3.connect(":memory:")
    try:
        c.execute("CREATE TABLE " + x)
        c.execute("CREATE UNIQUE INDEX i ON " + x)
        try:
            # Two identical rows: the second one must violate the index.
            c.executemany(insert, (values, values))
        except sqlite3.IntegrityError as e:
            return e.args[0]
    finally:
        c.close()
    # Unlike 'assert False', this is not silently skipped under -O.
    raise AssertionError("inserting duplicate rows did not fail")
def retry_if_locked(f, *args):
    """Call f(*args), retrying for as long as SQLite says the database is
    locked.

    Any other OperationalError propagates.  The first 'database is locked'
    error is logged as a warning together with the current stack; the
    following retries are silent and unbounded.
    """
    try:
        return f(*args)
    except sqlite3.OperationalError as e:
        locked_msg = e.args[0]
        if locked_msg != 'database is locked':
            raise
        report = traceback.format_exception_only(type(e), e)
        report += traceback.format_stack()
        logging.warning(''.join(report))
    while True:
        try:
            return f(*args)
        except sqlite3.OperationalError as e:
            if e.args[0] != locked_msg:
                raise
@implements
class SQLiteDatabaseManager(DatabaseManager):
"""This class manages a database on SQLite.
CAUTION: Make sure we never use statement journal files; see
https://www.sqlite.org/tempfiles.html for more information.
In other words, temporary files (by default in /var/tmp !) must
never be used for small requests.
"""
# Current schema version: _setup records it under the 'version'
# configuration key when it creates the 'config' table of a new database.
VERSION = 3
def _parse(self, database):
    """Remember the path of the SQLite database file, with '~' expanded."""
    path = os.path.expanduser(database)
    self.db = path
def _close(self):
    """Close the underlying SQLite connection."""
    connection = self.conn
    connection.close()
def _connect(self):
    """Open the SQLite database and lock it.

    When UNSAFE is set, durability is traded for speed (no fsync, journal
    kept in memory).  The configuration cache is reset.
    """
    logging.info('connecting to SQLite database %r', self.db)
    self.conn = sqlite3.connect(self.db, check_same_thread=False)
    self.lock(self.db)
    if self.UNSAFE:
        for pragma in ("PRAGMA synchronous = OFF",
                       "PRAGMA journal_mode = MEMORY"):
            self.query(pragma)
    self._config = {}
def _getDevPath(self):
    """Return the filesystem path of the database file."""
    path = self.db
    return path
def _commit(self):
    """Commit the current transaction, retrying while the database is
    locked."""
    commit = self.conn.commit
    retry_if_locked(commit)
if LOG_QUERIES:
    def query(self, query, *args):
        """Execute `query` after logging (debug level) a printable excerpt
        of its first line, truncated to 70 characters.

        Extra positional arguments, such as the bind parameters that
        callers pass as a second argument, are forwarded to
        sqlite3.Connection.execute, exactly as with the non-logging
        variant below.
        """
        printable_char_list = []
        for c in query.split('\n', 1)[0][:70]:
            if c not in string.printable or c in '\t\x0b\x0c\r':
                c = '\\x%02x' % ord(c)
            printable_char_list.append(c)
        logging.debug('querying %s...', ''.join(printable_char_list))
        return self.conn.execute(query, *args)
else:
    query = property(lambda self: self.conn.execute)
def erase(self):
    """Drop every table of the schema that exists."""
    q = self.query
    tables = ('config', 'pt', 'trans', 'obj', 'data', 'ttrans', 'tobj')
    for name in tables:
        q('DROP TABLE IF EXISTS ' + name)
def nonempty(self, table):
    """Tell whether `table` has at least one row.

    Returns True or False, or None when the table does not exist.
    """
    sql = "SELECT 1 FROM %s LIMIT 1" % table
    try:
        row = self.query(sql).fetchone()
    except sqlite3.OperationalError as e:
        if e.args[0].startswith("no such table:"):
            return None
        raise
    return bool(row)
def _alterTable(self, schema_dict, table, select="*"):
    """Rebuild `table` with the schema in schema_dict, copying its rows.

    The rows are copied (through `select`) into 'new_<table>', created
    from schema_dict[table] (which is popped).  The old table is then
    dropped and the new one renamed.  If the old table is missing but
    'new_<table>' exists (interrupted previous run), only the rename is
    redone; if neither exists, nothing is done.
    """
    # BBB: As explained in _setup, no transactional DDL
    # so let's do the same dance as for MySQL.
    q = self.query
    new = 'new_' + table
    if self.nonempty(table) is not None:
        q("DROP TABLE IF EXISTS " + new)
        q(schema_dict.pop(table) % new)
        q("INSERT INTO %s SELECT %s FROM %s" % (new, select, table))
        q("DROP TABLE " + table)
    elif self.nonempty(new) is None:
        return
    q("ALTER TABLE %s RENAME TO %s" % (new, table))
def _migrate1(self, *_):
    """Schema upgrade 1: after checking that there are no unfinished
    transactions, drop the 'ttrans' table."""
    self._checkNoUnfinishedTransactions()
    q = self.query
    q("DROP TABLE IF EXISTS ttrans")
def _migrate2(self, schema_dict, index_dict):
    """Schema upgrade 2: rebuild the 'obj' table with its current schema."""
    self._alterTable(schema_dict, table='obj')
def _migrate3(self, schema_dict, index_dict):
    """Schema upgrade 3: rebuild 'pt', renumbering the cell state column.

    0 (UP_TO_DATE) becomes -1, 2 (FEEDING) becomes -2 and any other state
    s becomes 1-s.
    """
    select = ("rid, nid, CASE state"
              " WHEN 0 THEN -1"  # UP_TO_DATE
              " WHEN 2 THEN -2"  # FEEDING
              " ELSE 1-state END")
    self._alterTable(schema_dict, 'pt', select)
def _setup(self, dedup=False):
# Create the missing tables and indexes of the current schema.
# A database that already has a 'config' table is first upgraded with
# migrate(); otherwise the 'config' table is created and the current
# VERSION recorded in it.  Finally, per-data_id counts of the data
# referenced by uncommitted objects ('tobj') are loaded into
# self._uncommitted_data.
# dedup: if true, also create a unique index on data(hash, compression),
#        so that storeData can reuse an existing record for identical data.
# BBB: SQLite has transactional DDL but before Python 3.6,
# the binding automatically commits between such statements.
# This anti-feature causes this method to be relatively slow.
# Unit tests enable the UNSAFE boolean flag.
self._config.clear()
q = self.query
# Each schema is a "CREATE TABLE %s (...)" template whose %s receives
# the table name (possibly prefixed with IF NOT EXISTS, see below).
# The multi-line strings are kept verbatim: dump() reproduces them.
schema_dict = OrderedDict()
index_dict = {}
# The table "config" stores configuration
# parameters which affect the persistent data.
schema_dict['config'] = """CREATE TABLE %s (
name TEXT NOT NULL PRIMARY KEY,
value TEXT)
"""
# The table "pt" stores a partition table.
schema_dict['pt'] = """CREATE TABLE %s (
partition INTEGER NOT NULL,
nid INTEGER NOT NULL,
tid INTEGER NOT NULL,
PRIMARY KEY (partition, nid))
"""
# The table "trans" stores information on committed transactions.
schema_dict['trans'] = """CREATE TABLE %s (
partition INTEGER NOT NULL,
tid INTEGER NOT NULL,
packed BOOLEAN NOT NULL,
oids BLOB NOT NULL,
user BLOB NOT NULL,
description BLOB NOT NULL,
ext BLOB NOT NULL,
ttid INTEGER NOT NULL,
PRIMARY KEY (partition, tid))
"""
# The table "obj" stores committed object metadata.
schema_dict['obj'] = """CREATE TABLE %s (
partition INTEGER NOT NULL,
oid INTEGER NOT NULL,
tid INTEGER NOT NULL,
data_id INTEGER,
value_tid INTEGER,
PRIMARY KEY (partition, oid, tid))
"""
# Index templates take (index name, table name).
index_dict['obj'] = (
"CREATE INDEX %s ON %s(partition, tid, oid)",
"CREATE INDEX %s ON %s(data_id)")
# The table "data" stores object data.
schema_dict['data'] = """CREATE TABLE %s (
id INTEGER PRIMARY KEY,
hash BLOB NOT NULL,
compression INTEGER NOT NULL,
value BLOB NOT NULL)
"""
if dedup:
index_dict['data'] = (
"CREATE UNIQUE INDEX %s ON %s(hash, compression)",)
# The table "ttrans" stores information on uncommitted transactions.
schema_dict['ttrans'] = """CREATE TABLE %s (
partition INTEGER NOT NULL,
tid INTEGER,
packed BOOLEAN NOT NULL,
oids BLOB NOT NULL,
user BLOB NOT NULL,
description BLOB NOT NULL,
ext BLOB NOT NULL,
ttid INTEGER NOT NULL)
"""
# The table "tobj" stores uncommitted object metadata.
schema_dict['tobj'] = """CREATE TABLE %s (
partition INTEGER NOT NULL,
oid INTEGER NOT NULL,
tid INTEGER NOT NULL,
data_id INTEGER,
value_tid INTEGER,
PRIMARY KEY (tid, oid))
"""
# nonempty() returns None only when the table does not exist:
# no 'config' table means a brand new database.
if self.nonempty('config') is None:
q(schema_dict.pop('config') % 'config')
self._setConfiguration('version', self.VERSION)
else:
# Migration steps may consume (pop) entries of schema_dict.
self.migrate(schema_dict, index_dict)
# Remaining tables: "CREATE TABLE IF NOT EXISTS <table> (...)".
for table, schema in schema_dict.iteritems():
q(schema % ('IF NOT EXISTS ' + table))
# Indexes are named _<table>_i<n>, n starting at 1.
for table, index in index_dict.iteritems():
for i, index in enumerate(index, 1):
q(index % ('IF NOT EXISTS _%s_i%s' % (table, i), table))
# (data_id, count) pairs for data referenced by uncommitted objects.
self._uncommitted_data.update(q("SELECT data_id, count(*)"
" FROM tobj WHERE data_id IS NOT NULL GROUP BY data_id"))
def getConfiguration(self, key):
    """Return the configuration value stored under `key`, or None.

    Results, including misses, are cached in self._config.
    """
    config = self._config
    if key in config:
        return config[key]
    row = self.query("SELECT value FROM config WHERE name=?",
                     (key,)).fetchone()
    value = None if row is None else row[0]
    config[key] = value
    return value
def _setConfiguration(self, key, value):
    """Store `value` (converted to a string) under `key`; None deletes it.

    The cache self._config keeps the value as given.
    """
    self._config[key] = value
    if value is not None:
        self.query("REPLACE INTO config VALUES (?,?)", (key, str(value)))
    else:
        self.query("DELETE FROM config WHERE name=?", (key,))
def _getPartitionTable(self):
    """Return a cursor over all rows of the 'pt' table."""
    sql = "SELECT * FROM pt"
    return self.query(sql)
def _getLastTID(self, partition, max_tid=None):
    """Return the greatest committed tid of `partition`, optionally bounded
    by max_tid (inclusive), or None if there is none."""
    sql = "SELECT MAX(tid) FROM trans WHERE partition=?"
    args = (partition,)
    if max_tid is not None:
        sql += " AND tid<=?"
        args += (max_tid,)
    # An aggregate query always yields exactly one row.
    return next(self.query(sql, args))[0]
def _getLastIDs(self, *args):
    """Return (greatest tid, greatest oid) found in 'obj' for the given
    partition (None when it has no rows)."""
    maxima = []
    for column in ('oid', 'tid'):
        (value,), = self.query(
            "SELECT MAX(%s) FROM obj WHERE `partition`=?" % column, args)
        maxima.append(value)
    max_oid, max_tid = maxima
    return max_tid, max_oid
def _getDataLastId(self, partition):
    """Return the greatest data id in [partition << 48, (partition+1) << 48),
    or None if that range is empty."""
    low = partition << 48
    high = (partition + 1) << 48
    row = self.query("SELECT MAX(id) FROM data WHERE %s <= id AND id < %s"
                     % (low, high)).fetchone()
    return row[0]
def _getUnfinishedTIDDict(self):
    """Return (cursor of (ttid, tid) rows of 'ttrans', iterator over the
    distinct tid values of 'tobj').

    Both queries are executed immediately, 'ttrans' first.
    """
    q = self.query
    trans_rows = q("SELECT ttid, tid FROM ttrans")
    obj_rows = q("SELECT DISTINCT tid FROM tobj")
    return trans_rows, (row[0] for row in obj_rows)
def getFinalTID(self, ttid):
    """Return the tid of the committed transaction whose ttid column equals
    `ttid`, or None if there is no such row in 'trans'."""
    ttid = util.u64(ttid)
    # As of SQLite 3.8.7.1, 'tid>=ttid' would ignore the index on tid,
    # even though ttid is a constant.
    row = self.query("SELECT tid FROM trans"
                     " WHERE partition=? AND tid>=? AND ttid=? LIMIT 1",
                     (self._getReadablePartition(ttid), ttid, ttid)).fetchone()
    if row is not None:
        return util.p64(row[0])
def getLastObjectTID(self, oid):
    """Return the tid of the latest committed revision of `oid`, or None."""
    oid = util.u64(oid)
    partition = self._getReadablePartition(oid)
    row = self.query("SELECT tid FROM obj"
                     " WHERE partition=? AND oid=?"
                     " ORDER BY tid DESC LIMIT 1",
                     (partition, oid)).fetchone()
    if row:
        return util.p64(row[0])
    return row
def _getNextTID(self, *args): # partition, oid, tid
    """Return the first tid of the object strictly after the given one,
    or None."""
    row = self.query("""SELECT tid FROM obj
WHERE partition=? AND oid=? AND tid>?
ORDER BY tid LIMIT 1""", args).fetchone()
    if row is None:
        return None
    return row[0]
def _getObject(self, oid, tid=None, before_tid=None):
    """Load a revision of object `oid` (integer ids).

    With `tid`, load exactly that revision; with `before_tid`, the latest
    revision strictly older than it; otherwise the latest revision.
    Returns (serial, next_serial, compression, checksum, data,
    value_serial), or None if no revision matches.  checksum and data are
    None when no data row is joined (e.g. data_id is NULL).
    """
    q = self.query
    partition = self._getReadablePartition(oid)
    sql = ('SELECT tid, compression, data.hash, value, value_tid'
           ' FROM obj LEFT JOIN data ON obj.data_id = data.id'
           ' WHERE partition=? AND oid=?')
    if tid is not None:
        r = q(sql + ' AND tid=?', (partition, oid, tid))
    elif before_tid is not None:
        # 3 parameters are bound: the bound needs its own placeholder.
        r = q(sql + ' AND tid<? ORDER BY tid DESC LIMIT 1',
              (partition, oid, before_tid))
    else:
        r = q(sql + ' ORDER BY tid DESC LIMIT 1', (partition, oid))
    row = r.fetchone()
    if row is None:
        return None
    serial, compression, checksum, data, value_serial = row
    if checksum:
        checksum = str(checksum)
        data = str(data)
    return (serial, self._getNextTID(partition, oid, serial),
            compression, checksum, data, value_serial)
def _changePartitionTable(self, cell_list, reset=False):
    """Apply (offset, nid, state) cell changes to the 'pt' table.

    A state of None removes the cell.  With reset=True the table is
    emptied first.
    """
    q = self.query
    if reset:
        q("DELETE FROM pt")
    for offset, nid, state in cell_list:
        # TODO: this logic should move out of database manager
        # add 'dropCells(cell_list)' to API and use one query
        # WKRD: Why does SQLite need a statement journal file
        # whereas we try to replace only 1 value ?
        # We don't want to remove the 'NOT NULL' constraint
        # so we must simulate a "REPLACE OR FAIL".
        q("DELETE FROM pt WHERE partition=? AND nid=?", (offset, nid))
        if state is None:
            continue
        q("INSERT OR FAIL INTO pt VALUES (?,?,?)",
          (offset, nid, int(state)))
def dropPartitions(self, offset_list):
    """Delete the committed rows ('obj', 'trans') of the given partitions.

    For each partition, the data ids its 'obj' rows referenced are then
    passed to _pruneData.
    """
    q = self.query
    for partition in offset_list:
        args = (partition,)
        data_id_list = [row[0] for row in q(
            "SELECT DISTINCT data_id FROM obj WHERE partition=?"
            " AND data_id IS NOT NULL", args)]
        q("DELETE FROM obj WHERE partition=?", args)
        q("DELETE FROM trans WHERE partition=?", args)
        self._pruneData(data_id_list)
def _getUnfinishedDataIdList(self):
    """Return the non-NULL data ids referenced by 'tobj' rows."""
    cursor = self.query("SELECT data_id FROM tobj WHERE data_id IS NOT NULL")
    return [row[0] for row in cursor]
def dropPartitionsTemporary(self, offset_list=None):
    """Delete uncommitted rows ('tobj', 'ttrans') of the given partitions,
    or of all partitions when offset_list is None."""
    if offset_list is None:
        where = ""
    else:
        where = " WHERE `partition` IN (%s)" % ','.join(
            str(offset) for offset in offset_list)
    for table in ("tobj", "ttrans"):
        self.query("DELETE FROM " + table + where)
def storeTransaction(self, tid, object_list, transaction, temporary=True):
# Insert object revisions and, optionally, transaction metadata for tid.
# object_list: iterable of (oid, data_id, value_serial); when
#   value_serial is set, data_id is replaced by the data_id of revision
#   value_serial of the same object in 'obj'.
# transaction: falsy, or (oid_list, user, desc, ext, packed, ttid),
#   packed being 0 or 1.
# temporary: write to 'tobj'/'ttrans' instead of 'obj'/'trans'; the
#   'ttrans' row is then stored with a NULL tid.
u64 = util.u64
tid = u64(tid)
T = 't' if temporary else ''
obj_sql = "INSERT OR FAIL INTO %sobj VALUES (?,?,?,?,?)" % T
q = self.query
for oid, data_id, value_serial in object_list:
oid = u64(oid)
partition = self._getPartition(oid)
if value_serial:
value_serial = u64(value_serial)
(data_id,), = q("SELECT data_id FROM obj"
" WHERE partition=? AND oid=? AND tid=?",
(partition, oid, value_serial))
# NOTE(review): indentation is lost in this copy; presumably this
# holdData() only applies to data_id copied from value_serial above
# (i.e. it is nested in 'if value_serial:') -- confirm upstream.
if temporary:
self.holdData(data_id)
try:
q(obj_sql, (partition, oid, tid, data_id, value_serial))
except sqlite3.IntegrityError:
# This may happen if a previous replication of 'obj' was
# interrupted.
# For committed rows, an identical existing row is accepted;
# NOTE(review): the final 'raise' presumably re-raises in every other
# case, temporary stores included -- confirm nesting upstream.
if not T:
r, = q("SELECT data_id, value_tid FROM obj"
" WHERE partition=? AND oid=? AND tid=?",
(partition, oid, tid))
if r == (data_id, value_serial):
continue
raise
if transaction:
oid_list, user, desc, ext, packed, ttid = transaction
partition = self._getPartition(tid)
assert packed in (0, 1)
q("INSERT OR FAIL INTO %strans VALUES (?,?,?,?,?,?,?,?)" % T,
(partition, None if temporary else tid,
packed, buffer(''.join(oid_list)),
buffer(user), buffer(desc), buffer(ext), u64(ttid)))
def getOrphanList(self):
    """Return the ids of data records that no 'obj' row references."""
    orphans = self.query("SELECT id FROM data LEFT JOIN obj ON (id=data_id)"
                         " WHERE data_id IS NULL")
    return [row[0] for row in orphans]
def _pruneData(self, data_id_list):
    """Delete the given data records that are no longer referenced.

    Ids present in self._uncommitted_data or still used by an 'obj' row
    are kept.  Returns the number of ids passed to the DELETE.
    """
    candidates = set(data_id_list).difference(self._uncommitted_data)
    if not candidates:
        return 0
    q = self.query
    still_used = q("SELECT DISTINCT data_id FROM obj WHERE data_id IN (%s)"
                   % ",".join(map(str, candidates)))
    candidates.difference_update(row[0] for row in still_used)
    q("DELETE FROM data WHERE id IN (%s)"
      % ",".join(map(str, candidates)))
    return len(candidates)
def storeData(self, checksum, oid, data, compression,
        _dup=unique_constraint_message("data", "hash", "compression")):
    """Store a data record and return its id.

    The id is the next one from self._data_last_ids for the partition of
    `oid`, which is advanced on success.  If the insert violates the
    unique (hash, compression) index (created when deduplication is
    enabled) and the existing record holds the same data, that record's
    id is returned instead.  Any other integrity error propagates.
    """
    H = buffer(checksum)
    p = self._getPartition(util.u64(oid))
    r = self._data_last_ids[p]
    try:
        self.query("INSERT INTO data VALUES (?,?,?,?)",
                   (r, H, compression, buffer(data)))
    except sqlite3.IntegrityError as e:
        # Compare against the exact message of a duplicate (hash,
        # compression), computed once when the method is defined.
        if e.args[0] == _dup:
            (r, d), = self.query("SELECT id, value FROM data"
                                 " WHERE hash=? AND compression=?",
                                 (H, compression))
            if str(d) == data:
                return r
        raise
    self._data_last_ids[p] = r + 1
    return r
def loadData(self, data_id):
    """Return (compression, hash, value) of a data record, or None."""
    cursor = self.query("SELECT compression, hash, value"
                        " FROM data WHERE id=?", (data_id,))
    return cursor.fetchone()
def _getDataTID(self, oid, tid=None, before_tid=None):
    """Return (serial, value_serial) of a revision of `oid` (integer ids).

    With `tid`, exactly that revision; with `before_tid`, the latest
    revision strictly older than it; otherwise the latest revision.
    Returns (None, None) if no revision matches.
    """
    partition = self._getReadablePartition(oid)
    sql = 'SELECT tid, value_tid FROM obj' \
          ' WHERE partition=? AND oid=?'
    if tid is not None:
        r = self.query(sql + ' AND tid=?', (partition, oid, tid))
    elif before_tid is not None:
        # 3 parameters are bound: the bound needs its own placeholder.
        r = self.query(sql + ' AND tid<? ORDER BY tid DESC LIMIT 1',
                       (partition, oid, before_tid))
    else:
        r = self.query(sql + ' ORDER BY tid DESC LIMIT 1',
                       (partition, oid))
    return r.fetchone() or (None, None)
def lockTransaction(self, tid, ttid):
    """Set the final `tid` on the 'ttrans' row of `ttid`, then commit."""
    args = util.u64(tid), util.u64(ttid)
    self.query("UPDATE ttrans SET tid=? WHERE ttid=?", args)
    self.commit()
def unlockTransaction(self, tid, ttid, trans, obj):
# Move a locked transaction from the temporary tables to the final ones.
# trans: if true, copy the 'ttrans' row of tid into 'trans' and delete it.
# obj: if true, copy the 'tobj' rows of ttid into 'obj' (with their tid
#   replaced by the final tid), delete them and release their data ids.
q = self.query
u64 = util.u64
tid = u64(tid)
# NOTE(review): indentation is lost in this copy; it is unclear whether
# 'if not obj: return' is nested in 'if trans:' -- confirm upstream.
if trans:
q("INSERT INTO trans SELECT * FROM ttrans WHERE tid=?", (tid,))
q("DELETE FROM ttrans WHERE tid=?", (tid,))
if not obj:
return
ttid = u64(ttid)
# In 'tobj', the tid column holds the ttid of the transaction.
sql = " FROM tobj WHERE tid=?"
data_id_list = [x for x, in q("SELECT data_id%s AND data_id IS NOT NULL"
% sql, (ttid,))]
q("INSERT INTO obj SELECT partition, oid, ?, data_id, value_tid" + sql,
(tid, ttid))
q("DELETE" + sql, (ttid,))
self.releaseData(data_id_list)
def abortTransaction(self, ttid):
    """Discard the uncommitted rows ('tobj', 'ttrans') of ttid."""
    args = (util.u64(ttid),)
    for sql in ("DELETE FROM tobj WHERE tid=?",
                "DELETE FROM ttrans WHERE ttid=?"):
        self.query(sql, args)
def deleteTransaction(self, tid):
    """Delete the committed 'trans' row of `tid`."""
    tid = util.u64(tid)
    partition = self._getPartition(tid)
    self.query("DELETE FROM trans WHERE partition=? AND tid=?",
               (partition, tid))
def deleteObject(self, oid, serial=None):
    """Delete all revisions of `oid`, or only revision `serial` if given,
    then pass the data ids they referenced to _pruneData."""
    oid = util.u64(oid)
    where = " FROM obj WHERE partition=? AND oid=?"
    params = [self._getPartition(oid), oid]
    if serial:
        where += " AND tid=?"
        params.append(util.u64(serial))
    q = self.query
    data_id_list = [row[0] for row in q(
        "SELECT DISTINCT data_id%s AND data_id IS NOT NULL" % where, params)]
    q("DELETE" + where, params)
    self._pruneData(data_id_list)
def _deleteRange(self, partition, min_tid=None, max_tid=None):
    """Delete 'trans' and 'obj' rows of `partition` with
    min_tid < tid <= max_tid (each bound optional), then pass the data
    ids of the deleted 'obj' rows to _pruneData."""
    conditions = [" WHERE partition=?"]
    params = [partition]
    if min_tid is not None:
        conditions.append(" AND ? < tid")
        params.append(min_tid)
    if max_tid is not None:
        conditions.append(" AND tid <= ?")
        params.append(max_tid)
    where = ''.join(conditions)
    q = self.query
    q("DELETE FROM trans" + where, params)
    obj_sql = " FROM obj" + where
    data_id_list = [row[0] for row in q(
        "SELECT DISTINCT data_id%s AND data_id IS NOT NULL" % obj_sql, params)]
    q("DELETE" + obj_sql, params)
    self._pruneData(data_id_list)
def getTransaction(self, tid, all=False):
    """Return (oid_list, user, description, ext, packed, ttid) of `tid`.

    'trans' is searched first; with all=True, 'ttrans' is searched when
    nothing is found there.  Returns None if the transaction is unknown.
    """
    tid = util.u64(tid)
    q = self.query
    columns = "SELECT oids, user, description, ext, packed, ttid"
    row = q(columns + " FROM trans WHERE partition=? AND tid=?",
            (self._getReadablePartition(tid), tid)).fetchone()
    if not row and all:
        row = q(columns + " FROM ttrans WHERE tid=?", (tid,)).fetchone()
    if not row:
        return None
    oids, user, description, ext, packed, ttid = row
    return (splitOIDField(tid, oids), str(user), str(description),
            str(ext), packed, util.p64(ttid))
def getObjectHistory(self, oid, offset, length):
    """Return [(serial, data length)] of revisions of `oid` not older than
    the pack tid, newest first, paginated by offset/length; None if empty.

    Revisions without data report a length of 0.
    """
    # FIXME: This method doesn't take client's current transaction id as
    # parameter, which means it can return transactions in the future of
    # client's transaction.
    p64 = util.p64
    oid = util.u64(oid)
    rows = self.query("""\
SELECT tid, LENGTH(value)
FROM obj LEFT JOIN data ON obj.data_id = data.id
WHERE partition=? AND oid=? AND tid>=?
ORDER BY tid DESC LIMIT ?,?""",
        (self._getReadablePartition(oid), oid,
         self._getPackTID(), offset, length))
    history = [(p64(serial), size or 0) for serial, size in rows]
    return history or None
def _fetchObject(self, oid, tid):
    """Return (serial, compression, checksum, data, value_serial) of
    revision `tid` of `oid`, or None if it does not exist."""
    row = self.query(
        'SELECT tid, compression, data.hash, value, value_tid'
        ' FROM obj LEFT JOIN data ON obj.data_id = data.id'
        ' WHERE partition=? AND oid=? AND tid=?',
        (self._getReadablePartition(oid), oid, tid)).fetchone()
    if row is None:
        return None
    serial, compression, checksum, data, value_serial = row
    if checksum:
        checksum = str(checksum)
        data = str(data)
    return serial, compression, checksum, data, value_serial
# NOTE(review): this region is damaged in this copy of the file.  The text
# between "?<" in the SQL below and a later "tid>" was lost.  This fused
# the end of getReplicationObjectList with the middle of another method.
# That method ends with "return value_serial" and is presumably
# _updatePackFuture, which pack() calls.  The lost text likely also held
# other methods, e.g. getTIDList/getReplicationTIDList, which the client
# handlers call on the database manager.  As shown, this does not parse;
# restore it from the upstream source rather than by guessing.
def getReplicationObjectList(self, min_tid, max_tid, length, partition,
min_oid):
u64 = util.u64
p64 = util.p64
min_tid = u64(min_tid)
return [(p64(serial), p64(oid)) for serial, oid in self.query("""\
SELECT tid, oid FROM obj
WHERE partition=? AND tid<=?
AND (tid=? AND ?<=oid OR ?=? AND value_tid=?
ORDER BY tid ASC""" % T,
(partition, oid, max_serial, orig_serial)):
q(update, (value_serial, partition, oid, serial))
if value_serial is None:
# First found, mark its serial for future reference.
value_serial = serial
return value_serial
def pack(self, tid, updateObjectDataForPack):
    """Pack the database up to `tid`.

    Objects whose last revision at or before `tid` is their only one, and
    has data, are untouched.  Otherwise all older revisions are removed,
    and the last one too if it has no data (data_id IS NULL).  For each
    removed revision, self._updatePackFuture(oid, serial, max_serial) is
    called first.  Then updateObjectDataForPack(p64(oid), p64(serial),
    new_serial, data_id) is called, new_serial being the p64 of the
    former's result when it is true.  The data ids of the removed rows
    are then pruned, and everything is committed at the end.
    """
    # TODO: unit test (along with updatePackFuture)
    p64 = util.p64
    tid = util.u64(tid)
    updatePackFuture = self._updatePackFuture
    getPartition = self._getReadablePartition
    q = self.query
    self._setPackTID(tid)
    for count, oid, max_serial in q("SELECT COUNT(*) - 1, oid, MAX(tid)"
                                    " FROM obj WHERE tid<=? GROUP BY oid",
                                    (tid,)):
        partition = getPartition(oid)
        if q("SELECT 1 FROM obj WHERE partition=?"
             " AND oid=? AND tid=? AND data_id IS NULL",
             (partition, oid, max_serial)).fetchone():
            # The last revision has no data (presumably a deletion):
            # include it in the range to delete.
            max_serial += 1
        elif not count:
            continue
        # There are things to delete for this object
        data_id_set = set()
        # 3 parameters are bound, so the upper bound needs a placeholder.
        sql = " FROM obj WHERE partition=? AND oid=? AND tid<?"
        args = partition, oid, max_serial
        for serial, data_id in q("SELECT tid, data_id" + sql, args):
            data_id_set.add(data_id)
            new_serial = updatePackFuture(oid, serial, max_serial)
            if new_serial:
                new_serial = p64(new_serial)
            updateObjectDataForPack(p64(oid), p64(serial),
                                    new_serial, data_id)
        q("DELETE" + sql, args)
        data_id_set.discard(None)
        self._pruneData(data_id_set)
    self.commit()
def checkTIDRange(self, partition, length, min_tid, max_tid):
    """Summarize the committed tids of one partition for replication checks.

    Only tids with min_tid <= tid <= max_tid are considered, in ascending
    order, at most `length` of them (all when length is None).  Returns
    (count, SHA-1 of the comma-separated tids, last tid), or
    (0, ZERO_HASH, ZERO_TID) if nothing matches.
    """
    u64 = util.u64
    # SQLite: a negative LIMIT means no limit.
    limit = -1 if length is None else length
    # XXX: SQLite's GROUP_CONCAT is slow (looks like quadratic)
    count, tids, last_tid = self.query("""\
SELECT COUNT(*), GROUP_CONCAT(tid), MAX(tid)
FROM (SELECT tid FROM trans
WHERE partition=? AND ?<=tid AND tid<=?
ORDER BY tid ASC LIMIT ?) AS t""",
        (partition, u64(min_tid), u64(max_tid), limit)).fetchone()
    if not count:
        return 0, ZERO_HASH, ZERO_TID
    return count, sha1(tids).digest(), util.p64(last_tid)
def checkSerialRange(self, partition, length, min_tid, max_tid, min_oid):
    """Summarize object revisions of one partition for replication checks.

    Revisions (tid, oid) with tid <= max_tid and (tid, oid) >=
    (min_tid, min_oid) are taken in (tid, oid) order, at most `length` of
    them (all when length is None).  Returns (count, SHA-1 of the
    comma-separated tids, last tid, SHA-1 of the comma-separated oids,
    last oid), or zero values when nothing matches.
    """
    u64 = util.u64
    # We don't ask SQLite to compute everything (like in checkTIDRange)
    # because it's difficult to get the last serial _for the last oid_.
    # We would need a function (that could be named 'LAST') that returns the
    # last grouped value, instead of the greatest one.
    min_tid = u64(min_tid)
    limit = -1 if length is None else length
    rows = self.query("""\
SELECT tid, oid
FROM obj
WHERE partition=? AND tid<=? AND (tid>? OR tid=? AND oid>=?)
ORDER BY tid, oid LIMIT ?""",
        (partition, u64(max_tid), min_tid, min_tid, u64(min_oid),
         limit)).fetchall()
    if not rows:
        return 0, ZERO_HASH, ZERO_TID, ZERO_HASH, ZERO_OID
    p64 = util.p64
    last_tid, last_oid = rows[-1]
    return (len(rows),
            sha1(','.join(str(tid) for tid, _ in rows)).digest(),
            p64(last_tid),
            sha1(','.join(str(oid) for _, oid in rows)).digest(),
            p64(last_oid))
def dump(self):
    """Return an SQL dump of the database as a single string.

    Non-INSERT statements keep their order.  INSERT statements are sorted
    and moved just before the final COMMIT, presumably so that dumps can
    be compared regardless of row order.  Quotes around the table name of
    CREATE TABLE statements are removed.
    """
    schema_lines = []
    insert_lines = []
    for line in self.conn.iterdump():
        if line.startswith('INSERT '):
            assert line.endswith(';'), line
            insert_lines.append(line)
        else:
            if line.startswith('CREATE TABLE '):
                # ALTER TABLE adds quotes.
                create, table, name, tail = line.split(' ', 3)
                line = ' '.join((create, table, name.strip('"'), tail))
            schema_lines.append(line)
    assert line == 'COMMIT;', line
    insert_lines.sort()
    schema_lines[-1:-1] = insert_lines
    return '\n'.join(schema_lines) + '\n'
def restore(self, sql):
    """Execute the SQL script `sql` on the connection."""
    connection = self.conn
    connection.executescript(sql)
neoppod-97af23cc3740cc4b2eee09125c9ce95abb6fbe2c-neo-storage/neo/storage/handlers/ 0000775 0000000 0000000 00000000000 13313233730 0027000 5 ustar 00root root 0000000 0000000 neoppod-97af23cc3740cc4b2eee09125c9ce95abb6fbe2c-neo-storage/neo/storage/handlers/__init__.py 0000664 0000000 0000000 00000006424 13313233730 0031117 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import weakref
from neo.lib import logging
from neo.lib.handler import EventHandler
from neo.lib.exception import PrimaryFailure, StoppedOperation
from neo.lib.protocol import (uuid_str,
NodeStates, NodeTypes, Packets, ProtocolError)
class BaseHandler(EventHandler):
    """Common handling of transaction end notifications."""

    def notifyTransactionFinished(self, conn, ttid, max_tid):
        """Drop `ttid` from the transaction manager and notify the
        replicator that it finished."""
        tm = self.app.tm
        replicator = self.app.replicator
        tm.abort(ttid)
        replicator.transactionFinished(ttid, max_tid)

    def abortTransaction(self, conn, ttid, _):
        """Handle an abort like a finished transaction without max_tid."""
        self.notifyTransactionFinished(conn, ttid, None)
class BaseMasterHandler(BaseHandler):
    """Handling of packets received from the primary master node."""

    def connectionLost(self, conn, new_state):
        """While running (listening connection open), losing the master
        is a primary failure."""
        app = self.app
        if not app.listening_conn:
            return
        app.master_node = None
        raise PrimaryFailure('connection lost')

    def stopOperation(self, conn):
        raise StoppedOperation

    def reelectPrimary(self, conn):
        raise PrimaryFailure('re-election occurs')

    def notifyClusterInformation(self, conn, state):
        self.app.changeClusterState(state)

    def notifyNodeInformation(self, conn, timestamp, node_list):
        """Store information on nodes, only if this is sent by a primary
        master node.

        A state change of this node to UNKNOWN or DOWN shuts it down
        (erasing data for UNKNOWN).  Transactions of clients reported as
        not running are aborted.
        """
        super(BaseMasterHandler, self).notifyNodeInformation(
            conn, timestamp, node_list)
        app = self.app
        for node_type, _, uuid, state, _ in node_list:
            if uuid == app.uuid:
                # This is me, do what the master tell me
                logging.info("I was told I'm %s", state)
                if state in (NodeStates.UNKNOWN, NodeStates.DOWN):
                    app.shutdown(erase=(state == NodeStates.UNKNOWN))
            elif node_type == NodeTypes.CLIENT and state != NodeStates.RUNNING:
                logging.info('Notified of non-running client, abort (%s)',
                             uuid_str(uuid))
                app.tm.abortFor(uuid)

    def notifyPartitionChanges(self, conn, ptid, cell_list):
        """This is very similar to Send Partition Table, except that
        the information is only about changes from the previous.

        Raises ProtocolError unless ptid immediately follows the current
        partition table id.
        """
        app = self.app
        expected_ptid = app.pt.getID() + 1
        if ptid != expected_ptid:
            raise ProtocolError('wrong partition table id')
        app.pt.update(ptid, cell_list, app.nm)
        app.dm.changePartitionTable(ptid, cell_list)
        if app.operational:
            app.replicator.notifyPartitionChanges(cell_list)
        app.dm.commit()

    def askFinalTID(self, conn, ttid):
        final_tid = self.app.dm.getFinalTID(ttid)
        conn.answer(Packets.AnswerFinalTID(final_tid))

    def notifyRepair(self, conn, *args):
        app = self.app
        app.dm.repair(weakref.ref(app), *args)
neoppod-97af23cc3740cc4b2eee09125c9ce95abb6fbe2c-neo-storage/neo/storage/handlers/client.py 0000664 0000000 0000000 00000026714 13313233730 0030642 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time

from neo.lib import logging
from neo.lib.handler import DelayEvent
from neo.lib.util import dump, makeChecksum, add64
from neo.lib.protocol import Packets, Errors, NonReadableCell, ProtocolError, \
    ZERO_HASH, ZERO_TID, INVALID_PARTITION
from ..transactions import ConflictError, NotRegisteredError
from . import BaseHandler
# Log stores taking (incl. lock delays) more than this many seconds.
# Set to None to disable.
SLOW_STORE = 2
class ClientOperationHandler(BaseHandler):
    """Handle requests received from client nodes."""

    def askTransactionInformation(self, conn, tid):
        """Answer the metadata of committed transaction `tid`, or a
        TidNotFound error."""
        t = self.app.dm.getTransaction(tid)
        if t is None:
            p = Errors.TidNotFound('%s does not exist' % dump(tid))
        else:
            p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3],
                                                     t[4], t[0])
        conn.answer(p)

    def getEventQueue(self):
        # for read rpc
        return self.app.tm.read_queue

    def askObject(self, conn, oid, at, before):
        """Answer a revision of `oid`; delayed (DelayEvent) while
        tm.loadLocked(oid) is true."""
        app = self.app
        if app.tm.loadLocked(oid):
            raise DelayEvent
        o = app.dm.getObject(oid, at, before)
        try:
            serial, next_serial, compression, checksum, data, data_serial = o
        except TypeError:
            p = (Errors.OidDoesNotExist if o is None else
                 Errors.OidNotFound)(dump(oid))
        else:
            if checksum is None:
                checksum = ZERO_HASH
                data = ''
            p = Packets.AnswerObject(oid, serial, next_serial,
                                     compression, checksum, data, data_serial)
        conn.answer(p)

    def askStoreTransaction(self, conn, ttid, *txn_info):
        self.app.tm.register(conn, ttid)
        self.app.tm.vote(ttid, txn_info)
        conn.answer(Packets.AnswerStoreTransaction())

    def askVoteTransaction(self, conn, ttid):
        self.app.tm.vote(ttid)
        conn.answer(Packets.AnswerVoteTransaction())

    def _askStoreObject(self, conn, oid, serial, compression, checksum, data,
                        data_serial, ttid, request_time):
        """Store an object revision; answer the conflicting tid on conflict,
        None otherwise (including ignored/forgotten stores)."""
        try:
            self.app.tm.storeObject(ttid, serial, oid, compression,
                                    checksum, data, data_serial)
        except ConflictError as err:
            # resolvable or not
            conn.answer(Packets.AnswerStoreObject(err.tid))
            return
        except NonReadableCell:
            logging.info('Ignore store of %s:%s by %s: unassigned partition',
                         dump(oid), dump(serial), dump(ttid))
        except NotRegisteredError:
            # transaction was aborted, cancel this event
            logging.info('Forget store of %s:%s by %s delayed by %s',
                         dump(oid), dump(serial), dump(ttid),
                         dump(self.app.tm.getLockingTID(oid)))
        else:
            if request_time and SLOW_STORE is not None:
                duration = time.time() - request_time
                if duration > SLOW_STORE:
                    logging.info('StoreObject delay: %.02fs', duration)
        conn.answer(Packets.AnswerStoreObject(None))

    def askStoreObject(self, conn, oid, serial,
                       compression, checksum, data, data_serial, ttid):
        """Validate and store an object revision, queueing the store while
        the object is locked by a previous transaction.

        Raises ProtocolError for an invalid compression value, a checksum
        that does not match the data, or data sent together with
        data_serial.
        """
        if 1 < compression:
            raise ProtocolError('invalid compression value')
        # register the transaction
        self.app.tm.register(conn, ttid)
        if data or checksum != ZERO_HASH:
            # Explicit checks instead of 'assert': asserts vanish under -O
            # and escape as AssertionError, while invalid input is reported
            # with ProtocolError like the compression check above.
            if makeChecksum(data) != checksum:
                raise ProtocolError('invalid checksum')
            if data_serial is not None:
                raise ProtocolError('data_serial must be None when data is sent')
        else:
            checksum = data = None
        try:
            self._askStoreObject(conn, oid, serial, compression,
                                 checksum, data, data_serial, ttid, None)
        except DelayEvent as e:
            # locked by a previous transaction, retry later
            self.app.tm.queueEvent(self._askStoreObject, conn, (oid, serial,
                compression, checksum, data, data_serial, ttid, time.time()),
                *e.args)

    def askRebaseTransaction(self, conn, *args):
        conn.answer(Packets.AnswerRebaseTransaction(
            self.app.tm.rebase(conn, *args)))

    def askRebaseObject(self, conn, ttid, oid):
        try:
            self._askRebaseObject(conn, ttid, oid, None)
        except DelayEvent as e:
            # locked by a previous transaction, retry later
            self.app.tm.queueEvent(self._askRebaseObject,
                                   conn, (ttid, oid, time.time()), *e.args)

    def _askRebaseObject(self, conn, ttid, oid, request_time):
        conflict = self.app.tm.rebaseObject(ttid, oid)
        if request_time and SLOW_STORE is not None:
            duration = time.time() - request_time
            if duration > SLOW_STORE:
                logging.info('RebaseObject delay: %.02fs', duration)
        conn.answer(Packets.AnswerRebaseObject(conflict))

    def askTIDsFrom(self, conn, min_tid, max_tid, length, partition):
        conn.answer(Packets.AnswerTIDsFrom(self.app.dm.getReplicationTIDList(
            min_tid, max_tid, length, partition)))

    def _askTIDs(self, first, last, partition):
        """Return the tids in [first, last) (offsets), from `partition` or,
        for INVALID_PARTITION, from all partitions assigned to this node."""
        # This method is complicated, because I must return TIDs only
        # about usable partitions assigned to me.
        if first >= last:
            raise ProtocolError('invalid offsets')
        app = self.app
        if partition == INVALID_PARTITION:
            partition_list = app.pt.getAssignedPartitionList(app.uuid)
        else:
            partition_list = [partition]
        return app.dm.getTIDList(first, last - first, partition_list)

    def askTIDs(self, conn, *args):
        conn.answer(Packets.AnswerTIDs(self._askTIDs(*args)))

    def askFinalTID(self, conn, ttid):
        conn.answer(Packets.AnswerFinalTID(self.app.tm.getFinalTID(ttid)))

    def askObjectUndoSerial(self, conn, ttid, ltid, undone_tid, oid_list):
        """Answer, per oid, (current serial, undo serial, is_current), or
        OidNotFound for the first oid without a current serial."""
        app = self.app
        findUndoTID = app.dm.findUndoTID
        getObjectFromTransaction = app.tm.getObjectFromTransaction
        object_tid_dict = {}
        for oid in oid_list:
            current_serial, undo_serial, is_current = findUndoTID(oid, ttid,
                ltid, undone_tid, getObjectFromTransaction(ttid, oid))
            if current_serial is None:
                p = Errors.OidNotFound(dump(oid))
                break
            object_tid_dict[oid] = (current_serial, undo_serial, is_current)
        else:
            p = Packets.AnswerObjectUndoSerial(object_tid_dict)
        conn.answer(p)

    def askObjectHistory(self, conn, oid, first, last):
        if first >= last:
            raise ProtocolError('invalid offsets')
        app = self.app
        if app.tm.loadLocked(oid):
            raise DelayEvent
        history_list = app.dm.getObjectHistory(oid, first, last - first)
        if history_list is None:
            p = Errors.OidNotFound(dump(oid))
        else:
            p = Packets.AnswerObjectHistory(oid, history_list)
        conn.answer(p)

    def askCheckCurrentSerial(self, conn, ttid, oid, serial):
        self.app.tm.register(conn, ttid)
        try:
            self._askCheckCurrentSerial(conn, ttid, oid, serial, None)
        except DelayEvent as e:
            # locked by a previous transaction, retry later
            self.app.tm.queueEvent(self._askCheckCurrentSerial,
                                   conn, (ttid, oid, serial, time.time()),
                                   *e.args)

    def _askCheckCurrentSerial(self, conn, ttid, oid, serial, request_time):
        """Check that `serial` is current for `oid`; answer the conflicting
        tid on conflict, None otherwise."""
        try:
            self.app.tm.checkCurrentSerial(ttid, oid, serial)
        except ConflictError as err:
            # resolvable or not
            conn.answer(Packets.AnswerCheckCurrentSerial(err.tid))
            return
        except NonReadableCell:
            logging.info('Ignore check of %s:%s by %s: unassigned partition',
                         dump(oid), dump(serial), dump(ttid))
        except NotRegisteredError:
            # transaction was aborted, cancel this event
            logging.info('Forget serial check of %s:%s by %s delayed by %s',
                         dump(oid), dump(serial), dump(ttid),
                         dump(self.app.tm.getLockingTID(oid)))
        else:
            if request_time and SLOW_STORE is not None:
                duration = time.time() - request_time
                if duration > SLOW_STORE:
                    logging.info('CheckCurrentSerial delay: %.02fs', duration)
        conn.answer(Packets.AnswerCheckCurrentSerial(None))
# like ClientOperationHandler but read-only & only for tid <= backup_tid
class ClientReadOnlyOperationHandler(ClientOperationHandler):
    """Client handler used while the cluster is in backup mode.

    Write requests are refused with ReadOnlyAccess.  Read requests are
    limited to tid <= backup_tid, the data fully fetched so far.
    askObject needs ZERO_TID, imported from neo.lib.protocol.
    """

    def _readOnly(self, conn, *args, **kw):
        conn.answer(Errors.ReadOnlyAccess(
            'read-only access because cluster is in backuping mode'))

    askStoreTransaction = _readOnly
    askVoteTransaction = _readOnly
    askStoreObject = _readOnly
    askFinalTID = _readOnly
    askRebaseObject = _readOnly
    askRebaseTransaction = _readOnly
    # takes write lock & is only used when going to commit
    askCheckCurrentSerial = _readOnly

    # below operations: like in ClientOperationHandler but cut tid <= backup_tid

    def askTransactionInformation(self, conn, tid):
        backup_tid = self.app.dm.getBackupTID()
        if tid > backup_tid:
            conn.answer(Errors.TidNotFound(
                'tids > %s are not fully fetched yet' % dump(backup_tid)))
            return
        super(ClientReadOnlyOperationHandler, self).askTransactionInformation(
            conn, tid)

    def askObject(self, conn, oid, serial, tid):
        backup_tid = self.app.dm.getBackupTID()
        if serial:
            if serial > backup_tid:
                # obj lookup will find nothing, but return properly either
                # OidDoesNotExist or OidNotFound
                serial = ZERO_TID
        elif tid:
            tid = min(tid, add64(backup_tid, 1))
        else:
            # limit "latest obj" query to tid <= backup_tid
            tid = add64(backup_tid, 1)
        super(ClientReadOnlyOperationHandler, self).askObject(
            conn, oid, serial, tid)

    def askTIDsFrom(self, conn, min_tid, max_tid, length, partition):
        backup_tid = self.app.dm.getBackupTID()
        max_tid = min(max_tid, backup_tid)
        # NOTE we don't need to adjust min_tid: if min_tid > max_tid
        # db.getReplicationTIDList will return empty [], which is correct
        super(ClientReadOnlyOperationHandler, self).askTIDsFrom(
            conn, min_tid, max_tid, length, partition)

    def askTIDs(self, conn, first, last, partition):
        backup_tid = self.app.dm.getBackupTID()
        # A list on both Python 2 and 3, unlike filter().
        tid_list = [tid for tid in self._askTIDs(first, last, partition)
                    if tid <= backup_tid]
        conn.answer(Packets.AnswerTIDs(tid_list))

    # FIXME askObjectUndoSerial to limit tid <= backup_tid
    # (askObjectUndoSerial is used in undo() but itself is read-only query)

    # FIXME askObjectHistory to limit tid <= backup_tid
    # TODO dm.getObjectHistory has to be first fixed for this
    #def askObjectHistory(self, conn, oid, first, last):
neoppod-97af23cc3740cc4b2eee09125c9ce95abb6fbe2c-neo-storage/neo/storage/handlers/identification.py 0000664 0000000 0000000 00000005734 13313233730 0032354 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.handler import EventHandler
from neo.lib.protocol import NodeTypes, NotReadyError, Packets
from neo.lib.protocol import ProtocolError
from .storage import StorageOperationHandler
from .client import ClientOperationHandler, ClientReadOnlyOperationHandler
class IdentificationHandler(EventHandler):
    """ Handler used for incoming connections during operation state """

    def connectionLost(self, conn, new_state):
        logging.warning('A connection was lost during identification')

    def getEventQueue(self):
        # for requestIdentification
        return self.app.nm

    def requestIdentification(self, conn, node_type, uuid, address, name,
                              devpath, id_timestamp):
        """Identify the peer of *conn* and install the matching handler.

        Anonymous peers (uuid None) are accepted only from storage nodes.
        The replicator identifies this way when it replicates from an
        upstream cluster. Identified peers must be clients or storages.
        Clients get a read-only handler while this node has a backup TID.
        """
        self.checkClusterName(name)
        app = self.app
        # reject any incoming connections if not ready
        if not app.operational:
            raise NotReadyError
        if uuid is None:
            if node_type != NodeTypes.STORAGE:
                raise ProtocolError('reject anonymous non-storage node')
            handler = StorageOperationHandler(app)
            conn.setHandler(handler)
        else:
            if uuid == app.uuid:
                raise ProtocolError("uuid conflict or loopback connection")
            node = app.nm.getByUUID(uuid, id_timestamp)
            # choose the handler class according to the node type
            if node_type == NodeTypes.STORAGE:
                handler_class = StorageOperationHandler
            elif node_type == NodeTypes.CLIENT:
                if app.dm.getBackupTID():
                    handler_class = ClientReadOnlyOperationHandler
                else:
                    handler_class = ClientOperationHandler
                assert not node.isConnected(), node
                assert node.isRunning(), node
            else:
                raise ProtocolError('reject non-client-or-storage node')
            # apply the handler and set up the connection
            handler = handler_class(app)
            conn.setHandler(handler)
            node.setConnection(conn, app.uuid < uuid)
        # accept the identification and trigger an event
        # (our own uuid is only sent back to identified peers)
        my_uuid = uuid and app.uuid
        conn.answer(Packets.AcceptIdentification(NodeTypes.STORAGE, my_uuid,
            app.pt.getPartitions(), app.pt.getReplicas(), uuid))
        handler.connectionCompleted(conn)
neoppod-97af23cc3740cc4b2eee09125c9ce95abb6fbe2c-neo-storage/neo/storage/handlers/initialization.py 0000664 0000000 0000000 00000006057 13313233730 0032411 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from . import BaseMasterHandler
from neo.lib import logging
from neo.lib.protocol import Packets, ProtocolError, ZERO_TID
class InitializationHandler(BaseMasterHandler):
    """Handle the master's requests until startOperation() makes this
    storage node operational."""

    def sendPartitionTable(self, conn, ptid, row_list):
        """Load the full partition table sent by the master, persist it,
        and drop data of partitions not assigned to this node (unless
        dropping is disabled)."""
        app = self.app
        pt = app.pt
        pt.load(ptid, row_list, app.nm)
        if not pt.filled():
            raise ProtocolError('Partial partition table received')
        # Install the partition table into the database for persistence,
        # remembering which partitions have a cell for this node.
        my_uuid = app.uuid
        partition_count = pt.getPartitions()
        cell_list = []
        assigned = set()
        for offset in reversed(range(partition_count)):
            for cell in pt.getCellList(offset):
                uuid = cell.getUUID()
                cell_list.append((offset, uuid, cell.getState()))
                if uuid == my_uuid:
                    assigned.add(offset)
        unassigned = [offset for offset in range(partition_count)
                      if offset not in assigned]
        # delete objects database
        dm = app.dm
        if unassigned:
            if app.disable_drop_partitions:
                logging.info('partitions %r are discarded but actual deletion'
                             ' of data is disabled', unassigned)
            else:
                logging.debug('drop data for partitions %r', unassigned)
                dm.dropPartitions(unassigned)
        dm.changePartitionTable(ptid, cell_list, reset=True)
        dm.commit()

    def truncate(self, conn, tid):
        # leave backup mode and record the TID to truncate at
        dm = self.app.dm
        dm._setBackupTID(None)
        dm._setTruncateTID(tid)
        dm.commit()

    def askRecovery(self, conn):
        app = self.app
        dm = app.dm
        ptid = app.pt.getID()
        backup_tid = dm.getBackupTID()
        truncate_tid = dm.getTruncateTID()
        conn.answer(Packets.AnswerRecovery(ptid, backup_tid, truncate_tid))

    def askLastIDs(self, conn):
        dm = self.app.dm
        dm.truncate()
        last_tid, last_oid = dm.getLastIDs()
        conn.answer(Packets.AnswerLastIDs(last_oid, last_tid))

    def askPartitionTable(self, conn):
        pt = self.app.pt
        conn.answer(Packets.AnswerPartitionTable(pt.getID(), pt.getRowList()))

    def askLockedTransactions(self, conn):
        unfinished = self.app.dm.getUnfinishedTIDDict()
        conn.answer(Packets.AnswerLockedTransactions(unfinished))

    def validateTransaction(self, conn, ttid, tid):
        # lock then immediately unlock, committing the result
        dm = self.app.dm
        dm.lockTransaction(tid, ttid)
        dm.unlockTransaction(tid, ttid, True, True)
        dm.commit()

    def startOperation(self, conn, backup):
        # XXX: see comment in protocol
        app = self.app
        app.operational = True
        app.replicator.startOperation(backup)
neoppod-97af23cc3740cc4b2eee09125c9ce95abb6fbe2c-neo-storage/neo/storage/handlers/master.py 0000664 0000000 0000000 00000003732 13313233730 0030652 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.util import dump
from neo.lib.protocol import Packets, ZERO_TID
from . import BaseMasterHandler
class MasterOperationHandler(BaseMasterHandler):
    """ This handler is used for the primary master """

    def startOperation(self, conn, backup):
        # XXX: see comment in protocol
        app = self.app
        assert app.operational and backup
        app.replicator.startOperation(backup)

    def askLockInformation(self, conn, ttid, tid):
        # lock the transaction, then acknowledge it to the master
        self.app.tm.lock(ttid, tid)
        conn.answer(Packets.AnswerInformationLocked(ttid))

    def notifyUnlockInformation(self, conn, ttid):
        self.app.tm.unlock(ttid)

    def askPack(self, conn, tid):
        app = self.app
        logging.info('Pack started, up to %s...', dump(tid))
        app.dm.pack(tid, app.tm.updateObjectDataForPack)
        logging.info('Pack finished.')
        conn.answer(Packets.AnswerPack(True))

    def answerUnfinishedTransactions(self, conn, *args, **kw):
        self.app.replicator.setUnfinishedTIDList(*args, **kw)

    def replicate(self, conn, tid, upstream_name, source_dict):
        # Pair each partition's source address with the upstream cluster
        # name; a falsy address is passed through unchanged.
        sources = {}
        for offset, address in source_dict.iteritems():
            sources[offset] = address and (address, upstream_name)
        self.app.replicator.backup(tid, sources)

    def checkPartition(self, conn, *args):
        self.app.checker(*args)
neoppod-97af23cc3740cc4b2eee09125c9ce95abb6fbe2c-neo-storage/neo/storage/handlers/storage.py 0000664 0000000 0000000 00000026132 13313233730 0031022 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import weakref
from functools import wraps
from neo.lib.connection import ConnectionClosed
from neo.lib.handler import DelayEvent, EventHandler
from neo.lib.protocol import Errors, Packets, ProtocolError, ZERO_HASH
def checkConnectionIsReplicatorConnection(func):
    """Decorate a handler method so that it only runs for packets that
    belong to the replication in progress.

    Packets for which ``app.replicator.isReplicatingConnection(conn)`` is
    false are ignored: the decorated method is not called and the wrapper
    returns None.
    """
    @wraps(func)
    def wrapper(self, conn, *args, **kw):
        if not self.app.replicator.isReplicatingConnection(conn):
            return None
        return func(self, conn, *args, **kw)
    return wrapper
def checkFeedingConnection(check):
    """Return a decorator for server-side handler methods that serve data
    of a partition (passed as the argument following *conn*).

    The request is rejected when this node has no usable cell for the
    partition. With *check* true, the cell must not be out of date, and
    rejections use Errors.CheckingError. With *check* false, the cell must
    be readable, and rejections use Errors.ReplicationError. Otherwise the
    connection is switched to server mode before the method is called.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(self, conn, partition, *args, **kw):
            app = self.app
            cell = app.pt.getCell(partition, app.uuid)
            if cell is None:
                unusable = True
            elif check:
                unusable = cell.isOutOfDate()
            else:
                unusable = not cell.isReadable()
            if unusable:
                error = Errors.CheckingError if check else \
                    Errors.ReplicationError
                return conn.answer(
                    error("partition %u not readable" % partition))
            conn.asServer()
            return func(self, conn, partition, *args, **kw)
        return wrapper
    return decorator
class StorageOperationHandler(EventHandler):
    """This class handles events for replications."""
    # Used on connections between storage nodes. The "Client" methods below
    # process what a peer sends on a connection we initiated (replication,
    # checks). The "Server" methods serve such requests from a peer.

    def connectionLost(self, conn, new_state):
        app = self.app
        if app.operational and conn.isClient():
            # We initiated this connection: find the peer node.
            uuid = conn.getUUID()
            if uuid:
                node = app.nm.getByUUID(uuid)
            else:
                # Anonymous peer (no uuid): look it up by address and
                # reset its state.
                node = app.nm.getByAddress(conn.getAddress())
                node.setUnknown()
            replicator = app.replicator
            # Abort the replication that was fed by this node, if any.
            if replicator.current_node is node:
                replicator.abort()
        app.checker.connectionLost(conn)

    # Client

    def connectionFailed(self, conn):
        if self.app.operational:
            self.app.replicator.abort()

    def _acceptIdentification(self, node, *args):
        # Once identified by the peer, let the replicator and the checker
        # start using the connection.
        self.app.replicator.connected(node)
        self.app.checker.connected(node)

    @checkConnectionIsReplicatorConnection
    def answerFetchTransactions(self, conn, pack_tid, next_tid, tid_list):
        # End of a chunk of transactions. tid_list holds the transactions
        # we have but the feeding node does not: delete them.
        if tid_list:
            deleteTransaction = self.app.dm.deleteTransaction
            for tid in tid_list:
                deleteTransaction(tid)
        assert not pack_tid, "TODO"
        if next_tid:
            # more transactions to fetch
            self.app.replicator.fetchTransactions(next_tid)
        else:
            # transactions done: continue with objects
            self.app.replicator.fetchObjects()

    @checkConnectionIsReplicatorConnection
    def addTransaction(self, conn, tid, user, desc, ext, packed, ttid,
                             oid_list):
        # Directly store the transaction.
        self.app.dm.storeTransaction(tid, (),
            (oid_list, user, desc, ext, packed, ttid), False)

    @checkConnectionIsReplicatorConnection
    def answerFetchObjects(self, conn, pack_tid, next_tid,
                           next_oid, object_dict):
        # End of a chunk of objects. object_dict maps serial -> oids that we
        # have but the feeding node does not: delete them.
        if object_dict:
            deleteObject = self.app.dm.deleteObject
            for serial, oid_list in object_dict.iteritems():
                for oid in oid_list:
                    deleteObject(oid, serial)
        assert not pack_tid, "TODO"
        if next_tid:
            # TODO also provide feedback to master about current replication state (tid)
            self.app.replicator.fetchObjects(next_tid, next_oid)
        else:
            # This will also commit.
            self.app.replicator.finish()

    @checkConnectionIsReplicatorConnection
    def addObject(self, conn, oid, serial, compression,
                  checksum, data, data_serial):
        dm = self.app.dm
        # Only store data when there is some (or a non-empty checksum);
        # otherwise the object record refers to no data.
        if data or checksum != ZERO_HASH:
            data_id = dm.storeData(checksum, oid, data, compression)
        else:
            data_id = None
        # Directly store the transaction.
        obj = oid, data_id, data_serial
        dm.storeTransaction(serial, (obj,), None, False)

    @checkConnectionIsReplicatorConnection
    def replicationError(self, conn, message):
        self.app.replicator.abort('source message: ' + message)

    def checkingError(self, conn, message):
        # The peer refused a check: notify the checker, and close the
        # connection in any case.
        try:
            self.app.checker.connectionLost(conn)
        finally:
            self.app.closeClient(conn)

    # Answers to range checks are forwarded to the checker.

    @property
    def answerCheckTIDRange(self):
        return self.app.checker.checkRange

    @property
    def answerCheckSerialRange(self):
        return self.app.checker.checkRange

    # Server (all methods must set connection as server so that it isn't closed
    #         if client tasks are finished)
    #
    # These are all low-priority packets, in that we don't want to delay
    # answers to clients, so tasks are used to postpone work when we're idle.

    def getEventQueue(self):
        # Requests delayed with DelayEvent are queued in the transaction
        # manager's read queue.
        return self.app.tm.read_queue

    @checkFeedingConnection(check=True)
    def askCheckTIDRange(self, conn, *args):
        app = self.app
        # Wait until max_tid is not locked anymore.
        if app.tm.isLockedTid(args[3]): # max_tid
            raise DelayEvent
        msg_id = conn.getPeerId()
        # Only keep a weak reference: the connection may be closed before
        # the task runs, in which case the answer is dropped.
        conn = weakref.proxy(conn)
        def check():
            r = app.dm.checkTIDRange(*args)
            try:
                conn.send(Packets.AnswerCheckTIDRange(*r), msg_id)
            except (weakref.ReferenceError, ConnectionClosed):
                pass
            # Splitting this task would cause useless overhead. However, a
            # generator function is expected, hence the following fake yield
            # so that iteration stops immediately.
            return; yield
        app.newTask(check())

    @checkFeedingConnection(check=True)
    def askCheckSerialRange(self, conn, *args):
        app = self.app
        # Unlike askCheckTIDRange, a locked max_tid is a protocol error here:
        # the peer must check transactions before objects.
        if app.tm.isLockedTid(args[3]): # max_tid
            raise ProtocolError("transactions must be checked before objects")
        msg_id = conn.getPeerId()
        conn = weakref.proxy(conn)
        def check():
            r = app.dm.checkSerialRange(*args)
            try:
                conn.send(Packets.AnswerCheckSerialRange(*r), msg_id)
            except (weakref.ReferenceError, ConnectionClosed):
                pass
            return; yield # same as in askCheckTIDRange
        app.newTask(check())

    @checkFeedingConnection(check=False)
    def askFetchTransactions(self, conn, partition, length, min_tid, max_tid,
                             tid_list):
        # Feed a chunk of transactions of `partition` to a replicating node.
        # tid_list: transactions the peer already has in the requested range.
        app = self.app
        if app.tm.isLockedTid(max_tid):
            # Wow, backup cluster is fast. Requested transactions are still in
            # ttrans/ttobj so wait a little.
            # This can also happen for internal replication, when
            #   NotifyTransactionFinished(M->S) + AskFetchTransactions(S->S)
            # is faster than
            #   NotifyUnlockInformation(M->S)
            raise DelayEvent
        msg_id = conn.getPeerId()
        conn = weakref.proxy(conn)
        peer_tid_set = set(tid_list)
        dm = app.dm
        # Read one more TID than requested: if present, it is where the
        # next chunk starts.
        tid_list = dm.getReplicationTIDList(min_tid, max_tid, length + 1,
                                            partition)
        next_tid = tid_list.pop() if length < len(tid_list) else None
        def push():
            try:
                pack_tid = None # TODO
                for tid in tid_list:
                    if tid in peer_tid_set:
                        # the peer already has it
                        peer_tid_set.remove(tid)
                    else:
                        t = dm.getTransaction(tid)
                        if t is None:
                            conn.send(Errors.ReplicationError(
                                "partition %u dropped"
                                % partition), msg_id)
                            return
                        oid_list, user, desc, ext, packed, ttid = t
                        # Sending such packet does not mark the connection
                        # for writing if there's too little data in the buffer.
                        conn.send(Packets.AddTransaction(tid, user,
                            desc, ext, packed, ttid, oid_list), msg_id)
                        # To avoid delaying several connections simultaneously,
                        # and also prevent the backend from scanning different
                        # parts of the DB at the same time, we ask the
                        # scheduler not to switch to another background task.
                        # Ideally, we are filling a buffer while the kernel
                        # is flushing another one for a concurrent connection.
                        yield conn.buffering
                # Remaining peer TIDs are unknown here: the peer deletes
                # them (see answerFetchTransactions).
                conn.send(Packets.AnswerFetchTransactions(
                    pack_tid, next_tid, peer_tid_set), msg_id)
                yield
            except (weakref.ReferenceError, ConnectionClosed):
                pass
        app.newTask(push())

    @checkFeedingConnection(check=False)
    def askFetchObjects(self, conn, partition, length, min_tid, max_tid,
                        min_oid, object_dict):
        # Feed a chunk of objects of `partition` to a replicating node.
        # object_dict: serial -> oids the peer already has.
        app = self.app
        if app.tm.isLockedTid(max_tid):
            raise ProtocolError("transactions must be fetched before objects")
        msg_id = conn.getPeerId()
        conn = weakref.proxy(conn)
        dm = app.dm
        # As in askFetchTransactions, the extra item gives the start of the
        # next chunk.
        object_list = dm.getReplicationObjectList(min_tid, max_tid, length + 1,
                                                  partition, min_oid)
        if length < len(object_list):
            next_tid, next_oid = object_list.pop()
        else:
            next_tid = next_oid = None
        def push():
            try:
                pack_tid = None # TODO
                for serial, oid in object_list:
                    oid_set = object_dict.get(serial)
                    if oid_set:
                        # lists received from the peer are turned into sets
                        # lazily, on first use
                        if type(oid_set) is list:
                            object_dict[serial] = oid_set = set(oid_set)
                        if oid in oid_set:
                            # the peer already has it
                            oid_set.remove(oid)
                            if not oid_set:
                                del object_dict[serial]
                            continue
                    object = dm.fetchObject(oid, serial)
                    if not object:
                        conn.send(Errors.ReplicationError(
                            "partition %u dropped or truncated"
                            % partition), msg_id)
                        return
                    if not object[2]: # creation undone
                        object = object[0], 0, ZERO_HASH, '', object[4]
                    # Same as in askFetchTransactions.
                    conn.send(Packets.AddObject(oid, *object), msg_id)
                    yield conn.buffering
                # Remaining entries are objects the peer has but we don't:
                # the peer deletes them (see answerFetchObjects).
                conn.send(Packets.AnswerFetchObjects(
                    pack_tid, next_tid, next_oid, object_dict), msg_id)
                yield
            except (weakref.ReferenceError, ConnectionClosed):
                pass
        app.newTask(push())
neoppod-97af23cc3740cc4b2eee09125c9ce95abb6fbe2c-neo-storage/neo/storage/replicator.py 0000664 0000000 0000000 00000050376 13313233730 0027731 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Replication algorithm
Purpose: replicate the content of a reference node into a replicating node,
bringing it up-to-date. This happens in the following cases:
- A new storage is added to an existing cluster.
- A node was separated from cluster and rejoins it.
- In a backup cluster, the master notifies a node that new data exists upstream
(note that in this case, the cell is always marked as UP_TO_DATE).
Replication happens per partition. Reference node can change between
partitions.
2 parts, done sequentially:
- Transaction (metadata) replication
- Object (metadata+data) replication
Both parts follow the same mechanism:
- The range of data to replicate is split into chunks of FETCH_COUNT items
(transaction or object).
- For every chunk, the requesting node sends to seeding node the list of items
it already has.
- Before answering, the seeding node sends 1 packet for every missing item.
For items that are already on the replicating node, there is no check that
values match.
- The seeding node finally answers with the list of items to delete (usually
empty).
Internal replication, which is similar to RAID1 (and as opposed to asynchronous
replication to a backup cluster) requires extra care with respect to
transactions. The transition of a cell from OUT_OF_DATE to UP_TO_DATE is done
in several steps.
A replicating node cannot depend on other nodes to fetch data that was recently
committed or is being committed, because that cannot be done atomically: it could miss
writes between the processing of its request by a source node and the reception
of the answer.
Therefore, outdated cells are writable: a storage node asks the master for
transactions being committed and then it is expected to fully receive from the
client any transaction that is started after this answer.
Which has in turn other consequences:
- The client must not fail to write to a storage node after the above request
to the master: for this, the storage must have announced it is ready, and it
must delay identification of unknown clients (those for which it hasn't
received yet a notification from the master).
- Writes must be accepted blindly (i.e. without taking a write-lock) when a
storage node lacks the data to check for conflicts. This is possible because
1 up-to-date cell (for each partition) is enough to do these checks.
- Because the client can not reliably know if a storage node is expected to
receive a transaction in full, all writes must succeed.
- Even if the replication is finished, we have to wait until there are no
lockless writes left before announcing to the master that we're up-to-date.
To sum up:
1. ask unfinished transactions -> (last_transaction, ttid_list)
2. replicate to last_transaction
3. wait for all ttid_list to be finished -> new last_transaction
4. replicate to last_transaction
5. no lockless write anymore, except to (oid, ttid) that were already
stored/checked without taking a lock
6. wait for all transactions with lockless writes to be finished
7. announce we're up-to-date
For any failed write, the client marks the storage node as failed and stops
writing to it for the transaction. If any write failed, the vote ends
with an extra request to the master: the transaction will only succeed if the
failed nodes can be disconnected, forcing them to replicate the missing data.
TODO: Packing and replication currently fail when they happen at the same time.
"""
import random
from neo.lib import logging
from neo.lib.protocol import CellStates, NodeTypes, NodeStates, \
Packets, INVALID_TID, ZERO_TID, ZERO_OID
from neo.lib.connection import ClientConnection, ConnectionClosed
from neo.lib.util import add64, dump, p64
from .handlers.storage import StorageOperationHandler
# Maximum number of items (transactions or objects) requested from the
# feeding node per chunk.
FETCH_COUNT = 1000
class Partition(object):
    """Replication progress of one partition of this node.

    next_trans / next_obj: TIDs from which transactions / objects are to be
    fetched next. max_ttid: highest unfinished transaction the partition
    still waits for (None when there is none).
    """
    __slots__ = 'next_trans', 'next_obj', 'max_ttid'

    def __repr__(self):
        # only show the slots that have been assigned
        fields = []
        for name in self.__slots__:
            if hasattr(self, name):
                fields.append('%s=%r' % (name, getattr(self, name)))
        return '<%s(%s) at 0x%x>' % (
            type(self).__name__, ', '.join(fields), id(self))
class Replicator(object):
    """Replicate this node's partitions from other storage nodes, or from
    an upstream cluster in backup mode, one partition at a time.
    """

    # When the replication of a partition is aborted, the connection to the
    # feeding node may still be open, e.g. on PT update from the master. In
    # such case, replication is also aborted on the other side but there may
    # be a few incoming packets that must be discarded.
    _conn_msg_id = None
    # node the current partition is replicated from
    current_node = None
    # offset of the partition being replicated (None when idle)
    current_partition = None
def __init__(self, app):
self.app = app
def getCurrentConnection(self):
node = self.current_node
if node is not None and node.isConnected(True):
return node.getConnection()
def isReplicatingConnection(self, conn):
return conn is self.getCurrentConnection() and \
conn.getPeerId() == self._conn_msg_id
def setUnfinishedTIDList(self, max_tid, ttid_list, offset_list):
"""This is a callback from MasterOperationHandler."""
assert self.ttid_set.issubset(ttid_list), (self.ttid_set, ttid_list)
if ttid_list:
self.ttid_set.update(ttid_list)
max_ttid = max(ttid_list)
else:
max_ttid = None
for offset in offset_list:
self.partition_dict[offset].max_ttid = max_ttid
self.replicate_dict[offset] = max_tid
self._nextPartition()
def transactionFinished(self, ttid, max_tid=None):
    """ Callback from MasterOperationHandler """
    # Forget ttid. If it is not one of the unfinished transactions we wait
    # for, there is nothing more to do.
    try:
        self.ttid_set.remove(ttid)
    except KeyError:
        assert max_tid is None, max_tid
        return
    # oldest transaction still unfinished (INVALID_TID if none is left)
    min_ttid = min(self.ttid_set) if self.ttid_set else INVALID_TID
    for offset, p in self.partition_dict.iteritems():
        if p.max_ttid:
            # this partition still waits for unfinished transactions
            if max_tid:
                # Filling replicate_dict while there are still unfinished
                # transactions for this partition is not the most
                # efficient (due to the overhead of potentially replicating
                # the last transactions in several times), but that's a
                # simple way to make sure it is filled even if the
                # remaining unfinished transactions are aborted.
                self.replicate_dict[offset] = max_tid
            if p.max_ttid < min_ttid:
                # no more unfinished transaction for this partition
                if not (offset == self.current_partition
                        or offset in self.replicate_dict):
                    logging.debug(
                        "All unfinished transactions have been aborted."
                        " Mark partition %u as already fully replicated",
                        offset)
                    # We don't have anymore the previous value of
                    # self.replicate_dict[offset], but p.max_ttid is not
                    # wrong. Anyway here, we're not in backup mode and this
                    # value will be ignored.
                    # XXX: see NonReadableCell.__doc__
                    self.app.tm.replicated(offset, p.max_ttid)
                p.max_ttid = None
    self._nextPartition()
def getBackupTID(self):
    """Return the TID up to which every partition of this node that is not
    out of date has been replicated.

    This is one less than the lowest next_trans/next_obj of those
    partitions. ZERO_TID is returned when that lowest value is ZERO_TID or
    INVALID_TID (e.g. no such partition).
    """
    app = self.app
    outdated_set = set(app.pt.getOutdatedOffsetListFor(app.uuid))
    lowest = INVALID_TID
    for offset, p in self.partition_dict.iteritems():
        if offset in outdated_set:
            continue
        lowest = min(lowest, p.next_trans, p.next_obj)
    if lowest == ZERO_TID or lowest == INVALID_TID:
        return ZERO_TID
    return add64(lowest, -1)
def updateBackupTID(self, commit=False):
dm = self.app.dm
tid = dm.getBackupTID()
if tid:
new_tid = self.getBackupTID()
if tid != new_tid:
dm._setBackupTID(new_tid)
if commit:
dm.commit()
def startOperation(self, backup):
dm = self.app.dm
if backup:
if dm.getBackupTID():
assert not hasattr(self, 'partition_dict'), self.partition_dict
return
tid = dm.getLastIDs()[0] or ZERO_TID
else:
tid = None
dm._setBackupTID(tid)
dm.commit()
try:
partition_dict = self.partition_dict
except AttributeError:
return
for offset, next_tid in dm.iterCellNextTIDs():
if type(next_tid) is not bytes: # readable
p = partition_dict[offset]
p.next_trans, p.next_obj = next_tid
def populate(self):
    """(Re)initialize the replication state from the cells of this node
    known by the database, and tell the transaction manager which
    partitions are being replicated (OUT_OF_DATE cells)."""
    self.partition_dict = {}
    self.replicate_dict = {}
    self.source_dict = {}
    self.ttid_set = set()
    outdated_list = []
    for offset, next_tid in self.app.dm.iterCellNextTIDs():
        partition = Partition()
        self.partition_dict[offset] = partition
        if type(next_tid) is not bytes:
            # readable cell
            partition.next_trans, partition.next_obj = \
                next_tid or (None, None)
            partition.max_ttid = None
            continue
        # OUT_OF_DATE cell: replication resumes from next_tid
        outdated_list.append(offset)
        partition.next_trans = partition.next_obj = next_tid
        partition.max_ttid = INVALID_TID
    if outdated_list:
        self.app.tm.replicating(outdated_list)
def notifyPartitionChanges(self, cell_list):
    """This is a callback from MasterOperationHandler.

    Only changes to this node's own cells are considered. Discarded or
    corrupted cells are forgotten (aborting their replication if in
    progress). New OUT_OF_DATE cells start tracking from ZERO_TID.
    Readable cells are reported to the transaction manager.
    """
    app = self.app
    my_uuid = app.uuid
    abort = False
    added_list = []
    discarded_list = []
    readable_list = []
    for offset, uuid, state in cell_list:
        if uuid != my_uuid:
            continue
        if state in (CellStates.DISCARDED, CellStates.CORRUPTED):
            try:
                del self.partition_dict[offset]
            except KeyError:
                # not tracked: nothing to forget
                continue
            self.replicate_dict.pop(offset, None)
            self.source_dict.pop(offset, None)
            if offset == self.current_partition:
                abort = True
            discarded_list.append(offset)
        elif state == CellStates.OUT_OF_DATE:
            assert offset not in self.partition_dict
            p = self.partition_dict[offset] = Partition()
            # New cell. 0 is also what should be stored by the backend.
            # Nothing to optimize.
            p.next_trans = p.next_obj = ZERO_TID
            p.max_ttid = INVALID_TID
            added_list.append(offset)
        else:
            assert state in (CellStates.UP_TO_DATE,
                             CellStates.FEEDING), state
            readable_list.append(offset)
    tm = app.tm
    if added_list:
        tm.replicating(added_list)
    if discarded_list:
        tm.discarded(discarded_list)
    if readable_list:
        tm.readable(readable_list)
    if abort:
        self.abort()
def backup(self, tid, source_dict):
    """Handle a replication order from the master in backup mode.

    Partitions with a source are scheduled for replication up to *tid*
    from that source. Partitions without a source that are neither being
    replicated nor scheduled are marked as replicated up to *tid*. In
    that case the stored backup TID is updated and committed.
    """
    next_tid = None
    for offset, source in source_dict.iteritems():
        if source:
            self.source_dict[offset] = source
            self.replicate_dict[offset] = tid
            continue
        if offset == self.current_partition or \
           offset in self.replicate_dict:
            # The master did its best to avoid useless replication orders
            # but there may still be a few, and we may receive redundant
            # update notification of backup_tid.
            # So, we do nothing here if we are already replicating.
            continue
        p = self.partition_dict[offset]
        if not next_tid:
            next_tid = add64(tid, 1)
        p.next_trans = p.next_obj = next_tid
    if next_tid:
        self.updateBackupTID(True)
    self._nextPartition()
def _nextPartitionSortKey(self, offset):
p = self.partition_dict[offset]
return p.next_obj, bool(p.max_ttid)
def _nextPartition(self):
    # Start replicating one of the partitions in replicate_dict, unless a
    # replication is already in progress or there is nothing to replicate.
    # XXX: One connection to another storage may remain open forever.
    #      All other previous connections are automatically closed
    #      after some time of inactivity.
    #      This should be improved in several ways:
    #      - Keeping connections open between 2 clusters (backup case) is
    #        quite a good thing because establishing a connection costs
    #        time/bandwidth and replication is actually never finished.
    #      - When all storages of a non-backup cluster are up-to-date,
    #        there's no reason to keep any connection open.
    if self.current_partition is not None or not self.replicate_dict:
        return
    app = self.app
    assert app.master_conn and app.operational, (
        app.master_conn, app.operational)
    # Start replicating the partition which is furthest behind,
    # to increase the overall backup_tid as soon as possible.
    # Then prefer a partition with no unfinished transaction.
    # XXX: When leaving backup mode, we should only consider UP_TO_DATE
    #      cells.
    offset = min(self.replicate_dict, key=self._nextPartitionSortKey)
    try:
        addr, name = self.source_dict[offset]
    except KeyError:
        # No explicit source: pick a random RUNNING node having a readable
        # cell for this partition.
        assert app.pt.getCell(offset, app.uuid).isOutOfDate(), (
            offset, app.pt.getCell(offset, app.uuid).getState())
        node = random.choice([cell.getNode()
            for cell in app.pt.getCellList(offset, readable=True)
            if cell.getNodeState() == NodeStates.RUNNING])
        name = None
    else:
        # Source given by backup(): a node of the upstream cluster `name`,
        # known by its address (created if unknown).
        node = app.nm.getByAddress(addr)
        if node is None:
            assert name, addr
            node = app.nm.createStorage(address=addr)
    self.current_partition = offset
    previous_node = self.current_node
    self.current_node = node
    if node.isConnected(connecting=True):
        # Reuse the connection. If it is not identified yet, replication
        # starts from connected() once identification is accepted.
        if node.isIdentified():
            node.getConnection().asClient()
            self.fetchTransactions()
    else:
        assert name or node.getUUID() != app.uuid, "loopback connection"
        conn = ClientConnection(app, StorageOperationHandler(app), node)
        try:
            # With an upstream cluster name, identify anonymously (no uuid).
            conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE,
                None if name else app.uuid, app.server, name or app.name,
                (), app.id_timestamp))
        except ConnectionClosed:
            if previous_node is self.current_node:
                return
    # Close the connection to the previous feeding node, if still open.
    if previous_node is not None and previous_node.isConnected():
        app.closeClient(previous_node.getConnection())
def connected(self, node):
if self.current_node is node and self.current_partition is not None:
self.fetchTransactions()
def fetchTransactions(self, min_tid=None):
    """Ask the feeding node for the next chunk of transactions of the
    partition being replicated.

    Without *min_tid*, this starts the replication of current_partition.
    The upper bound is moved from replicate_dict to replicate_tid, and the
    lower bound is the partition's next_trans. If the partition has an
    explicit source whose address is not the one of the current node, the
    replication is aborted instead.

    With *min_tid* (the next_tid of the previous chunk), the replication
    in progress continues, after committing the work done so far.
    """
    assert self.current_node.getConnection().isClient(), self.current_node
    offset = self.current_partition
    p = self.partition_dict[offset]
    if min_tid:
        # More than one chunk ? This could be a full replication so avoid
        # restarting from the beginning by committing now.
        self.app.dm.commit()
        p.next_trans = min_tid
    else:
        try:
            addr, name = self.source_dict[offset]
        except KeyError:
            pass
        else:
            if addr != self.current_node.getAddress():
                return self.abort()
        min_tid = p.next_trans
        self.replicate_tid = self.replicate_dict.pop(offset)
        # The format string must have one placeholder per argument (4),
        # otherwise formatting the log record fails.
        logging.debug("starting replication of <partition=%u min_tid=%s"
                      " max_tid=%s> from %r", offset, dump(min_tid),
                      dump(self.replicate_tid), self.current_node)
    max_tid = self.replicate_tid
    tid_list = self.app.dm.getReplicationTIDList(min_tid, max_tid,
        FETCH_COUNT, offset)
    self._conn_msg_id = self.current_node.ask(Packets.AskFetchTransactions(
        offset, FETCH_COUNT, min_tid, max_tid, tid_list))
def fetchObjects(self, min_tid=None, min_oid=ZERO_OID):
    """Ask the feeding node for the next chunk of objects of the partition
    being replicated, sending along the objects we already have.

    Without *min_tid*, this is the first object chunk: start from the
    partition's next_obj, since transactions are now replicated up to
    replicate_tid. With *min_tid*/*min_oid* (from the previous chunk),
    progress is saved and committed first.
    """
    offset = self.current_partition
    p = self.partition_dict[offset]
    max_tid = self.replicate_tid
    dm = self.app.dm
    if not min_tid:
        min_tid = p.next_obj
        p.next_trans = add64(max_tid, 1)
    else:
        p.next_obj = min_tid
        self.updateBackupTID()
        dm.updateCellTID(offset, add64(min_tid, -1))
        dm.commit() # like in fetchTransactions
    # serial -> oids we already have in the requested range
    object_dict = {}
    for serial, oid in dm.getReplicationObjectList(min_tid,
            max_tid, FETCH_COUNT, offset, min_oid):
        object_dict.setdefault(serial, []).append(oid)
    self._conn_msg_id = self.current_node.ask(Packets.AskFetchObjects(
        offset, FETCH_COUNT, min_tid, max_tid, min_oid, object_dict))
def finish(self):
    """Conclude the replication of the current partition.

    The cell tid is updated and committed. The transaction manager is told
    the partition is replicated, unless the partition still has unfinished
    transactions ('max_ttid') or is queued again in 'replicate_dict' without
    a 'source_dict' entry. Then the next partition is processed.
    """
    offset = self.current_partition
    tid = self.replicate_tid
    del self.current_partition, self._conn_msg_id, self.replicate_tid
    partition = self.partition_dict[offset]
    partition.next_obj = add64(tid, 1)
    self.updateBackupTID()
    dm = self.app.dm
    dm.updateCellTID(offset, tid)
    dm.commit()
    if partition.max_ttid or (offset in self.replicate_dict and
                              offset not in self.source_dict):
        logging.debug("unfinished transactions: %r", self.ttid_set)
    else:
        self.app.tm.replicated(offset, tid)
    logging.debug("partition %u replicated up to %s from %r",
                  offset, dump(tid), self.current_node)
    self.getCurrentConnection().setReconnectionNoDelay()
    self._nextPartition()
def abort(self, message=''):
    """Abort the replication of the current partition, if any.

    Unless the partition was removed meanwhile, it is put back into
    'replicate_dict' (with its queued tid, or 'replicate_tid' if none) so
    that it is replicated later. 'message' is an optional reason appended
    to the warning.
    """
    offset = self.current_partition
    if offset is None:
        return
    del self.current_partition
    self._conn_msg_id = None
    logging.warning('replication aborted for partition %u%s',
                    offset, message and ' (%s)' % message)
    if offset not in self.partition_dict:
        # partition removed
        self._nextPartition()
        return
    tid = self.replicate_dict.pop(offset, None) or self.replicate_tid
    if not self.replicate_dict:
        self.replicate_dict[offset] = tid
        self._nextPartition()
        return
    # XXX: Try another partition if possible, to increase probability to
    #      connect to another node. It would be better to explicitly
    #      search for another node instead.
    self._nextPartition()
    self.replicate_dict[offset] = tid
def stop(self):
    """Stop replicating from an upstream cluster.

    If the current node has no UUID (treated here as a storage node of an
    upstream cluster), the replication of the current partition (if any)
    is cancelled and the connection to that node is closed. If a request
    was in progress ('_conn_msg_id' set), the partition is queued again in
    'replicate_dict'. Then every queued replication whose 'source_dict'
    entry has a name is dropped, and the next partition is processed.
    """
    # Close any open connection to an upstream storage,
    # possibly aborting current replication.
    node = self.current_node
    # Chained comparison: node is not None and node.getUUID() is None.
    if node is not None is node.getUUID():
        offset = self.current_partition
        if offset is not None:
            logging.info('cancel replication of partition %u', offset)
            del self.current_partition
            if self._conn_msg_id is not None:
                self.replicate_dict.setdefault(offset, self.replicate_tid)
                del self._conn_msg_id, self.replicate_tid
        self.getCurrentConnection().close()
    # Cancel all replication orders from upstream cluster.
    # (keys() returns a list copy in Python 2, so popping while looping is
    # safe here.)
    for offset in self.replicate_dict.keys():
        addr, name = self.source_dict.get(offset, (None, None))
        if name:
            tid = self.replicate_dict.pop(offset)
            logging.info('cancel replication of partition %u from %r'
                         ' up to %s', offset, addr, dump(tid))
    # Make UP_TO_DATE cells really UP_TO_DATE
    self._nextPartition()
neoppod-97af23cc3740cc4b2eee09125c9ce95abb6fbe2c-neo-storage/neo/storage/shared_queue.py 0000664 0000000 0000000 00000014007 13313233730 0030226 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2018 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from msgpack import Packer, Unpacker
class Queue(object):
    """Unidirectional pipe for asynchronous and fast exchange of big amounts
    of data between 2 processes.

    It is implemented using shared memory, a few locks and msgpack
    serialization. While the latter is faster than C pickle, it was mainly
    chosen for its streaming API while deserializing, which greatly reduces
    the locking overhead for the consumer process.

    There is no mechanism to end a communication, so this information must be
    in the exchanged data, for example by choosing a marker object like None:
    - the last object sent by the producer is this marker
    - the consumer stops iterating when it gets this marker

    As long as there are data being exchanged, the 2 processes can't change
    roles (producer/consumer).

    The shared memory is a ring buffer of 'max_size' bytes: serialized
    objects may be split at any byte and wrap around the end of the array.
    """

    def __init__(self, max_size):
        from multiprocessing import Lock, RawArray, RawValue
        self._max_size = max_size
        # Ring buffer: '_pos' is the offset of the first unread byte and
        # '_size' the number of unread bytes. The producer appends at
        # (_pos + _size) modulo max_size.
        self._array = RawArray('c', max_size)
        self._pos = RawValue('L')
        self._size = RawValue('L')
        # (lock, get_lock, put_lock):
        # - lock: held while reading/updating _pos and _size
        # - get_lock: the consumer waits on it while the buffer is empty
        # - put_lock: the producer waits on it while the buffer is full
        self._locks = Lock(), Lock(), Lock()

    def __repr__(self):
        return "<%s pos=%s size=%s max_size=%s>" % (self.__class__.__name__,
            self._pos.value, self._size.value, self._max_size)

    def __iter__(self):
        """Iterate endlessly over all objects sent by the producer

        Internally, this method uses a receiving buffer that is lost if
        interrupted (GeneratorExit). If this buffer was not empty, the queue
        is left in an inconsistent state and this method can't be called
        again.

        So the correct way to split a loop is to first get an iterator
        explicitly:
            iq = iter(queue)
            for x in iq:
                if ...:
                    break
            for x in iq:
                ...
        """
        unpacker = Unpacker(use_list=False, raw=True)
        feed = unpacker.feed
        max_size = self._max_size
        array = self._array
        pos = self._pos
        size = self._size
        lock, get_lock, put_lock = self._locks
        while 1:
            # Yield every object that can be fully decoded from the bytes
            # fed so far.
            for data in unpacker:
                yield data
            # Wait until the producer has written something.
            while 1:
                with lock:
                    p = pos.value
                    s = size.value
                if s:
                    break
                get_lock.acquire()
            # Feed all unread bytes, in 2 parts if they wrap around.
            e = p + s
            if e < max_size:
                feed(array[p:e])
            else:
                feed(array[p:])
                e -= max_size
                feed(array[:e])
            with lock:
                pos.value = e
                n = size.value
                size.value = n - s
            if n == max_size:
                # The buffer was full so the producer may be waiting:
                # make sure put_lock ends up released to wake it up.
                put_lock.acquire(0)
                put_lock.release()

    def __call__(self, iterable):
        """Fill the queue with given objects

        Hoping that msgpack.Packer gets a streaming API, 'iterable' should not
        be split (i.e. this method should be called only once, like __iter__).
        """
        pack = Packer(use_bin_type=True).pack
        max_size = self._max_size
        array = self._array
        pos = self._pos
        size = self._size
        lock, get_lock, put_lock = self._locks
        # Free space known to be available at write offset 'p'; 0 means the
        # shared counters must be read again (waiting if the buffer is full).
        left = 0
        for data in iterable:
            data = pack(data)
            n = len(data)  # bytes of 'data' still to write
            i = 0          # offset in 'data' of the next byte to write
            while 1:
                if not left:
                    # Wait until the consumer frees some space.
                    while 1:
                        with lock:
                            p = pos.value
                            j = size.value
                        left = max_size - j
                        if left:
                            break
                        put_lock.acquire()
                    p += j
                    if p >= max_size:
                        p -= max_size
                # Write as much as possible without wrapping around.
                e = min(p + min(n, left), max_size)
                j = e - p
                array[p:e] = data[i:i+j]
                n -= j
                i += j
                with lock:
                    p = pos.value
                    s = size.value
                    j += s
                    size.value = j
                if not s:
                    # The buffer was empty so the consumer may be waiting:
                    # make sure get_lock ends up released to wake it up.
                    get_lock.acquire(0)
                    get_lock.release()
                # Next write offset and remaining free space.
                p += j
                if p >= max_size:
                    p -= max_size
                left = max_size - j
                if not n:
                    break
def test(self):
    """Check that objects go through a Queue unchanged and in order, first
    with a thread and then with a process as producer."""
    import multiprocessing, random, sys, threading
    from traceback import print_tb
    lengths = range(50)
    random.shuffle(lengths)
    for Worker in threading.Thread, multiprocessing.Process:
        queue = Queue(23)
        def produce():
            # Growing strings, then the None marker, then the same lengths
            # in random order.
            for length in xrange(len(lengths)):
                yield '.' * length
            yield
            for length in lengths:
                yield '.' * length
        i = j = 0
        worker = Worker(target=queue, args=(produce(),))
        worker.daemon = 1
        worker.start()
        try:
            received = iter(queue)
            for i, item in enumerate(received):
                if item is None:
                    break
                self.assertEqual(item, '.' * i)
            self.assertEqual(i, len(lengths))
            for j in lengths:
                self.assertEqual(next(received), '.' * j)
        except KeyboardInterrupt:
            print_tb(sys.exc_info()[2])
            self.fail((i, j))
        worker.join()
if __name__ == '__main__':
    import unittest
    # Run the module-level 'test' function as an anonymous TestCase.
    test_case = type('', (unittest.TestCase,), {'runTest': test})()
    unittest.TextTestRunner().run(test_case)
neoppod-97af23cc3740cc4b2eee09125c9ce95abb6fbe2c-neo-storage/neo/storage/transactions.py 0000664 0000000 0000000 00000064465 13313233730 0030301 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2010-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from time import time
from neo.lib import logging
from neo.lib.handler import DelayEvent, EventQueue
from neo.lib.util import dump
from neo.lib.protocol import Packets, ProtocolError, NonReadableCell, \
uuid_str, MAX_TID
class ConflictError(Exception):
    """
    Raised when a resolvable conflict occurs

    The 'tid' attribute holds the tid of the locking transaction or of the
    latest revision.
    """

    def __init__(self, tid):
        super(ConflictError, self).__init__()
        self.tid = tid
class NotRegisteredError(Exception):
    """Raised when a ttid is not registered, i.e. there is no pending
    transaction for it in the transaction manager."""
class Transaction(object):
    """
    Container for a pending transaction

    'tid' is set once the transaction is locked and 'voted' becomes 1 or 2
    when it votes (see TransactionManager.lock and TransactionManager.vote).
    """
    # Shared empty default: logDelay never mutates it and binds a
    # per-instance dict instead.
    _delayed = {}
    tid = None
    voted = 0

    def __init__(self, uuid, ttid):
        self._birth = time()
        self.locking_tid = ttid
        self.uuid = uuid
        # oid -> serial of every object stored or checked
        self.serial_dict = {}
        # oid -> (oid, data_id, value_serial), see store()
        self.store_dict = {}
        # We must distinguish lockless stores from deadlocks.
        self.lockless = set()

    def __repr__(self):
        fields = (self.__class__.__name__,
                  uuid_str(self.uuid),
                  dump(self.locking_tid),
                  dump(self.tid),
                  time() - self._birth,
                  id(self))
        return "<%s(%s, locking_tid=%s, tid=%s, age=%.2fs) at 0x%x>" % fields

    def __lt__(self, other):
        # Transactions are ordered by their locking tid.
        return self.locking_tid < other.locking_tid

    def logDelay(self, ttid, locked, oid_serial):
        """Log that the lock of oid_serial[0] for ttid is delayed by the
        transaction 'locked', unless this was already logged for the same
        (oid, serial) and the same locking transaction."""
        delayed = self._delayed
        if delayed.get(oid_serial) == locked:
            return
        if delayed:
            delayed[oid_serial] = locked
        else:
            self._delayed = {oid_serial: locked}
        logging.info('Lock delayed for %s:%s by %s',
                     dump(oid_serial[0]), dump(ttid), dump(locked))

    def store(self, oid, data_id, value_serial):
        """
        Add an object to the transaction
        """
        self.store_dict[oid] = (oid, data_id, value_serial)
class TransactionManager(EventQueue):
    """
    Manage pending transaction and locks

    Main bookkeeping:
    - _transaction_dict: ttid -> Transaction
    - _store_lock_dict: write locks, oid -> ttid of the locking transaction
    - _load_lock_dict: read locks taken by lock(), oid -> ttid
    - _replicated: partition -> tid it was replicated up to, reset to None
      once NotifyReplicationDone was sent to the master
    - _replicating: partitions being replicated, to which stores/checks are
      accepted without taking a lock (see lockObject)
    """

    def __init__(self, app):
        EventQueue.__init__(self)
        # Second queue of delayed events (presumably read requests), flushed
        # together with this one whenever locks are released.
        self.read_queue = EventQueue()
        self._app = app
        self._transaction_dict = {}
        self._store_lock_dict = {}
        self._load_lock_dict = {}
        self._replicated = {}
        self._replicating = set()
        from neo.lib.util import u64
        np = app.pt.getPartitions()
        self.getPartition = lambda oid: u64(oid) % np

    def discarded(self, offset_list):
        """
        Forget everything about the given partitions: replication state,
        read/write locks on their oids, and what pending transactions stored
        or checked in them. Temporary data of these partitions is dropped
        and held data of dropped stores is released. Queued events are
        then retried.
        """
        self._replicating.difference_update(offset_list)
        for offset in offset_list:
            self._replicated.pop(offset, None)
        getPartition = self.getPartition
        for oid_dict in self._load_lock_dict, self._store_lock_dict:
            for oid in oid_dict.keys():
                if getPartition(oid) in offset_list:
                    del oid_dict[oid]
        data_id_list = []
        for transaction in self._transaction_dict.itervalues():
            serial_dict = transaction.serial_dict
            oid_list = [oid for oid in serial_dict
                        if getPartition(oid) in offset_list]
            for oid in oid_list:
                del serial_dict[oid]
                try:
                    data_id_list.append(transaction.store_dict.pop(oid)[1])
                except KeyError:
                    pass
            transaction.lockless.difference_update(oid_list)
        self._app.dm.dropPartitionsTemporary(offset_list)
        self._app.dm.releaseData(data_id_list, True)
        # notifyPartitionChanges will commit
        self.executeQueuedEvents()
        self.read_queue.executeQueuedEvents()

    def readable(self, offset_list):
        """
        Forget the given partitions from the 'replicated' bookkeeping,
        asserting that NotifyReplicationDone was already sent for them
        (i.e. their value was reset to None).
        """
        for offset in offset_list:
            tid = self._replicated.pop(offset, None)
            assert tid is None, (offset, tid)

    def replicating(self, offset_list):
        """
        Mark the given partitions as being replicated, so that stores to
        them become lockless, and ask the master for their unfinished
        transactions.
        """
        self._replicating.update(offset_list)
        isdisjoint = set(offset_list).isdisjoint
        assert isdisjoint(self._replicated), (offset_list, self._replicated)
        assert isdisjoint(map(self.getPartition, self._store_lock_dict)), (
            offset_list, self._store_lock_dict)
        p = Packets.AskUnfinishedTransactions(offset_list)
        self._app.master_conn.ask(p, offset_list=offset_list)

    def replicated(self, partition, tid):
        """
        Record that 'partition' was replicated up to 'tid' and take locks
        for lockless stores to it (see _notifyReplicated).
        """
        # also called for readable cells in BACKINGUP state
        self._replicating.discard(partition)
        self._replicated[partition] = tid
        self._notifyReplicated()

    def _notifyReplicated(self):
        """
        Take store locks for lockless stores to replicated partitions, then
        send NotifyReplicationDone for every replicated partition that has
        no "multi-lock" store and was not notified yet.
        """
        getPartition = self.getPartition
        store_lock_dict = self._store_lock_dict
        replicated = self._replicated
        notify = {partition for partition, tid in replicated.iteritems()
                  if tid}
        # We sort transactions so that in case of muliple stores/checks for the
        # same oid, the lock is taken by the highest locking ttid, which will
        # delay new transactions.
        for txn, ttid in sorted((txn, ttid) for ttid, txn in
                                self._transaction_dict.iteritems()):
            if txn.locking_tid == MAX_TID:
                break # all remaining transactions are resolving a deadlock
            assert txn.lockless.issubset(txn.serial_dict), (
                txn.lockless, txn.serial_dict)
            for oid in txn.lockless:
                partition = getPartition(oid)
                if replicated.get(partition):
                    if store_lock_dict.get(oid, ttid) != ttid:
                        # We have a "multi-lock" store, i.e. an
                        # initially-lockless store to a partition that became
                        # replicated.
                        notify.discard(partition)
                    store_lock_dict[oid] = ttid
        if notify:
            # For these partitions, all oids of all pending transactions
            # are now locked normally and we don't rely anymore on other
            # readable cells to check locks: we're really up-to-date.
            for partition in notify:
                self._app.master_conn.send(Packets.NotifyReplicationDone(
                    partition, replicated[partition]))
                replicated[partition] = None
            for oid, ttid in store_lock_dict.iteritems():
                if getPartition(oid) in notify:
                    # Use 'discard' instead of 'remove', for oids that were
                    # locked after that the partition was replicated.
                    self._transaction_dict[ttid].lockless.discard(oid)

    def register(self, conn, ttid):
        """
        Register a transaction, it may be already registered
        """
        if ttid not in self._transaction_dict:
            uuid = conn.getUUID()
            logging.debug('Register TXN %s for %s', dump(ttid), uuid_str(uuid))
            self._transaction_dict[ttid] = Transaction(uuid, ttid)

    def getObjectFromTransaction(self, ttid, oid):
        """
        Return object data for given running transaction.
        Return None if not found.
        """
        try:
            return self._transaction_dict[ttid].store_dict[oid]
        except KeyError:
            return None

    def _rebase(self, transaction, ttid, locking_tid=MAX_TID):
        """
        Set the locking tid of 'transaction'. With a true 'ttid', also
        release its store locks and turn its lockless stores to
        non-replicating partitions into normal ones. Queued events are then
        sorted and retried.
        """
        # With the default value of locking_tid, this marks the transaction as
        # being rebased, in case that the current lock is released (the other
        # transaction is aborted or committed) before the client sends us a new
        # locking tid: in lockObject, 'locked' will be None but we'll still
        # have to delay the store.
        transaction.locking_tid = locking_tid
        if ttid:
            # Remove store locks we have.
            # In order to keep all locking data consistent, this must be done
            # when the locking tid changes, i.e. from both 'lockObject' (for
            # the node that triggered the deadlock) and 'rebase' (for other
            # nodes).
            for oid, locked in self._store_lock_dict.items():
                # If this oid is locked by several transactions (all lockless),
                # the following condition is true if we have the highest ttid,
                # but in either case, _notifyReplicated will be called below,
                # fixing the store lock.
                if locked == ttid:
                    del self._store_lock_dict[oid]
            lockless = transaction.lockless
            # There's nothing to rebase for lockless stores to replicating
            # partitions because there's no lock taken yet. In other words,
            # rebasing such stores would do nothing. Other lockless stores
            # become normal ones: this transaction does not block anymore
            # replicated partitions from being marked as UP_TO_DATE.
            oid_list = [oid
                for oid in lockless.intersection(transaction.serial_dict)
                if self.getPartition(oid) not in self._replicating]
            if oid_list:
                lockless.difference_update(oid_list)
                self._notifyReplicated()
        # Some locks were released, some pending locks may now succeed.
        # We may even have delayed stores for this transaction, like the one
        # that triggered the deadlock. They must also be sorted again because
        # our locking tid has changed.
        self.sortAndExecuteQueuedEvents()

    def rebase(self, conn, ttid, locking_tid):
        """
        Give a new locking tid to a (non-voted) transaction and relock what
        can be relocked trivially.

        Return the oids the client must rebase with RebaseObject, or an
        empty tuple if a new deadlock was detected meanwhile.
        Raise ProtocolError if the transaction already voted.
        """
        self.register(conn, ttid)
        transaction = self._transaction_dict[ttid]
        if transaction.voted:
            raise ProtocolError("TXN %s already voted" % dump(ttid))
        # First, get a set copy of serial_dict before _rebase locks oids.
        lock_set = set(transaction.serial_dict)
        self._rebase(transaction, transaction.locking_tid != MAX_TID and ttid,
                     locking_tid)
        if transaction.locking_tid == MAX_TID:
            # New deadlock. There's no point rebasing objects now.
            return ()
        # We return all oids that can't be relocked trivially
        # (the client will use RebaseObject for these oids).
        lock_set -= transaction.lockless # see comment in _rebase
        recheck_set = lock_set.intersection(self._store_lock_dict)
        lock_set -= recheck_set
        for oid in lock_set:
            try:
                serial = transaction.serial_dict[oid]
            except KeyError:
                # An oid was already being rebased and delayed,
                # and it got a conflict during the above call to _rebase.
                continue
            try:
                self.lockObject(ttid, serial, oid)
            except ConflictError:
                recheck_set.add(oid)
        return recheck_set

    def vote(self, ttid, txn_info=None):
        """
        Store transaction information received from client node

        Stored objects are written to the database with the transaction
        metadata when 'txn_info' (user, desc, ext, oid_list) is given
        (voted = 2), or alone otherwise (voted = 1), then committed.
        Raise ProtocolError for an unknown ttid.
        """
        logging.debug('Vote TXN %s', dump(ttid))
        try:
            transaction = self._transaction_dict[ttid]
        except KeyError:
            raise ProtocolError("unknown ttid %s" % dump(ttid))
        object_list = transaction.store_dict.itervalues()
        if txn_info:
            user, desc, ext, oid_list = txn_info
            txn_info = oid_list, user, desc, ext, False, ttid
            transaction.voted = 2
        else:
            transaction.voted = 1
        # store metadata to temporary table
        dm = self._app.dm
        dm.storeTransaction(ttid, object_list, txn_info)
        dm.commit()

    def lock(self, ttid, tid):
        """
        Lock a transaction

        Set its final 'tid' and read-lock all the oids it stored.
        Raise ProtocolError for an unknown ttid.
        """
        logging.debug('Lock TXN %s (ttid=%s)', dump(tid), dump(ttid))
        try:
            transaction = self._transaction_dict[ttid]
        except KeyError:
            raise ProtocolError("unknown ttid %s" % dump(ttid))
        assert transaction.tid is None, dump(transaction.tid)
        assert ttid <= tid, (ttid, tid)
        transaction.tid = tid
        self._load_lock_dict.update(
            dict.fromkeys(transaction.store_dict, ttid))
        if transaction.voted == 2:
            self._app.dm.lockTransaction(tid, ttid)

    def unlock(self, ttid):
        """
        Unlock transaction

        Make it final in the database (with a deferred commit) and release
        everything about it. Raise ProtocolError for an unknown ttid.
        """
        try:
            transaction = self._transaction_dict[ttid]
        except KeyError:
            raise ProtocolError("unknown ttid %s" % dump(ttid))
        tid = transaction.tid
        logging.debug('Unlock TXN %s (ttid=%s)', dump(tid), dump(ttid))
        dm = self._app.dm
        dm.unlockTransaction(tid, ttid,
            transaction.voted == 2,
            transaction.store_dict)
        self._app.em.setTimeout(time() + 1, dm.deferCommit())
        self.abort(ttid, even_if_locked=True)

    def getFinalTID(self, ttid):
        """
        Return the final tid of a transaction: from memory while it is
        pending, from the database otherwise.
        """
        try:
            return self._transaction_dict[ttid].tid
        except KeyError:
            return self._app.dm.getFinalTID(ttid)

    def getLockingTID(self, oid):
        """Return the ttid write-locking 'oid', or None."""
        return self._store_lock_dict.get(oid)

    def lockObject(self, ttid, serial, oid):
        """
            Take a write lock on given object, checking that "serial" is
            current.
            Raises:
                DelayEvent
                ConflictError
        """
        transaction = self._transaction_dict[ttid]
        if self.getPartition(oid) in self._replicating:
            # We're out-of-date so maybe:
            # - we don't have all data to check for conflicts
            # - we missed stores/check that would lock this one
            # However, this transaction may have begun after we started to
            # replicate, and we're expected to store it in full.
            # Since there's at least 1 other (readable) cell that will do this
            # check, we accept this store/check without taking a lock.
            transaction.lockless.add(oid)
            return
        locked = self._store_lock_dict.get(oid)
        if locked:
            other = self._transaction_dict[locked]
            if other < transaction or other.voted:
                # We have a bigger "TTID" than locking transaction, so we are
                # younger: enter waiting queue so we are handled when lock gets
                # released. We also want to delay (instead of conflict) if the
                # client is so faster that it is committing another transaction
                # before we processed UnlockInformation from the master.
                # Or the locking transaction has already voted and there's no
                # risk of deadlock if we delay.
                transaction.logDelay(ttid, locked, (oid, serial))
                # A client may have several stores delayed for the same oid
                # but this is not a problem. EventQueue processes them in order
                # and only the last one will not result in conflicts (that are
                # already resolved).
                raise DelayEvent(transaction)
            if oid in transaction.lockless:
                # This is a consequence of not having taken a lock during
                # replication. After a ConflictError, we may be asked to "lock"
                # it again. The current lock is a special one that only delays
                # new transactions.
                # For the cluster, we're still out-of-date and like above,
                # at least 1 other (readable) cell checks for conflicts.
                return
            if other is not transaction:
                # We have a smaller "TTID" than locking transaction, so we are
                # older: this is a possible deadlock case, as we might already
                # hold locks the younger transaction is waiting upon.
                logging.info('Deadlock on %s:%s with %s',
                             dump(oid), dump(ttid), dump(locked))
                # Ask master to give the client a new locking tid, which will
                # be used to ask all involved storage nodes to rebase the
                # already locked oids for this transaction.
                self._app.master_conn.send(Packets.NotifyDeadlock(
                    ttid, transaction.locking_tid))
                self._rebase(transaction, ttid)
                raise DelayEvent(transaction)
            # If previous store was an undo, next store must be based on
            # undo target.
            try:
                previous_serial = transaction.store_dict[oid][2]
            except KeyError:
                # Similarly to below for store, cascaded deadlock for
                # checkCurrentSerial is possible because rebase() may return
                # oids for which previous rebaseObject are delayed, or being
                # received, and the client will bindly resend them.
                assert oid in transaction.serial_dict, transaction
                logging.info('Transaction %s checking %s more than once',
                             dump(ttid), dump(oid))
                return
            if previous_serial is None:
                # 2 valid cases:
                # - the previous undo resulted in a resolved conflict
                # - cascaded deadlock resolution
                # Otherwise, this should not happen. For example, when being
                # disconnected by the master because we missed a transaction,
                # a conflict may happen after a first store to us, but the
                # resolution waits for invalidations from the master (to then
                # load the saved data), which are sent after the notification
                # we are down, and the client would stop writing to us.
                logging.info('Transaction %s storing %s more than once',
                             dump(ttid), dump(oid))
                return
        elif transaction.locking_tid == MAX_TID:
            # Deadlock avoidance. Still no new locking_tid from the client.
            raise DelayEvent(transaction)
        else:
            try:
                previous_serial = self._app.dm.getLastObjectTID(oid)
            except NonReadableCell:
                partition = self.getPartition(oid)
                if partition not in self._replicated:
                    # Either the partition is discarded or we haven't yet
                    # received the notification from the master that the
                    # partition is assigned to us. In the latter case, we're
                    # not expected to have the partition in full.
                    # We'll return a successful answer to the client, which
                    # is fine because there's at least one other cell that is
                    # readable for this oid.
                    raise
                with self._app.dm.replicated(partition):
                    previous_serial = self._app.dm.getLastObjectTID(oid)
        # Locking before reporting a conflict would speed up the case of
        # cascading conflict resolution by avoiding incremental resolution,
        # assuming that the time to resolve a conflict is often constant:
        # "C+A vs. B -> C+A+B" rarely costs more than "C+A vs. C+B -> C+A+B".
        # However, this would be against the optimistic principle of ZODB.
        if previous_serial is not None and previous_serial != serial:
            assert serial < previous_serial, (serial, previous_serial)
            logging.info('Conflict on %s:%s with %s',
                         dump(oid), dump(ttid), dump(previous_serial))
            raise ConflictError(previous_serial)
        logging.debug('Transaction %s locking %s', dump(ttid), dump(oid))
        self._store_lock_dict[oid] = ttid

    def checkCurrentSerial(self, ttid, oid, serial):
        """
        Lock 'oid' for a check that 'serial' is current (no data stored).
        Raise NotRegisteredError for an unknown ttid; may also raise what
        lockObject raises.
        """
        try:
            transaction = self._transaction_dict[ttid]
        except KeyError:
            raise NotRegisteredError
        self.lockObject(ttid, serial, oid)
        transaction.serial_dict[oid] = serial

    def storeObject(self, ttid, serial, oid, compression, checksum, data,
            value_serial):
        """
        Store an object received from client node

        Raise NotRegisteredError for an unknown ttid; may also raise what
        lockObject raises. A None 'data' is stored with a None data_id.
        """
        try:
            transaction = self._transaction_dict[ttid]
        except KeyError:
            raise NotRegisteredError
        self.lockObject(ttid, serial, oid)
        transaction.serial_dict[oid] = serial
        # store object
        if data is None:
            data_id = None
        else:
            data_id = self._app.dm.holdData(checksum, oid, data, compression)
        transaction.store(oid, data_id, value_serial)

    def rebaseObject(self, ttid, oid):
        """
        Relock an oid of a rebased transaction.

        Return None on success (or if there is nothing to rebase). On
        conflict, the oid is removed from the transaction and
        (serial, conflicting tid, data) is returned, 'data' being None for
        a check or a store without data.
        """
        try:
            transaction = self._transaction_dict[ttid]
        except KeyError:
            logging.info('Forget rebase of %s by %s delayed by %s',
                dump(oid), dump(ttid), dump(self.getLockingTID(oid)))
            return
        try:
            serial = transaction.serial_dict[oid]
        except KeyError:
            # There was a previous rebase for this oid, it was still delayed
            # during the second RebaseTransaction, and then a conflict was
            # reported when another transaction was committed.
            # This can also happen when a partition is dropped.
            logging.info("no oid %s to rebase for transaction %s",
                dump(oid), dump(ttid))
            return
        assert oid not in transaction.lockless, (oid, transaction.lockless)
        try:
            self.lockObject(ttid, serial, oid)
        except ConflictError as e:
            # Move the data back to the client for conflict resolution,
            # since the client may not have it anymore.
            try:
                data_id = transaction.store_dict.pop(oid)[1]
            except KeyError: # check current
                data = None
            else:
                if data_id is None:
                    data = None
                else:
                    dm = self._app.dm
                    data = dm.loadData(data_id)
                    dm.releaseData([data_id], True)
            del transaction.serial_dict[oid]
            return serial, e.tid, data

    def abort(self, ttid, even_if_locked=False):
        """
            Abort a transaction
            Releases locks held on all transaction objects, deletes Transaction
            instance, and executed queued events.

            A locked transaction (with a tid) is only released if
            'even_if_locked' is true (as done by unlock()). Otherwise, what
            it wrote to the database is aborted and its held data released.
            Note: does not alter persistent content.
        """
        if ttid not in self._transaction_dict:
            assert not even_if_locked
            # See how the master processes AbortTransaction from the client.
            return
        transaction = self._transaction_dict[ttid]
        locked = transaction.tid
        # if the transaction is locked, ensure we can drop it
        if locked:
            if not even_if_locked:
                return
        else:
            logging.debug('Abort TXN %s', dump(ttid))
            dm = self._app.dm
            dm.abortTransaction(ttid)
            dm.releaseData([x[1] for x in transaction.store_dict.itervalues()],
                           True)
            dm.commit()
        # unlock any object
        for oid in transaction.serial_dict:
            if locked:
                lock_ttid = self._load_lock_dict.pop(oid, None)
                assert lock_ttid in (ttid, None), ('Transaction %s tried'
                    ' to release the lock on oid %s, but it was held by %s'
                    % (dump(ttid), dump(oid), dump(lock_ttid)))
            try:
                write_locking_tid = self._store_lock_dict[oid]
            except KeyError:
                # Lockless store (we are replicating this partition),
                # or unresolved deadlock.
                continue
            if ttid != write_locking_tid:
                # The lock is held by another transaction. The decision to
                # keep it must not live inside 'if __debug__': with
                # 'python -O', the 'continue' would be skipped and the lock
                # of the other transaction would be deleted.
                lockless = oid in transaction.lockless
                if __debug__:
                    other = self._transaction_dict[write_locking_tid]
                    x = (oid, ttid, write_locking_tid,
                         self._replicated, transaction.lockless)
                    assert oid in other.serial_dict and lockless == bool(
                        self._replicated.get(self.getPartition(oid))), x
                if not lockless:
                    assert not locked, x
                    continue # unresolved deadlock
                # Several lockless stores for this oid and among them,
                # a higher ttid is still pending. The lock is deleted anyway:
                # _notifyReplicated, called below when _replicated is not
                # empty, takes it again for the remaining lockless stores.
                assert transaction < other, x
            del self._store_lock_dict[oid]
        # remove the transaction
        del self._transaction_dict[ttid]
        if self._replicated:
            self._notifyReplicated()
        # some locks were released, some pending locks may now succeed
        self.read_queue.executeQueuedEvents()
        self.executeQueuedEvents()

    def abortFor(self, uuid):
        """
            Abort any non-locked transaction of a node
        """
        logging.debug('Abort for %s', uuid_str(uuid))
        # abort any non-locked transaction of this node
        for ttid, transaction in self._transaction_dict.items():
            if transaction.uuid == uuid:
                self.abort(ttid)

    def isLockedTid(self, tid):
        """Return whether a pending transaction is locked with a tid that
        is lower than or equal to 'tid'."""
        return any(None is not t.tid <= tid
                   for t in self._transaction_dict.itervalues())

    def loadLocked(self, oid):
        """Return whether 'oid' is read-locked (see lock())."""
        return oid in self._load_lock_dict

    def log(self):
        """Log pending transactions, locks and queued events."""
        logging.info("Transactions:")
        for ttid, txn in self._transaction_dict.iteritems():
            logging.info(' %s %r', dump(ttid), txn)
        logging.info(' Read locks:')
        for oid, ttid in self._load_lock_dict.iteritems():
            logging.info(' %s by %s', dump(oid), dump(ttid))
        logging.info(' Write locks:')
        for oid, ttid in self._store_lock_dict.iteritems():
            logging.info(' %s by %s', dump(oid), dump(ttid))
        self.logQueuedEvents()
        self.read_queue.logQueuedEvents()

    def updateObjectDataForPack(self, oid, orig_serial, new_serial, data_id):
        """
        If the transaction write-locking 'oid' stored it with a value_serial
        equal to 'orig_serial', point that store to 'new_serial' (dropping
        its data_id) or, when 'new_serial' is false, to 'data_id' (which is
        then held).
        """
        lock_tid = self.getLockingTID(oid)
        if lock_tid is not None:
            transaction = self._transaction_dict[lock_tid]
            # The lock may come from checkCurrentSerial (or from relocking a
            # lockless check), in which case nothing was stored for this oid
            # and there is nothing to update.
            stored = transaction.store_dict.get(oid)
            if stored is not None and stored[2] == orig_serial:
                if new_serial:
                    data_id = None
                else:
                    self._app.dm.holdData(data_id)
                transaction.store(oid, data_id, new_serial)