pax_global_header 0000666 0000000 0000000 00000000064 12014530641 0014506 g ustar 00root root 0000000 0000000 52 comment=4fcd8ddc8caa405fd23a527f4a128644363b69c6
neoppod-4fcd8ddc8caa405fd23a527f4a128644363b69c6-neo-storage/ 0000775 0000000 0000000 00000000000 12014530641 0022456 5 ustar 00root root 0000000 0000000 neoppod-4fcd8ddc8caa405fd23a527f4a128644363b69c6-neo-storage/neo/ 0000775 0000000 0000000 00000000000 12014530641 0023237 5 ustar 00root root 0000000 0000000 neoppod-4fcd8ddc8caa405fd23a527f4a128644363b69c6-neo-storage/neo/storage/ 0000775 0000000 0000000 00000000000 12014530641 0024703 5 ustar 00root root 0000000 0000000 neoppod-4fcd8ddc8caa405fd23a527f4a128644363b69c6-neo-storage/neo/storage/__init__.py 0000664 0000000 0000000 00000000000 12014530641 0027002 0 ustar 00root root 0000000 0000000 neoppod-4fcd8ddc8caa405fd23a527f4a128644363b69c6-neo-storage/neo/storage/app.py 0000664 0000000 0000000 00000034471 12014530641 0026046 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
import sys
from collections import deque
from neo.lib import logging
from neo.lib.protocol import uuid_str, \
CellStates, ClusterStates, NodeTypes, Packets
from neo.lib.node import NodeManager
from neo.lib.event import EventManager
from neo.lib.connection import ListeningConnection
from neo.lib.exception import OperationFailure, PrimaryFailure
from neo.lib.connector import getConnectorHandler
from neo.lib.pt import PartitionTable
from neo.lib.util import dump
from neo.lib.bootstrap import BootstrapManager
from .checker import Checker
from .database import buildDatabaseManager
from .exception import AlreadyPendingError
from .handlers import identification, verification, initialization
from .handlers import master, hidden
from .replicator import Replicator
from .transactions import TransactionManager
from neo.lib.debug import register as registerLiveDebugger
class Application(object):
    """The storage node application.

    Ties together the event loop (em), node manager (nm), transaction
    manager (tm) and database manager (dm), and drives the node's life
    cycle against the primary master: bootstrap -> verification ->
    initialization -> operation, looping on master failure.
    """

    def __init__(self, config):
        # set the cluster name
        self.name = config.getCluster()
        # Internal attributes.
        self.em = EventManager()
        self.nm = NodeManager(config.getDynamicMasterList())
        self.tm = TransactionManager(self)
        self.dm = buildDatabaseManager(config.getAdapter(),
            (config.getDatabase(), config.getWait())
        )
        # load master nodes
        master_addresses, connector_name = config.getMasters()
        self.connector_handler = getConnectorHandler(connector_name)
        for master_address in master_addresses :
            self.nm.createMaster(address=master_address)
        # set the bind address
        self.server = config.getBind()
        logging.debug('IP address is %s, port is %d', *self.server)
        # The partition table is initialized after getting the number of
        # partitions.
        self.pt = None
        self.checker = Checker(self)
        self.replicator = Replicator(self)
        self.listening_conn = None
        self.master_conn = None
        self.master_node = None
        # operation related data
        self.event_queue = None
        self.event_queue_dict = None
        self.operational = False
        # ready is True when operational and got all informations
        self.ready = False
        self.dm.setup(reset=config.getReset())
        self.loadConfiguration()
        # force node uuid from command line argument, for testing purpose only
        if config.getUUID() is not None:
            self.uuid = config.getUUID()
        registerLiveDebugger(on_log=self.log)

    def close(self):
        """Release every resource held by the node and wipe the instance."""
        self.listening_conn = None
        self.nm.close()
        self.em.close()
        try:
            self.dm.close()
        except AttributeError:
            # dm may already be gone (or the backend has no close()).
            pass
        del self.__dict__

    def _poll(self):
        # Process network events, with a 1 second timeout.
        self.em.poll(1)

    def log(self):
        """Log the whole internal state, for debugging (live debugger hook)."""
        self.em.log()
        self.logQueuedEvents()
        self.nm.log()
        self.tm.log()
        if self.pt is not None:
            self.pt.log()

    def loadConfiguration(self):
        """Load persistent configuration data from the database.
        If data is not present, generate it."""
        dm = self.dm
        # check cluster name
        name = dm.getName()
        if name is None:
            dm.setName(self.name)
        elif name != self.name:
            raise RuntimeError('name %r does not match with the database: %r'
                % (self.name, name))
        # load configuration
        self.uuid = dm.getUUID()
        num_partitions = dm.getNumPartitions()
        num_replicas = dm.getNumReplicas()
        ptid = dm.getPTID()
        # check partition table configuration
        if num_partitions is not None and num_replicas is not None:
            if num_partitions <= 0:
                raise RuntimeError, 'partitions must be more than zero'
            # create a partition table
            self.pt = PartitionTable(num_partitions, num_replicas)
        logging.info('Configuration loaded:')
        logging.info('UUID      : %s', uuid_str(self.uuid))
        logging.info('PTID      : %s', dump(ptid))
        logging.info('Name      : %s', self.name)
        logging.info('Partitions: %s', num_partitions)
        logging.info('Replicas  : %s', num_replicas)

    def loadPartitionTable(self):
        """Load a partition table from the database."""
        ptid = self.dm.getPTID()
        cell_list = self.dm.getPartitionTable()
        new_cell_list = []
        for offset, uuid, state in cell_list:
            # convert from int to Enum
            state = CellStates[state]
            # register unknown nodes
            if self.nm.getByUUID(uuid) is None:
                self.nm.createStorage(uuid=uuid)
            new_cell_list.append((offset, uuid, state))
        # load the partition table in manager
        self.pt.clear()
        self.pt.update(ptid, new_cell_list, self.nm)

    def run(self):
        """Run the main loop; on any error, dump the full state before
        re-raising (pre-mortem diagnostic)."""
        try:
            self._run()
        except:
            logging.exception('Pre-mortem data:')
            self.log()
            raise

    def _run(self):
        """Make sure that the status is sane and start a loop."""
        if len(self.name) == 0:
            raise RuntimeError, 'cluster name must be non-empty'
        # Make a listening port
        handler = identification.IdentificationHandler(self)
        self.listening_conn = ListeningConnection(self.em, handler,
            addr=self.server, connector=self.connector_handler())
        self.server = self.listening_conn.getAddress()
        # Connect to a primary master node, verify data, and
        # start the operation. This cycle will be executed permanently,
        # until the user explicitly requests a shutdown.
        while True:
            self.cluster_state = None
            self.ready = False
            self.operational = False
            if self.master_node is None:
                # look for the primary master
                self.connectToPrimary()
            # check my state
            node = self.nm.getByUUID(self.uuid)
            if node is not None and node.isHidden():
                self.wait()
            # drop any client node
            for conn in self.em.getConnectionList():
                if conn not in (self.listening_conn, self.master_conn):
                    conn.close()
            # create/clear event queue
            self.event_queue = deque()
            self.event_queue_dict = dict()
            try:
                self.verifyData()
                self.initialize()
                self.doOperation()
                # doOperation only exits by raising.
                raise RuntimeError, 'should not reach here'
            except OperationFailure, msg:
                logging.error('operation stopped: %s', msg)
                if self.cluster_state == ClusterStates.STOPPING_BACKUP:
                    self.dm.setBackupTID(None)
            except PrimaryFailure, msg:
                logging.error('primary master is down: %s', msg)
            finally:
                # Any ongoing check is meaningless once operation stops.
                self.checker = Checker(self)

    def connectToPrimary(self):
        """Find a primary master node, and connect to it.
        If a primary master node is not elected or ready, repeat
        the attempt of a connection periodically.
        Note that I do not accept any connection from non-master nodes
        at this stage."""
        pt = self.pt
        # First of all, make sure that I have no connection.
        for conn in self.em.getConnectionList():
            if not conn.isListening():
                conn.close()
        # search, find, connect and identify to the primary master
        bootstrap = BootstrapManager(self, self.name,
            NodeTypes.STORAGE, self.uuid, self.server)
        data = bootstrap.getPrimaryConnection(self.connector_handler)
        (node, conn, uuid, num_partitions, num_replicas) = data
        self.master_node = node
        self.master_conn = conn
        logging.info('I am %s', uuid_str(uuid))
        self.uuid = uuid
        self.dm.setUUID(uuid)
        # Reload a partition table from the database. This is necessary
        # when a previous primary master died while sending a partition
        # table, because the table might be incomplete.
        if pt is not None:
            self.loadPartitionTable()
            if num_partitions != pt.getPartitions():
                raise RuntimeError('the number of partitions is inconsistent')
        if pt is None or pt.getReplicas() != num_replicas:
            # changing number of replicas is not an issue
            self.dm.setNumPartitions(num_partitions)
            self.dm.setNumReplicas(num_replicas)
            self.pt = PartitionTable(num_partitions, num_replicas)
            self.loadPartitionTable()

    def verifyData(self):
        """Verify data under the control by a primary master node.
        Connections from client nodes may not be accepted at this stage."""
        logging.info('verifying data')
        handler = verification.VerificationHandler(self)
        self.master_conn.setHandler(handler)
        _poll = self._poll
        # self.operational is set by the verification handler.
        while not self.operational:
            _poll()

    def initialize(self):
        """Retrieve partition table and node information from the primary."""
        logging.debug('initializing...')
        _poll = self._poll
        handler = initialization.InitializationHandler(self)
        self.master_conn.setHandler(handler)
        # ask node list and partition table
        self.pt.clear()
        self.master_conn.ask(Packets.AskLastIDs())
        self.master_conn.ask(Packets.AskNodeInformation())
        self.master_conn.ask(Packets.AskPartitionTable())
        while self.master_conn.isPending():
            _poll()
        self.ready = True
        self.replicator.populate()
        self.master_conn.notify(Packets.NotifyReady())

    def doOperation(self):
        """Handle everything, including replications and transactions."""
        logging.info('doing operation')
        _poll = self._poll
        isIdle = self.em.isIdle
        handler = master.MasterOperationHandler(self)
        self.master_conn.setHandler(handler)
        # Forget all unfinished data.
        self.dm.dropUnfinishedData()
        self.tm.reset()
        self.task_queue = task_queue = deque()
        try:
            while True:
                # Cooperatively run queued tasks (generators) while the
                # event loop has nothing better to do.
                while task_queue and isIdle():
                    try:
                        task_queue[-1].next()
                        task_queue.rotate()
                    except StopIteration:
                        task_queue.pop()
                _poll()
        finally:
            del self.task_queue
            # XXX: Although no handled exception should happen between
            #      replicator.populate() and the beginning of this 'try'
            #      clause, the replicator should be reset in a safer place.
            self.replicator = Replicator(self)
            # Abort any replication, whether we are feeding or out-of-date.
            for node in self.nm.getStorageList(only_identified=True):
                node.getConnection().close()

    def changeClusterState(self, state):
        """Record the cluster state notified by the master."""
        self.cluster_state = state
        if state == ClusterStates.STOPPING_BACKUP:
            self.replicator.stop()

    def wait(self):
        """Block (polling only) while this node is in HIDDEN state."""
        # change handler
        logging.info("waiting in hidden state")
        _poll = self._poll
        handler = hidden.HiddenHandler(self)
        for conn in self.em.getConnectionList():
            conn.setHandler(handler)
        node = self.nm.getByUUID(self.uuid)
        while True:
            _poll()
            if not node.isHidden():
                break

    def queueEvent(self, some_callable, conn, args, key=None,
            raise_on_duplicate=True):
        """Delay a callable until executeQueuedEvents() is called.

        The peer id of `conn` is saved so the callable can be replayed
        with the original request id.  `key` counts duplicates: with
        raise_on_duplicate, queuing a second event for an already-pending
        key raises AlreadyPendingError.
        """
        msg_id = conn.getPeerId()
        event_queue_dict = self.event_queue_dict
        if raise_on_duplicate and key in event_queue_dict:
            raise AlreadyPendingError()
        else:
            self.event_queue.append((key, some_callable, msg_id, conn, args))
            if key is not None:
                try:
                    event_queue_dict[key] += 1
                except KeyError:
                    event_queue_dict[key] = 1

    def executeQueuedEvents(self):
        """Replay every event queued by queueEvent, restoring each
        connection's original peer id around the call."""
        l = len(self.event_queue)
        p = self.event_queue.popleft
        event_queue_dict = self.event_queue_dict
        for _ in xrange(l):
            key, some_callable, msg_id, conn, args = p()
            if key is not None:
                event_queue_dict[key] -= 1
                if event_queue_dict[key] == 0:
                    del event_queue_dict[key]
            if conn.isClosed():
                # The requester is gone: drop the event.
                continue
            orig_msg_id = conn.getPeerId()
            try:
                conn.setPeerId(msg_id)
                some_callable(conn, *args)
            finally:
                conn.setPeerId(orig_msg_id)

    def logQueuedEvents(self):
        """Log pending queued events, for debugging."""
        if self.event_queue is None:
            return
        logging.info("Pending events:")
        for key, event, _msg_id, _conn, args in self.event_queue:
            logging.info(' %r:%r: %r:%r %r %r', key, event.__name__,
                _msg_id, _conn, args)

    def newTask(self, iterator):
        """Prime `iterator` and, unless already exhausted, schedule it in
        the cooperative task queue processed by doOperation."""
        try:
            iterator.next()
        except StopIteration:
            return
        self.task_queue.appendleft(iterator)

    def closeClient(self, connection):
        # Only close the client half if neither the replicator nor the
        # checker still uses this connection.
        if connection is not self.replicator.getCurrentConnection() and \
           connection not in self.checker.conn_dict:
            connection.closeClient()

    def shutdown(self, erase=False):
        """Close all connections and exit"""
        for c in self.em.getConnectionList():
            try:
                c.close()
            except PrimaryFailure:
                pass
        # clear database to avoid polluting the cluster at restart
        self.dm.setup(reset=erase)
        logging.info("Application has been asked to shut down")
        sys.exit()
neoppod-4fcd8ddc8caa405fd23a527f4a128644363b69c6-neo-storage/neo/storage/checker.py 0000664 0000000 0000000 00000020614 12014530641 0026664 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
from collections import deque
from neo.lib import logging
from neo.lib.connection import ClientConnection
from neo.lib.connector import ConnectorConnectionClosedException
from neo.lib.protocol import NodeTypes, Packets, ZERO_OID
from neo.lib.util import add64, dump
from .handlers.storage import StorageOperationHandler
# Number of TIDs/serials hashed per AskCheck*Range request.
CHECK_COUNT = 4000

class Checker(object):
    """Check replicas of partitions for consistency.

    Queued partitions are processed one at a time: the same TID range
    (then serial/oid range) is hashed locally and on every readable cell,
    answers are compared, and mismatching cells are reported to the
    master as corrupted.

    NOTE(review): indentation of this file was lost in extraction; the
    structure below is a careful reconstruction — confirm against the
    upstream repository before relying on the rare corruption paths.
    """

    def __init__(self, app):
        self.app = app
        # Pending (partition, source, min_tid, max_tid) requests.
        self.queue = deque()
        # Per-partition participants.  Key None is the local node; for a
        # remote conn the value is, over time: an identification flag,
        # then a pending request id, then the answer tuple.
        self.conn_dict = {}

    def __call__(self, partition, source, min_tid, max_tid):
        """Queue a partition for checking; start immediately when idle.
        `source` is a (name, address) pair (possibly empty values)."""
        self.queue.append((partition, source, min_tid, max_tid))
        if not self.conn_dict:
            self._nextPartition()

    def _nextPartition(self):
        app = self.app
        def connect(node, uuid=app.uuid, name=app.name):
            # Register a client connection to `node` in conn_dict,
            # identifying ourselves if a new connection is needed.
            if node.getUUID() == app.uuid:
                return
            if node.isConnected(connecting=True):
                conn = node.getConnection()
                conn.asClient()
            else:
                conn = ClientConnection(app.em, StorageOperationHandler(app),
                    node=node, connector=app.connector_handler())
                conn.ask(Packets.RequestIdentification(
                    NodeTypes.STORAGE, uuid, app.server, name))
            self.conn_dict[conn] = node.isIdentified()
        conn_set = set(self.conn_dict)
        conn_set.discard(None)
        try:
            self.conn_dict.clear()
            while True:
                try:
                    partition, (name, source), min_tid, max_tid = \
                        self.queue.popleft()
                except IndexError:
                    # Nothing left to check.
                    return
                cell = app.pt.getCell(partition, app.uuid)
                if cell is None or cell.isOutOfDate():
                    msg = "discarded or out-of-date"
                else:
                    try:
                        for cell in app.pt.getCellList(partition):
                            # XXX: Ignore corrupted cells for the moment
                            #      because we're still unable to fix them
                            #      (see also AdministrationHandler of master)
                            if cell.isReadable(): #if not cell.isOutOfDate():
                                connect(cell.getNode())
                        if source:
                            node = app.nm.getByAddress(source)
                            if name:
                                # Check against an explicit upstream source.
                                source = app.nm.createStorage(address=source) \
                                    if node is None else node
                                connect(source, None, name)
                            elif (node.getUUID() == app.uuid or
                                  node.isConnected(connecting=True) and
                                  node.getConnection() in self.conn_dict):
                                source = node
                            else:
                                msg = "unavailable source"
                        if self.conn_dict:
                            # At least one participant: start this one.
                            break
                        msg = "no replica"
                    except ConnectorConnectionClosedException:
                        msg = "connection closed"
                    finally:
                        conn_set.update(self.conn_dict)
                self.conn_dict.clear()
                logging.error("Failed to start checking partition %u (%s)",
                    partition, msg)
            # Keep connections reused for this partition open.
            conn_set.difference_update(self.conn_dict)
        finally:
            # Close client halves that are no longer needed.
            for conn in conn_set:
                app.closeClient(conn)
        logging.debug("start checking partition %u from %s to %s",
            partition, dump(min_tid), dump(max_tid))
        self.min_tid = self.next_tid = min_tid
        self.max_tid = max_tid
        self.next_oid = None
        self.partition = partition
        self.source = source
        args = partition, CHECK_COUNT, min_tid, max_tid
        p = Packets.AskCheckTIDRange(*args)
        for conn, identified in self.conn_dict.items():
            # Not yet identified nodes are asked later, via connected().
            self.conn_dict[conn] = conn.ask(p) if identified else None
        self.conn_dict[None] = app.dm.checkTIDRange(*args)

    def connected(self, node):
        """Called once a pending identification succeeds: send the
        current TID range request to that node."""
        conn = node.getConnection()
        if self.conn_dict.get(conn, self) is None:
            self.conn_dict[conn] = conn.ask(Packets.AskCheckTIDRange(
                self.partition, CHECK_COUNT, self.next_tid, self.max_tid))

    def connectionLost(self, conn):
        """Forget a dead participant; abort the current partition check
        if the source or too many participants are gone."""
        try:
            del self.conn_dict[conn]
        except KeyError:
            return
        if self.source is not None and self.source.getConnection() is conn:
            del self.source
        elif len(self.conn_dict) > 1:
            logging.warning("node lost but keep up checking partition %u",
                self.partition)
            return
        logging.warning("check of partition %u aborted", self.partition)
        self._nextPartition()

    def _nextRange(self):
        """Ask the next range (serials if next_oid is set, else TIDs) to
        every remaining participant, including the local backend."""
        if self.next_oid:
            args = self.partition, CHECK_COUNT, self.next_tid, self.max_tid, \
                self.next_oid
            p = Packets.AskCheckSerialRange(*args)
            check = self.app.dm.checkSerialRange
        else:
            args = self.partition, CHECK_COUNT, self.next_tid, self.max_tid
            p = Packets.AskCheckTIDRange(*args)
            check = self.app.dm.checkTIDRange
        for conn in self.conn_dict.keys():
            self.conn_dict[conn] = check(*args) if conn is None else conn.ask(p)

    def checkRange(self, conn, *args):
        """Handle an AnswerCheckTIDRange (3-tuple) or
        AnswerCheckSerialRange (5-tuple) from `conn` (None = local)."""
        if self.conn_dict.get(conn, self) != conn.getPeerId():
            # Ignore answers to old requests,
            # because we did nothing to cancel them.
            logging.info("ignored AnswerCheck*Range%r", args)
            return
        self.conn_dict[conn] = args
        answer_set = set(self.conn_dict.itervalues())
        if len(answer_set) > 1:
            for answer in answer_set:
                if type(answer) is not tuple:
                    # Some participants have not answered yet.
                    return
            # TODO: Automatically tell corrupted cells to fix their data
            #       if we know a good source.
            #       For the moment, tell master to put them in CORRUPTED state
            #       and keep up checking if useful.
            uuid = self.app.uuid
            # Reference answer: the source's one, or None (= everybody
            # mismatching) when no source is configured.
            args = None if self.source is None else self.conn_dict[
                None if self.source.getUUID() == uuid
                else self.source.getConnection()]
            uuid_list = []
            for conn, answer in self.conn_dict.items():
                if answer != args:
                    del self.conn_dict[conn]
                    if conn is None:
                        uuid_list.append(uuid)
                    else:
                        uuid_list.append(conn.getUUID())
                        self.app.closeClient(conn)
            p = Packets.NotifyPartitionCorrupted(self.partition, uuid_list)
            self.app.master_conn.notify(p)
            if len(self.conn_dict) <= 1:
                logging.warning("check of partition %u aborted",
                    self.partition)
                self.queue.clear()
                self._nextPartition()
                return
        try:
            count, _, max_tid = args
        except ValueError: # AnswerCheckSerialRange (5-tuple)
            count, _, self.next_tid, _, max_oid = args
            if count < CHECK_COUNT:
                # Whole partition checked.
                logging.debug("partition %u checked from %s to %s",
                    self.partition, dump(self.min_tid), dump(self.max_tid))
                self._nextPartition()
                return
            self.next_oid = add64(max_oid, 1)
        else: # AnswerCheckTIDRange (3-tuple)
            # NOTE(review): this assumes answer_set has a single element;
            # after the corruption handling above with >=2 survivors it
            # may still hold several — presumably unreachable in practice,
            # verify upstream.
            (count, _, max_tid), = answer_set
            if count < CHECK_COUNT:
                # TID phase finished: restart the range, checking serials.
                self.next_tid = self.min_tid
                self.next_oid = ZERO_OID
            else:
                self.next_tid = add64(max_tid, 1)
        self._nextRange()
neoppod-4fcd8ddc8caa405fd23a527f4a128644363b69c6-neo-storage/neo/storage/database/ 0000775 0000000 0000000 00000000000 12014530641 0026447 5 ustar 00root root 0000000 0000000 neoppod-4fcd8ddc8caa405fd23a527f4a128644363b69c6-neo-storage/neo/storage/database/__init__.py 0000664 0000000 0000000 00000002502 12014530641 0030557 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# Set to True to make backends log every query they send to the database.
LOG_QUERIES = False

from neo.lib.exception import DatabaseFailure
from .manager import DatabaseManager
from .sqlite import SQLiteDatabaseManager

# Registry of available database backends, keyed by adapter name.
DATABASE_MANAGER_DICT = {'SQLite': SQLiteDatabaseManager}

# MySQL support is optional: it requires the MySQLdb binding.
try:
    from .mysqldb import MySQLDatabaseManager
except ImportError:
    pass
else:
    DATABASE_MANAGER_DICT['MySQL'] = MySQLDatabaseManager
def buildDatabaseManager(name, args=(), kw={}):
    """Instantiate the database manager registered under `name`.

    When `name` is None, an arbitrary registered adapter is picked.
    Raise DatabaseFailure if no adapter matches.
    """
    if name is None:
        # Pick any registered backend (Python 2: keys() is a list).
        name = DATABASE_MANAGER_DICT.keys()[0]
    try:
        adapter_klass = DATABASE_MANAGER_DICT[name]
    except KeyError:
        raise DatabaseFailure('Cannot find a database adapter <%s>' % name)
    return adapter_klass(*args, **kw)
neoppod-4fcd8ddc8caa405fd23a527f4a128644363b69c6-neo-storage/neo/storage/database/manager.py 0000664 0000000 0000000 00000047164 12014530641 0030447 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging, util
from neo.lib.protocol import ZERO_TID
class CreationUndone(Exception):
    # Raised by backends when data is requested for an object whose
    # creation has been undone (see _getDataTID's contract).
    pass
class DatabaseManager(object):
"""This class only describes an interface for database managers."""
    def __init__(self, database, wait):
        """
        Initialize the object.

        database -- backend-specific connection description, handed to
                    _parse() for processing.
        wait -- stored as self._wait; semantics are backend-specific
                (presumably a connection retry/wait setting — see backends).
        """
        self._wait = wait
        self._parse(database)
    def _parse(self, database):
        """Called during instantiation, to process the database parameter.
        Default implementation does nothing; backends override."""
        pass
    def setup(self, reset = 0):
        """Set up a database

        It must recover self._uncommitted_data from temporary object table.
        _uncommitted_data is a dict containing refcounts to data of
        write-locked objects, except in case of undo, where the refcount is
        increased later, when the object is read-locked.
        Keys are data ids and values are number of references.

        If reset is true, existing data must be discarded and
        self._uncommitted_data must be an empty dict.
        """
        raise NotImplementedError
    def commit(self):
        """Commit pending changes to the database (no-op by default;
        transactional backends override)."""
        pass
    def _getPartition(self, oid_or_tid):
        """Return the partition number (int) storing the given oid or tid."""
        return oid_or_tid % self.getNumPartitions()
    def getConfiguration(self, key):
        """
        Return a configuration value; return None if not found or not set.
        """
        raise NotImplementedError
    def setConfiguration(self, key, value):
        """
        Set a configuration value and commit it immediately.
        """
        self._setConfiguration(key, value)
        self.commit()
    def _setConfiguration(self, key, value):
        """Backend hook for setConfiguration (store without committing)."""
        raise NotImplementedError
def getUUID(self):
"""
Load a NID from a database.
"""
nid = self.getConfiguration('nid')
if nid is not None:
return int(nid)
    def setUUID(self, nid):
        """
        Store a NID into a database (serialized as a string).
        """
        self.setConfiguration('nid', str(nid))
def getNumPartitions(self):
"""
Load the number of partitions from a database.
"""
n = self.getConfiguration('partitions')
if n is not None:
return int(n)
    def setNumPartitions(self, num_partitions):
        """
        Store the number of partitions into a database.
        """
        self.setConfiguration('partitions', num_partitions)
def getNumReplicas(self):
"""
Load the number of replicas from a database.
"""
n = self.getConfiguration('replicas')
if n is not None:
return int(n)
    def setNumReplicas(self, num_replicas):
        """
        Store the number of replicas into a database.
        """
        self.setConfiguration('replicas', num_replicas)
    def getName(self):
        """
        Load the cluster name from a database (None if never stored).
        """
        return self.getConfiguration('name')
    def setName(self, name):
        """
        Store the cluster name into a database.
        """
        self.setConfiguration('name', name)
def getPTID(self):
"""
Load a Partition Table ID from a database.
"""
ptid = self.getConfiguration('ptid')
if ptid is not None:
return long(ptid)
def setPTID(self, ptid):
"""
Store a Partition Table ID into a database.
"""
if ptid is not None:
assert isinstance(ptid, (int, long)), ptid
ptid = str(ptid)
self.setConfiguration('ptid', ptid)
    def getBackupTID(self):
        """Return the stored backup TID, converted via util.bin (None when
        unset — presumably packed 8-byte form; see util)."""
        return util.bin(self.getConfiguration('backup_tid'))
    def setBackupTID(self, backup_tid):
        """Store the backup TID (serialized via util.dump); None clears it."""
        tid = util.dump(backup_tid)
        logging.debug('backup_tid = %s', tid)
        return self.setConfiguration('backup_tid', tid)
    def getPartitionTable(self):
        """Return a whole partition table as a sequence of rows. Each row
        is again a tuple of an offset (row ID), the NID of a storage
        node, and a cell state."""
        raise NotImplementedError
    def _getLastIDs(self, all=True):
        """Backend hook for getLastIDs: must return (trans, obj, oid)
        where trans and obj are dicts whose values are tids (their maxima
        are taken by getLastIDs) and oid is the last allocated oid."""
        raise NotImplementedError
def getLastIDs(self, all=True):
trans, obj, oid = self._getLastIDs()
if trans:
tid = max(trans.itervalues())
if obj:
tid = max(tid, max(obj.itervalues()))
else:
tid = max(obj.itervalues()) if obj else None
return tid, trans, obj, oid
    def getUnfinishedTIDList(self):
        """Return a list of unfinished transactions' IDs."""
        raise NotImplementedError
    def objectPresent(self, oid, tid, all = True):
        """Return true iff an object specified by a given pair of an
        object ID and a transaction ID is present in a database.
        Otherwise, return false. If all is true, the object must be
        searched from unfinished transactions as well."""
        raise NotImplementedError
    def _getObject(self, oid, tid=None, before_tid=None):
        """Backend hook for getObject, working on unpacked (int) ids.

        oid (int)
            Identifier of object to retrieve.
        tid (int, None)
            Exact serial to retrieve.
        before_tid (packed, None)
            Serial to retrieve is the highest existing one strictly below this
            value.
        """
        raise NotImplementedError
def getObject(self, oid, tid=None, before_tid=None):
"""
oid (packed)
Identifier of object to retrieve.
tid (packed, None)
Exact serial to retrieve.
before_tid (packed, None)
Serial to retrieve is the highest existing one strictly below this
value.
Return value:
None: Given oid doesn't exist in database.
False: No record found, but another one exists for given oid.
6-tuple: Record content.
- record serial (packed)
- serial or next record modifying object (packed, None)
- compression (boolean-ish, None)
- checksum (integer, None)
- data (binary string, None)
- data_serial (packed, None)
"""
u64 = util.u64
p64 = util.p64
oid = u64(oid)
if tid is not None:
tid = u64(tid)
if before_tid is not None:
before_tid = u64(before_tid)
result = self._getObject(oid, tid, before_tid)
if result:
serial, next_serial, compression, checksum, data, data_serial = \
result
assert before_tid is None or next_serial is None or \
before_tid <= next_serial
if serial is not None:
serial = p64(serial)
if next_serial is not None:
next_serial = p64(next_serial)
if data_serial is not None:
data_serial = p64(data_serial)
return serial, next_serial, compression, checksum, data, data_serial
# See if object exists at all
return self._getObject(oid) and False
    def changePartitionTable(self, ptid, cell_list):
        """Change a part of a partition table. The list of cells is
        a tuple of tuples, each of which consists of an offset (row ID),
        the NID of a storage node, and a cell state. The Partition
        Table ID must be stored as well."""
        raise NotImplementedError
    def setPartitionTable(self, ptid, cell_list):
        """Set a whole partition table. The semantics is the same as
        changePartitionTable, except that existing data must be
        thrown away."""
        raise NotImplementedError
    def dropPartitions(self, offset_list):
        """Delete all data for specified partitions"""
        raise NotImplementedError
    def dropUnfinishedData(self):
        """Drop any unfinished data from a database."""
        raise NotImplementedError
    def storeTransaction(self, tid, object_list, transaction, temporary = True):
        """Store a transaction temporarily, if temporary is true. Note
        that this transaction is not finished yet. The list of objects
        contains tuples, each of which consists of an object ID,
        a data_id and object serial.
        The transaction is either None or a tuple of the list of OIDs,
        user information, a description, extension information and transaction
        pack state (True for packed)."""
        raise NotImplementedError
    def _pruneData(self, data_id_list):
        """To be overridden by the backend to delete any unreferenced data

        'unreferenced' means:
        - not in self._uncommitted_data
        - and not referenced by a fully-committed object (storage should have
          an index or a refcount of all data ids of all objects)
        """
        raise NotImplementedError
    def _storeData(self, checksum, data, compression):
        """To be overridden by the backend to store object raw data

        If same data was already stored, the storage only has to check there's
        no hash collision.
        """
        raise NotImplementedError
def storeData(self, checksum_or_id, data=None, compression=None):
"""Store object raw data
checksum must be the result of neo.lib.util.makeChecksum(data)
'compression' indicates if 'data' is compressed.
A volatile reference is set to this data until 'unlockData' is called
with this checksum.
If called with only an id, it only increment the volatile
reference to the data matching the id.
"""
refcount = self._uncommitted_data
if data is not None:
checksum_or_id = self._storeData(checksum_or_id, data, compression)
refcount[checksum_or_id] = 1 + refcount.get(checksum_or_id, 0)
return checksum_or_id
def unlockData(self, data_id_list, prune=False):
"""Release 1 volatile reference to given list of checksums
If 'prune' is true, any data that is not referenced anymore (either by
a volatile reference or by a fully-committed object) is deleted.
"""
refcount = self._uncommitted_data
for data_id in data_id_list:
count = refcount[data_id] - 1
if count:
refcount[data_id] = count
else:
del refcount[data_id]
if prune:
self._pruneData(data_id_list)
self.commit()
    # Backend classes for which the "slow fallback" warning below was
    # already emitted (so it is logged only once per class).
    __getDataTID = set()

    def _getDataTID(self, oid, tid=None, before_tid=None):
        """
        Return a 2-tuple:
        tid (int)
            tid corresponding to received parameters
        serial
            tid at which actual object data is located

        If 'tid is None', requested object and transaction could
        not be found.
        If 'serial is None', requested object exists but has no data (its
        creation has been undone).
        If 'tid == serial', it means that requested transaction
        contains object data.
        Otherwise, it's an undo transaction which did not involve conflict
        resolution.
        """
        if self.__class__ not in self.__getDataTID:
            self.__getDataTID.add(self.__class__)
            logging.warning("Fallback to generic/slow implementation"
                " of _getDataTID. It should be overriden by backend storage.")
        r = self._getObject(oid, tid, before_tid)
        if r:
            serial, _, _, data_id, _, value_serial = r
            if value_serial is None and data_id:
                # Data is stored in this very record.
                return serial, serial
            return serial, value_serial
        return None, None
    def findUndoTID(self, oid, tid, ltid, undone_tid, transaction_object):
        """
        oid
            Object OID
        tid
            Transaction doing the undo
        ltid
            Upper (excluded) bound of transactions visible to transaction
            doing the undo.
        undone_tid
            Transaction to undo
        transaction_object
            Object data from memory, if it was modified by running
            transaction.
            None if it was not modified by running transaction.

        Returns a 3-tuple:
        current_tid (p64)
            TID of most recent version of the object client's transaction can
            see. This is used later to detect current conflicts (eg, another
            client modifying the same object in parallel)
        data_tid (int)
            TID containing (without indirection) the data prior to undone
            transaction.
            None if object doesn't exist prior to transaction being undone
            (its creation is being undone).
        is_current (bool)
            False if object was modified by later transaction (ie, data_tid is
            not current), True otherwise.
        """
        u64 = util.u64
        p64 = util.p64
        oid = u64(oid)
        tid = u64(tid)
        if ltid:
            ltid = u64(ltid)
        undone_tid = u64(undone_tid)
        def getDataTID(tid=None, before_tid=None):
            # Resolve data indirection (undo records point at the tid
            # actually holding the data).
            tid, value_serial = self._getDataTID(oid, tid, before_tid)
            if value_serial not in (None, tid):
                if value_serial >= tid:
                    # NOTE(review): the %d arguments appear to be in the
                    # wrong order w.r.t. the message text (value_serial is
                    # printed as "tid") — verify upstream.
                    raise ValueError("Incorrect value reference found for"
                        " oid %d at tid %d: reference = %d"
                        % (oid, value_serial, tid))
                if value_serial != getDataTID(value_serial)[1]:
                    logging.warning("Multiple levels of indirection"
                        " when getting data serial for oid %d at tid %d."
                        " This causes suboptimal performance.", oid, tid)
            return tid, value_serial
        if transaction_object:
            current_tid = current_data_tid = u64(transaction_object[2])
        else:
            current_tid, current_data_tid = getDataTID(before_tid=ltid)
        if current_tid is None:
            return (None, None, False)
        found_undone_tid, undone_data_tid = getDataTID(tid=undone_tid)
        assert found_undone_tid is not None, (oid, undone_tid)
        is_current = undone_data_tid in (current_data_tid, tid)
        # Load object data as it was before given transaction.
        # It can be None, in which case it means we are undoing object
        # creation.
        _, data_tid = getDataTID(before_tid=undone_tid)
        if data_tid is not None:
            data_tid = p64(data_tid)
        return p64(current_tid), data_tid, is_current
def finishTransaction(self, tid):
    """Finish a transaction specified by a given ID, by moving
    temporarily data to a finished area.

    Abstract: concrete backends must override this.
    """
    raise NotImplementedError
def deleteTransaction(self, tid, oid_list=()):
    """Delete a transaction and its content specified by a given ID and
    an oid list.

    Abstract: concrete backends must override this.
    """
    raise NotImplementedError
def deleteObject(self, oid, serial=None):
    """Delete given object. If serial is given, only delete that serial for
    given oid.

    Abstract: concrete backends must override this.
    """
    raise NotImplementedError
def _deleteRange(self, partition, min_tid=None, max_tid=None):
    """Delete all objects and transactions between given min_tid (excluded)
    and max_tid (included), for the given partition.

    Abstract: concrete backends must override this.
    """
    raise NotImplementedError
def truncate(self, tid):
    """Discard, in every partition, all data more recent than ``tid``.

    ``tid`` itself is kept (``_deleteRange`` excludes its lower bound).
    Only valid while a backup TID is set; the backup TID is cleared
    before deleting, then the deletion is committed.
    """
    # Refuse meaningless truncation points.
    assert tid not in (None, ZERO_TID), tid
    assert self.getBackupTID()
    self.setBackupTID(None) # XXX
    partition = 0
    partition_count = self.getNumPartitions()
    while partition < partition_count:
        self._deleteRange(partition, tid)
        partition += 1
    self.commit()
def getTransaction(self, tid, all = False):
    """Return a tuple of the list of OIDs, user information,
    a description, and extension information, for a given transaction
    ID. If there is no such transaction ID in a database, return None.
    If all is true, the transaction must be searched from a temporary
    area as well.

    Abstract: concrete backends must override this.
    """
    raise NotImplementedError
def getObjectHistory(self, oid, offset = 0, length = 1):
    """Return a list of serials and sizes for a given object ID.
    The length specifies the maximum size of such a list. Result starts
    with latest serial, and the list must be sorted in descending order.
    If there is no such object ID in a database, return None.

    Abstract: concrete backends must override this.
    """
    raise NotImplementedError
def getReplicationObjectList(self, min_tid, max_tid, length, partition,
        min_oid):
    """Return a dict of length oids grouped by serial at (or above)
    min_tid and min_oid and below max_tid, for given partition,
    sorted in ascending order.

    Abstract: concrete backends must override this.
    """
    raise NotImplementedError
def getTIDList(self, offset, length, partition_list):
    """Return a list of TIDs in ascending order from an offset,
    at most the specified length. The list of partitions are passed
    to filter out non-applicable TIDs.

    Abstract: concrete backends must override this.
    """
    raise NotImplementedError
def getReplicationTIDList(self, min_tid, max_tid, length, partition):
    """Return a list of TIDs in ascending order from an initial tid value,
    at most the specified length up to max_tid. The partition number is
    passed to filter out non-applicable TIDs.

    Abstract: concrete backends must override this.
    """
    raise NotImplementedError
def pack(self, tid, updateObjectDataForPack):
    """Prune all non-current object revisions at given tid.

    updateObjectDataForPack is a function called for each deleted object
    and revision with:
    - OID
    - packed TID
    - new value_serial
        If object data was moved to an after-pack-tid revision, this
        parameter contains the TID of that revision, allowing to backlink
        to it.
    - getObjectData function
        To call if value_serial is None and an object needs to be updated.
        Takes no parameter, returns a 3-tuple: compression, data_id,
        value

    Abstract: concrete backends must override this.
    """
    raise NotImplementedError
def checkTIDRange(self, partition, length, min_tid, max_tid):
    """
    Generate a digest from transaction list.

    min_tid (packed)
        TID at which verification starts.
    max_tid (packed)
        TID at which verification ends (included).
    length (int)
        Maximum number of records to include in result.

    Returns a 3-tuple:
        - number of records actually found
        - a SHA1 computed from record's TID
          ZERO_HASH if no record found
        - biggest TID found (ie, TID of last record read)
          ZERO_TID if no record found

    Abstract: concrete backends must override this.
    """
    raise NotImplementedError
def checkSerialRange(self, partition, length, min_tid, max_tid, min_oid):
    """
    Generate a digest from object list.

    min_oid (packed)
        OID at which verification starts.
    min_tid (packed)
        Serial of min_oid object at which search should start.
    max_tid (packed)
        Serial at which search should end (included).
    length
        Maximum number of records to include in result.

    Returns a 5-tuple:
        - number of records actually found
        - a SHA1 computed from record's OID
          ZERO_HASH if no record found
        - biggest OID found (ie, OID of last record read)
          ZERO_OID if no record found
        - a SHA1 computed from record's serial
          ZERO_HASH if no record found
        - biggest serial found for biggest OID found (ie, serial of last
          record read)
          ZERO_TID if no record found

    Abstract: concrete backends must override this.
    """
    raise NotImplementedError
neoppod-4fcd8ddc8caa405fd23a527f4a128644363b69c6-neo-storage/neo/storage/database/mysqldb.py 0000664 0000000 0000000 00000071304 12014530641 0030501 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
from binascii import a2b_hex
import MySQLdb
from MySQLdb import IntegrityError, OperationalError
from MySQLdb.constants.CR import SERVER_GONE_ERROR, SERVER_LOST
from MySQLdb.constants.ER import DUP_ENTRY
from array import array
from hashlib import sha1
import re
import string
import time
from . import DatabaseManager, LOG_QUERIES
from .manager import CreationUndone
from neo.lib import logging, util
from neo.lib.exception import DatabaseFailure
from neo.lib.protocol import CellStates, ZERO_OID, ZERO_TID, ZERO_HASH
def splitOIDField(tid, oids):
    """Split a blob of concatenated 8-byte OIDs into a list of OIDs.

    ``tid`` is only used to build the error message.  Raises
    DatabaseFailure when the blob is empty or its length is not a
    multiple of 8.
    """
    size = len(oids)
    if size == 0 or size % 8:
        raise DatabaseFailure('invalid oids length for tid %d: %d' % (tid,
            len(oids)))
    return [oids[offset:offset + 8] for offset in range(0, size, 8)]
class MySQLDatabaseManager(DatabaseManager):
"""This class manages a database on MySQL."""
# WARNING: some parts are not concurrent safe (ex: storeData)
# (there must be only 1 writable connection per DB)
# Disabled even on MySQL 5.1-5.5 and MariaDB 5.2-5.3 because
# 'select count(*) from obj' sometimes returns incorrect values
# (tested with testOudatedCellsOnDownStorage).
_use_partition = False
def __init__(self, database, wait):
    """Parse credentials (via base class / _parse) and open the MySQL
    connection immediately."""
    super(MySQLDatabaseManager, self).__init__(database, wait)
    self.conn = None    # live MySQLdb connection, set by _connect()
    self._config = {}   # in-memory cache of the 'config' table
    self._connect()
def _parse(self, database):
    """ Get the database credentials (username, password, database) """
    # expected pattern : [user[:password]@]database[(.|/)unix_socket]
    # Groups map to: user, passwd (may be empty), db name, socket path.
    self.user, self.passwd, self.db, self.socket = re.match(
        '(?:([^:]+)(?::(.*))?@)?([^./]+)(.+)?$', database).groups()
def close(self):
    """Close the underlying MySQL connection."""
    self.conn.close()
def _connect(self):
    """(Re)open the MySQL connection, retrying every second until
    ``self._wait`` seconds have elapsed (forever if negative)."""
    kwd = {'db' : self.db, 'user' : self.user}
    if self.passwd is not None:
        kwd['passwd'] = self.passwd
    if self.socket:
        kwd['unix_socket'] = self.socket
    logging.info('connecting to MySQL on the database %s with user %s',
                 self.db, self.user)
    # Negative wait means retry forever.
    if self._wait < 0:
        timeout_at = None
    else:
        timeout_at = time.time() + self._wait
    while True:
        try:
            self.conn = MySQLdb.connect(**kwd)
        except Exception:
            if timeout_at is not None and time.time() >= timeout_at:
                raise
            logging.exception('Connection to MySQL failed, retrying.')
            time.sleep(1)
        else:
            break
    # Explicit commit control; large GROUP_CONCAT needed for bulk reads.
    self.conn.autocommit(False)
    self.conn.query("SET SESSION group_concat_max_len = %u" % (2**32-1))
    self.conn.set_sql_mode("TRADITIONAL,NO_ENGINE_SUBSTITUTION")
def commit(self):
    """Commit the current MySQL transaction."""
    logging.debug('committing...')
    self.conn.commit()
def query(self, query):
    """Execute a raw SQL string and return the full result set as a
    tuple of tuples (None for statements without a result set).

    On a lost/gone server connection, reconnects and retries the same
    query once (recursively); other MySQL errors are converted to
    DatabaseFailure.
    """
    conn = self.conn
    try:
        if LOG_QUERIES:
            # Log a printable, truncated (70 chars) form of the query.
            printable_char_list = []
            for c in query.split('\n', 1)[0][:70]:
                if c not in string.printable or c in '\t\x0b\x0c\r':
                    c = '\\x%02x' % ord(c)
                printable_char_list.append(c)
            query_part = ''.join(printable_char_list)
            logging.debug('querying %s...', query_part)
        conn.query(query)
        r = conn.store_result()
        if r is not None:
            # Normalize rows: MySQLdb may return BLOB columns as
            # array.array, convert them to str.
            new_r = []
            for row in r.fetch_row(r.num_rows()):
                new_row = []
                for d in row:
                    if isinstance(d, array):
                        d = d.tostring()
                    new_row.append(d)
                new_r.append(tuple(new_row))
            r = tuple(new_r)
    except OperationalError, m:
        if m[0] in (SERVER_GONE_ERROR, SERVER_LOST):
            logging.info('the MySQL server is gone; reconnecting')
            self._connect()
            return self.query(query)
        raise DatabaseFailure('MySQL error %d: %s' % (m[0], m[1]))
    return r
def escape(self, s):
    """Escape special characters in a string for safe SQL embedding."""
    return self.conn.escape_string(s)
def setup(self, reset = 0):
    """Create the schema if needed; when ``reset`` is true, drop and
    recreate every table first.  Also reload the in-memory map of
    uncommitted data references from 'tobj'."""
    self._config.clear()
    q = self.query
    if reset:
        q('DROP TABLE IF EXISTS config, pt, trans, obj, data, ttrans, tobj')
    # The table "config" stores configuration parameters which affect the
    # persistent data.
    q("""CREATE TABLE IF NOT EXISTS config (
             name VARBINARY(255) NOT NULL PRIMARY KEY,
             value VARBINARY(255) NULL
         ) ENGINE = InnoDB""")
    # The table "pt" stores a partition table.
    q("""CREATE TABLE IF NOT EXISTS pt (
             rid INT UNSIGNED NOT NULL,
             nid INT NOT NULL,
             state TINYINT UNSIGNED NOT NULL,
             PRIMARY KEY (rid, nid)
         ) ENGINE = InnoDB""")
    p = self._use_partition and """ PARTITION BY LIST (partition) (
        PARTITION dummy VALUES IN (NULL))""" or ''
    # The table "trans" stores information on committed transactions.
    q("""CREATE TABLE IF NOT EXISTS trans (
             partition SMALLINT UNSIGNED NOT NULL,
             tid BIGINT UNSIGNED NOT NULL,
             packed BOOLEAN NOT NULL,
             oids MEDIUMBLOB NOT NULL,
             user BLOB NOT NULL,
             description BLOB NOT NULL,
             ext BLOB NOT NULL,
             ttid BIGINT UNSIGNED NOT NULL,
             PRIMARY KEY (partition, tid)
         ) ENGINE = InnoDB""" + p)
    # The table "obj" stores committed object metadata.
    q("""CREATE TABLE IF NOT EXISTS obj (
             partition SMALLINT UNSIGNED NOT NULL,
             oid BIGINT UNSIGNED NOT NULL,
             tid BIGINT UNSIGNED NOT NULL,
             data_id BIGINT UNSIGNED NULL,
             value_tid BIGINT UNSIGNED NULL,
             PRIMARY KEY (partition, tid, oid),
             KEY (partition, oid, tid),
             KEY (data_id)
         ) ENGINE = InnoDB""" + p)
    # The table "data" stores object data.
    # We'd like to have partial index on 'hash' column (e.g. hash(4))
    # but 'UNIQUE' constraint would not work as expected.
    q("""CREATE TABLE IF NOT EXISTS data (
             id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
             hash BINARY(20) NOT NULL UNIQUE,
             compression TINYINT UNSIGNED NULL,
             value LONGBLOB NULL
         ) ENGINE = InnoDB""")
    # The table "ttrans" stores information on uncommitted transactions.
    q("""CREATE TABLE IF NOT EXISTS ttrans (
             partition SMALLINT UNSIGNED NOT NULL,
             tid BIGINT UNSIGNED NOT NULL,
             packed BOOLEAN NOT NULL,
             oids MEDIUMBLOB NOT NULL,
             user BLOB NOT NULL,
             description BLOB NOT NULL,
             ext BLOB NOT NULL,
             ttid BIGINT UNSIGNED NOT NULL
         ) ENGINE = InnoDB""")
    # The table "tobj" stores uncommitted object metadata.
    q("""CREATE TABLE IF NOT EXISTS tobj (
             partition SMALLINT UNSIGNED NOT NULL,
             oid BIGINT UNSIGNED NOT NULL,
             tid BIGINT UNSIGNED NOT NULL,
             data_id BIGINT UNSIGNED NULL,
             value_tid BIGINT UNSIGNED NULL,
             PRIMARY KEY (tid, oid)
         ) ENGINE = InnoDB""")
    # Map data_id -> reference count for data rows still referenced only
    # by uncommitted objects (protects them from _pruneData).
    self._uncommitted_data = dict(q("SELECT data_id, count(*)"
        " FROM tobj WHERE data_id IS NOT NULL GROUP BY data_id") or ())
def getConfiguration(self, key):
    """Return the value stored for ``key`` in the 'config' table,
    None if absent.  Results (including misses) are cached in
    self._config."""
    try:
        return self._config[key]
    except KeyError:
        sql_key = self.escape(str(key))
        try:
            r = self.query("SELECT value FROM config WHERE name = '%s'"
                           % sql_key)[0][0]
        except IndexError:
            # No such row: cache the miss as None.
            r = None
        self._config[key] = r
        return r
def _setConfiguration(self, key, value):
    """Persist ``key``/``value`` in the 'config' table and update the
    cache; a None value deletes the row."""
    q = self.query
    e = self.escape
    self._config[key] = value
    key = e(str(key))
    if value is None:
        q("DELETE FROM config WHERE name = '%s'" % key)
    else:
        value = e(str(value))
        q("REPLACE INTO config VALUES ('%s', '%s')" % (key, value))
def _setPackTID(self, tid):
    """Record the TID of the last pack in the 'config' table."""
    self._setConfiguration('_pack_tid', tid)
def _getPackTID(self):
    """Return the TID of the last pack as an int, or -1 if the database
    was never packed (configuration entry absent)."""
    value = self.getConfiguration('_pack_tid')
    try:
        return int(value)
    except TypeError:
        # int(None): never packed.
        return -1
def getPartitionTable(self):
    """Return all (rid, nid, state) rows of the partition table."""
    return self.query("SELECT * FROM pt")
def _getLastIDs(self, all=True):
    """Return (trans, obj, oid):
    - trans: dict partition -> last committed TID (packed); with
      ``all``, key None maps to the last TID in 'ttrans'.
    - obj: same for object records ('obj', and 'tobj' under key None).
    - oid: greatest OID seen (packed), or None if there is no object.
    """
    p64 = util.p64
    q = self.query
    trans = dict((partition, p64(tid))
        for partition, tid in q("SELECT partition, MAX(tid)"
                                " FROM trans GROUP BY partition"))
    obj = dict((partition, p64(tid))
        for partition, tid in q("SELECT partition, MAX(tid)"
                                " FROM obj GROUP BY partition"))
    oid = q("SELECT MAX(oid) FROM (SELECT MAX(oid) AS oid FROM obj"
            " GROUP BY partition) as t")[0][0]
    if all:
        tid = q("SELECT MAX(tid) FROM ttrans")[0][0]
        if tid is not None:
            trans[None] = p64(tid)
        tid, toid = q("SELECT MAX(tid), MAX(oid) FROM tobj")[0]
        if tid is not None:
            obj[None] = p64(tid)
        # Fix: test for None *before* comparing. The previous order
        # (oid < toid or oid is None) only worked because Python 2
        # orders None below any integer; behavior is unchanged here
        # and the expression no longer relies on that quirk.
        if toid is not None and (oid is None or oid < toid):
            oid = toid
    return trans, obj, None if oid is None else p64(oid)
def getUnfinishedTIDList(self):
    """Return the packed TIDs of all unfinished transactions, i.e. the
    union of TIDs present in 'ttrans' and 'tobj'."""
    p64 = util.p64
    return [p64(t[0]) for t in self.query("SELECT tid FROM ttrans"
        " UNION SELECT tid FROM tobj")]
def objectPresent(self, oid, tid, all = True):
    """Return a true value if object ``oid`` exists at serial ``tid``
    in 'obj'; with ``all``, also look in the temporary 'tobj' table."""
    oid = util.u64(oid)
    tid = util.u64(tid)
    q = self.query
    return q("SELECT 1 FROM obj WHERE partition=%d AND oid=%d AND tid=%d"
             % (self._getPartition(oid), oid, tid)) or all and \
           q("SELECT 1 FROM tobj WHERE tid=%d AND oid=%d" % (tid, oid))
def _getObject(self, oid, tid=None, before_tid=None):
    """Fetch one object revision, selected by exact ``tid``, by
    ``before_tid`` (latest strictly older revision), or the latest one.

    Returns (serial, next_serial, compression, checksum, data,
    value_serial), or None if no matching revision exists.
    next_serial is the next more recent serial of the same oid, or None.
    """
    q = self.query
    partition = self._getPartition(oid)
    sql = ('SELECT tid, compression, data.hash, value, value_tid'
           ' FROM obj LEFT JOIN data ON (obj.data_id = data.id)'
           ' WHERE partition = %d AND oid = %d') % (partition, oid)
    if tid is not None:
        sql += ' AND tid = %d' % tid
    elif before_tid is not None:
        sql += ' AND tid < %d ORDER BY tid DESC LIMIT 1' % before_tid
    else:
        # XXX I want to express "HAVING tid = MAX(tid)", but
        # MySQL does not use an index for a HAVING clause!
        sql += ' ORDER BY tid DESC LIMIT 1'
    r = q(sql)
    try:
        serial, compression, checksum, data, value_serial = r[0]
    except IndexError:
        return None
    # Look up the next revision, if any, to report next_serial.
    r = q("SELECT tid FROM obj WHERE partition=%d AND oid=%d AND tid>%d"
          " ORDER BY tid LIMIT 1" % (partition, oid, serial))
    try:
        next_serial = r[0][0]
    except IndexError:
        next_serial = None
    return serial, next_serial, compression, checksum, data, value_serial
def doSetPartitionTable(self, ptid, cell_list, reset):
    """Apply ``cell_list`` ((offset, nid, state) tuples) to the 'pt'
    table, truncating it first when ``reset`` is true, then store the
    new partition table ID."""
    offset_list = []
    q = self.query
    if reset:
        q("TRUNCATE pt")
    for offset, nid, state in cell_list:
        # TODO: this logic should move out of database manager
        # add 'dropCells(cell_list)' to API and use one query
        if state == CellStates.DISCARDED:
            q("DELETE FROM pt WHERE rid = %d AND nid = %d"
              % (offset, nid))
        else:
            offset_list.append(offset)
            q("INSERT INTO pt VALUES (%d, %d, %d)"
              " ON DUPLICATE KEY UPDATE state = %d"
              % (offset, nid, state, state))
    self.setPTID(ptid)
    if self._use_partition:
        # Mirror the cell changes onto MySQL table partitions.
        for offset in offset_list:
            add = """ALTER TABLE %%s ADD PARTITION (
                PARTITION p%u VALUES IN (%u))""" % (offset, offset)
            for table in 'trans', 'obj':
                try:
                    self.conn.query(add % table)
                except OperationalError, (code, _):
                    if code != 1517: # duplicate partition name
                        raise
def changePartitionTable(self, ptid, cell_list):
    """Update the stored partition table incrementally (no reset)."""
    self.doSetPartitionTable(ptid, cell_list, False)
def setPartitionTable(self, ptid, cell_list):
    """Replace the stored partition table entirely (reset first)."""
    self.doSetPartitionTable(ptid, cell_list, True)
def dropPartitions(self, offset_list):
    """Delete all transactions and objects assigned to the given
    partition offsets, then prune data rows that became unreferenced."""
    q = self.query
    # XXX: these queries are inefficient (execution time increase with
    # row count, although we use indexes) when there are rows to
    # delete. It should be done as an idle task, by chunks.
    for partition in offset_list:
        where = " WHERE partition=%d" % partition
        data_id_list = [x for x, in
            q("SELECT DISTINCT data_id FROM obj" + where) if x]
        if not self._use_partition:
            q("DELETE FROM obj" + where)
            q("DELETE FROM trans" + where)
        self._pruneData(data_id_list)
    if self._use_partition:
        # With native MySQL partitioning, dropping the partition removes
        # the rows in one DDL statement per table.
        drop = "ALTER TABLE %s DROP PARTITION" + \
            ','.join(' p%u' % i for i in offset_list)
        for table in 'trans', 'obj':
            try:
                self.conn.query(drop % table)
            except OperationalError, (code, _):
                if code != 1508: # already dropped
                    raise
def dropUnfinishedData(self):
    """Discard all uncommitted transactions ('ttrans'/'tobj') and
    release the data rows they referenced."""
    q = self.query
    data_id_list = [x for x, in q("SELECT data_id FROM tobj") if x]
    q("TRUNCATE tobj")
    q("TRUNCATE ttrans")
    self.unlockData(data_id_list, True)
def storeTransaction(self, tid, object_list, transaction, temporary = True):
    """Store a transaction and its objects, in the temporary tables
    ('tobj'/'ttrans', committed immediately) when ``temporary`` is
    true, otherwise directly in 'obj'/'trans'.

    object_list: iterable of (oid, data_id, value_serial) tuples.
    transaction: None, or (oid_list, user, desc, ext, packed, ttid).
    """
    e = self.escape
    u64 = util.u64
    tid = u64(tid)
    if temporary:
        obj_table = 'tobj'
        trans_table = 'ttrans'
    else:
        obj_table = 'obj'
        trans_table = 'trans'
    q = self.query
    for oid, data_id, value_serial in object_list:
        oid = u64(oid)
        partition = self._getPartition(oid)
        if value_serial:
            # Indirect value: resolve data_id from the referenced
            # committed revision.
            value_serial = u64(value_serial)
            (data_id,), = q("SELECT data_id FROM obj"
                " WHERE partition=%d AND oid=%d AND tid=%d"
                % (partition, oid, value_serial))
            if temporary:
                self.storeData(data_id)
        else:
            value_serial = 'NULL'
        q("REPLACE INTO %s VALUES (%d, %d, %d, %s, %s)" % (obj_table,
            partition, oid, tid, data_id or 'NULL', value_serial))
    if transaction:
        oid_list, user, desc, ext, packed, ttid = transaction
        partition = self._getPartition(tid)
        assert packed in (0, 1)
        q("REPLACE INTO %s VALUES (%d,%d,%i,'%s','%s','%s','%s',%d)" % (
            trans_table, partition, tid, packed, e(''.join(oid_list)),
            e(user), e(desc), e(ext), u64(ttid)))
    if temporary:
        self.commit()
def _pruneData(self, data_id_list):
    """Delete the given data rows if they are no longer referenced by
    any object; rows still referenced by uncommitted objects
    (self._uncommitted_data) are skipped."""
    data_id_list = set(data_id_list).difference(self._uncommitted_data)
    if data_id_list:
        self.query("DELETE data FROM data"
            " LEFT JOIN obj ON (id = data_id)"
            " WHERE id IN (%s) AND data_id IS NULL"
            % ",".join(map(str, data_id_list)))
def _storeData(self, checksum, data, compression):
    """Insert a data row and return its id; deduplicate on the (unique)
    hash column — on a duplicate, return the existing row id if its
    content matches, else re-raise."""
    e = self.escape
    checksum = e(checksum)
    try:
        self.query("INSERT INTO data VALUES (NULL, '%s', %d, '%s')" %
            (checksum, compression, e(data)))
    except IntegrityError, (code, _):
        if code == DUP_ENTRY:
            (r, c, d), = self.query("SELECT id, compression, value"
                " FROM data WHERE hash='%s'" % checksum)
            if c == compression and d == data:
                return r
        raise
    return self.conn.insert_id()
def _getDataTID(self, oid, tid=None, before_tid=None):
    """Return (serial, value_serial) for one object revision, selected
    like in _getObject.  value_serial equals serial when the revision
    stores its own data, is None for a creation-undo record, or points
    to the revision actually holding the data.  (None, None) if no
    matching revision exists.
    """
    sql = ('SELECT tid, data_id, value_tid FROM obj'
           ' WHERE partition = %d AND oid = %d'
           ) % (self._getPartition(oid), oid)
    if tid is not None:
        sql += ' AND tid = %d' % tid
    elif before_tid is not None:
        sql += ' AND tid < %d ORDER BY tid DESC LIMIT 1' % before_tid
    else:
        # XXX I want to express "HAVING tid = MAX(tid)", but
        # MySQL does not use an index for a HAVING clause!
        sql += ' ORDER BY tid DESC LIMIT 1'
    r = self.query(sql)
    if r:
        (serial, data_id, value_serial), = r
        if value_serial is None and data_id:
            # Self-contained revision: its data lives at its own serial.
            return serial, serial
        return serial, value_serial
    return None, None
def finishTransaction(self, tid):
    """Move transaction ``tid`` from the temporary tables
    ('tobj'/'ttrans') to the permanent ones, release its data locks
    and commit."""
    q = self.query
    tid = util.u64(tid)
    sql = " FROM tobj WHERE tid=%d" % tid
    data_id_list = [x for x, in q("SELECT data_id" + sql) if x]
    q("INSERT INTO obj SELECT *" + sql)
    q("DELETE FROM tobj WHERE tid=%d" % tid)
    q("INSERT INTO trans SELECT * FROM ttrans WHERE tid=%d" % tid)
    q("DELETE FROM ttrans WHERE tid=%d" % tid)
    self.unlockData(data_id_list)
    self.commit()
def deleteTransaction(self, tid, oid_list=()):
    """Delete transaction ``tid`` from both temporary and permanent
    tables; ``oid_list`` gives the objects whose committed records at
    that TID must also be removed (then unreferenced data is pruned)."""
    u64 = util.u64
    tid = u64(tid)
    getPartition = self._getPartition
    q = self.query
    sql = " FROM tobj WHERE tid=%d" % tid
    data_id_list = [x for x, in q("SELECT data_id" + sql) if x]
    self.unlockData(data_id_list)
    q("DELETE" + sql)
    q("""DELETE FROM ttrans WHERE tid = %d""" % tid)
    q("""DELETE FROM trans WHERE partition = %d AND tid = %d""" %
        (getPartition(tid), tid))
    # delete from obj using indexes
    data_id_set = set()
    for oid in oid_list:
        oid = u64(oid)
        sql = " FROM obj WHERE partition=%d AND oid=%d AND tid=%d" \
            % (getPartition(oid), oid, tid)
        data_id_set.update(*q("SELECT data_id" + sql))
        q("DELETE" + sql)
    data_id_set.discard(None)
    self._pruneData(data_id_set)
def deleteObject(self, oid, serial=None):
    """Delete all revisions of ``oid`` (or only the one at ``serial``
    if given), then prune data rows that became unreferenced."""
    u64 = util.u64
    oid = u64(oid)
    sql = " FROM obj WHERE partition=%d AND oid=%d" \
        % (self._getPartition(oid), oid)
    if serial:
        sql += ' AND tid=%d' % u64(serial)
    q = self.query
    data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql) if x]
    q("DELETE" + sql)
    self._pruneData(data_id_list)
def _deleteRange(self, partition, min_tid=None, max_tid=None):
    """Delete all transactions and objects of ``partition`` with
    min_tid < tid <= max_tid (each bound optional), then prune
    unreferenced data rows."""
    sql = " WHERE partition=%d" % partition
    if min_tid:
        sql += " AND %d < tid" % util.u64(min_tid)
    if max_tid:
        sql += " AND tid <= %d" % util.u64(max_tid)
    q = self.query
    q("DELETE FROM trans" + sql)
    sql = " FROM obj" + sql
    data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql) if x]
    q("DELETE" + sql)
    self._pruneData(data_id_list)
def getTransaction(self, tid, all = False):
    """Return (oid_list, user, desc, ext, packed, ttid) for transaction
    ``tid`` from 'trans' (also searching 'ttrans' when ``all``), or
    None if not found."""
    tid = util.u64(tid)
    q = self.query
    r = q("SELECT oids, user, description, ext, packed, ttid"
          " FROM trans WHERE partition = %d AND tid = %d"
          % (self._getPartition(tid), tid))
    if not r and all:
        r = q("SELECT oids, user, description, ext, packed, ttid"
              " FROM ttrans WHERE tid = %d" % tid)
    if r:
        oids, user, desc, ext, packed, ttid = r[0]
        oid_list = splitOIDField(tid, oids)
        return oid_list, user, desc, ext, bool(packed), util.p64(ttid)
def _getObjectLength(self, oid, value_serial):
    """Return the data length of ``oid`` at ``value_serial``, following
    value_tid indirections recursively.  Raises CreationUndone when
    value_serial is None (undone creation has no data)."""
    if value_serial is None:
        raise CreationUndone
    r = self.query("""SELECT LENGTH(value), value_tid
                FROM obj LEFT JOIN data ON (obj.data_id = data.id)
                WHERE partition = %d AND oid = %d AND tid = %d""" %
        (self._getPartition(oid), oid, value_serial))
    length, value_serial = r[0]
    if length is None:
        # The row is itself an indirection: recurse one more level.
        logging.info("Multiple levels of indirection when "
            "searching for object data for oid %d at tid %d."
            " This causes suboptimal performance.", oid, value_serial)
        length = self._getObjectLength(oid, value_serial)
    return length
def getObjectHistory(self, oid, offset = 0, length = 1):
    """Return up to ``length`` (packed serial, data length) pairs for
    ``oid``, newest first, starting at ``offset`` and not older than
    the last pack TID; None when nothing matches."""
    # FIXME: This method doesn't take client's current transaction id as
    # parameter, which means it can return transactions in the future of
    # client's transaction.
    oid = util.u64(oid)
    p64 = util.p64
    pack_tid = self._getPackTID()
    r = self.query("""SELECT tid, LENGTH(value), value_tid
                FROM obj LEFT JOIN data ON (obj.data_id = data.id)
                WHERE partition = %d AND oid = %d AND tid >= %d
                ORDER BY tid DESC LIMIT %d, %d""" \
        % (self._getPartition(oid), oid, pack_tid, offset, length))
    if r:
        result = []
        append = result.append
        for serial, length, value_serial in r:
            if length is None:
                # Indirect record: resolve the real length.
                try:
                    length = self._getObjectLength(oid, value_serial)
                except CreationUndone:
                    length = 0
            append((p64(serial), length))
        return result
    return None
def getReplicationObjectList(self, min_tid, max_tid, length, partition,
        min_oid):
    """Return up to ``length`` (packed serial, packed oid) pairs of
    ``partition`` starting at (min_tid, min_oid) inclusive and up to
    max_tid inclusive, ordered by (tid, oid) ascending."""
    u64 = util.u64
    p64 = util.p64
    min_tid = u64(min_tid)
    r = self.query('SELECT tid, oid FROM obj'
                   ' WHERE partition = %d AND tid <= %d'
                   ' AND (tid = %d AND %d <= oid OR %d < tid)'
                   ' ORDER BY tid ASC, oid ASC LIMIT %d' % (
        partition, u64(max_tid), min_tid, u64(min_oid), min_tid, length))
    return [(p64(serial), p64(oid)) for serial, oid in r]
def getTIDList(self, offset, length, partition_list):
    """Return up to ``length`` packed TIDs from the given partitions,
    newest first, skipping the first ``offset`` ones."""
    q = self.query
    r = q("""SELECT tid FROM trans WHERE partition in (%s)
             ORDER BY tid DESC LIMIT %d,%d""" \
        % (','.join(map(str, partition_list)), offset, length))
    return [util.p64(t[0]) for t in r]
def getReplicationTIDList(self, min_tid, max_tid, length, partition):
    """Return up to ``length`` packed TIDs of ``partition`` within
    [min_tid, max_tid], ascending."""
    u64 = util.u64
    p64 = util.p64
    min_tid = u64(min_tid)
    max_tid = u64(max_tid)
    r = self.query("""SELECT tid FROM trans
                WHERE partition = %(partition)d
                AND tid >= %(min_tid)d AND tid <= %(max_tid)d
                ORDER BY tid ASC LIMIT %(length)d""" % {
        'partition': partition,
        'min_tid': min_tid,
        'max_tid': max_tid,
        'length': length,
    })
    return [p64(t[0]) for t in r]
def _updatePackFuture(self, oid, orig_serial, max_serial):
    """Redirect future references to a revision about to be packed.

    Scans 'obj' and 'tobj' for records at or after ``max_serial`` whose
    value_tid points to ``orig_serial``.  The first one found becomes
    the new holder (its value_tid is set to NULL) and all later ones
    are re-pointed to it.  Returns the serial of that new holder, or
    None if there was no future reference.
    """
    q = self.query
    # Before deleting this objects revision, see if there is any
    # transaction referencing its value at max_serial or above.
    # If there is, copy value to the first future transaction. Any further
    # reference is just updated to point to the new data location.
    value_serial = None
    kw = {
        'partition': self._getPartition(oid),
        'oid': oid,
        'orig_tid': orig_serial,
        'max_tid': max_serial,
        'new_tid': 'NULL',
    }
    for kw['table'] in 'obj', 'tobj':
        for kw['tid'], in q('SELECT tid FROM %(table)s'
                ' WHERE partition=%(partition)d AND oid=%(oid)d'
                ' AND tid>=%(max_tid)d AND value_tid=%(orig_tid)d'
                ' ORDER BY tid ASC' % kw):
            q('UPDATE %(table)s SET value_tid=%(new_tid)s'
              ' WHERE partition=%(partition)d AND oid=%(oid)d'
              ' AND tid=%(tid)d' % kw)
            if value_serial is None:
                # First found, mark its serial for future reference.
                kw['new_tid'] = value_serial = kw['tid']
    return value_serial
def pack(self, tid, updateObjectDataForPack):
    """Prune all non-current object revisions at or before ``tid``,
    calling ``updateObjectDataForPack`` for each deleted revision (see
    the abstract method's docstring), then commit."""
    # TODO: unit test (along with updatePackFuture)
    p64 = util.p64
    tid = util.u64(tid)
    updatePackFuture = self._updatePackFuture
    getPartition = self._getPartition
    q = self.query
    self._setPackTID(tid)
    for count, oid, max_serial in q("SELECT COUNT(*) - 1, oid, MAX(tid)"
                                    " FROM obj WHERE tid <= %d GROUP BY oid"
                                    % tid):
        partition = getPartition(oid)
        if q("SELECT 1 FROM obj WHERE partition = %d"
             " AND oid = %d AND tid = %d AND data_id IS NULL"
             % (partition, oid, max_serial)):
            # Latest revision at pack time is a deletion: delete it too.
            max_serial += 1
        elif not count:
            # Single revision and it is current: nothing to prune.
            continue
        # There are things to delete for this object
        data_id_set = set()
        sql = ' FROM obj WHERE partition=%d AND oid=%d' \
              ' AND tid<%d' % (partition, oid, max_serial)
        for serial, data_id in q('SELECT tid, data_id' + sql):
            data_id_set.add(data_id)
            new_serial = updatePackFuture(oid, serial, max_serial)
            if new_serial:
                new_serial = p64(new_serial)
            updateObjectDataForPack(p64(oid), p64(serial),
                new_serial, data_id)
        q('DELETE' + sql)
        data_id_set.discard(None)
        self._pruneData(data_id_set)
    self.commit()
def checkTIDRange(self, partition, length, min_tid, max_tid):
    """Digest the transactions of ``partition`` in [min_tid, max_tid]
    (at most ``length`` of them; None for no limit).  Returns
    (count, sha1 of the TID list, packed max TID found), or
    (0, ZERO_HASH, ZERO_TID) when empty.  The checksum is computed by
    MySQL itself via SHA1(GROUP_CONCAT(...))."""
    count, tid_checksum, max_tid = self.query(
        """SELECT COUNT(*), SHA1(GROUP_CONCAT(tid SEPARATOR ",")), MAX(tid)
           FROM (SELECT tid FROM trans
                 WHERE partition = %(partition)s
                 AND tid >= %(min_tid)d
                 AND tid <= %(max_tid)d
                 ORDER BY tid ASC %(limit)s) AS t""" % {
        'partition': partition,
        'min_tid': util.u64(min_tid),
        'max_tid': util.u64(max_tid),
        'limit': '' if length is None else 'LIMIT %u' % length,
    })[0]
    if count:
        return count, a2b_hex(tid_checksum), util.p64(max_tid)
    return 0, ZERO_HASH, ZERO_TID
def checkSerialRange(self, partition, length, min_tid, max_tid, min_oid):
    """Digest the object records of ``partition`` from (min_tid,
    min_oid) inclusive up to max_tid inclusive (at most ``length``
    records; None for no limit).  Returns the 5-tuple documented on the
    abstract method; checksums are computed in Python here."""
    u64 = util.u64
    # We don't ask MySQL to compute everything (like in checkTIDRange)
    # because it's difficult to get the last serial _for the last oid_.
    # We would need a function (that could be named 'LAST') that returns the
    # last grouped value, instead of the greatest one.
    r = self.query(
        """SELECT tid, oid
           FROM obj
           WHERE partition = %(partition)s
           AND tid <= %(max_tid)d
           AND (tid > %(min_tid)d OR
                tid = %(min_tid)d AND oid >= %(min_oid)d)
           ORDER BY tid, oid %(limit)s""" % {
        'min_oid': u64(min_oid),
        'min_tid': u64(min_tid),
        'max_tid': u64(max_tid),
        'limit': '' if length is None else 'LIMIT %u' % length,
        'partition': partition,
    })
    if r:
        p64 = util.p64
        return (len(r),
                sha1(','.join(str(x[0]) for x in r)).digest(),
                p64(r[-1][0]),
                sha1(','.join(str(x[1]) for x in r)).digest(),
                p64(r[-1][1]))
    return 0, ZERO_HASH, ZERO_TID, ZERO_HASH, ZERO_OID
neoppod-4fcd8ddc8caa405fd23a527f4a128644363b69c6-neo-storage/neo/storage/database/sqlite.py 0000664 0000000 0000000 00000061666 12014530641 0030341 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
import sqlite3
from hashlib import sha1
import string
import traceback
from . import DatabaseManager, LOG_QUERIES
from .manager import CreationUndone
from neo.lib import logging, util
from neo.lib.exception import DatabaseFailure
from neo.lib.protocol import CellStates, ZERO_OID, ZERO_TID, ZERO_HASH
def splitOIDField(tid, oids):
    """Split a blob of concatenated 8-byte OIDs into a list of OIDs.

    ``tid`` is only used to build the error message.  Raises
    DatabaseFailure when the blob is empty or its length is not a
    multiple of 8.
    """
    size = len(oids)
    if size == 0 or size % 8:
        raise DatabaseFailure('invalid oids length for tid %d: %d' % (tid,
            len(oids)))
    return [oids[offset:offset + 8] for offset in range(0, size, 8)]
def retry_if_locked(f, *args):
    """Call ``f(*args)``, retrying in a busy loop as long as it raises
    sqlite3.OperationalError('database is locked'); any other
    OperationalError (or a different message on retry) is re-raised.
    The first lock occurrence is logged with a stack trace."""
    try:
        return f(*args)
    except sqlite3.OperationalError, e:
        x = e.args[0]
        if x == 'database is locked':
            msg = traceback.format_exception_only(type(e), e)
            msg += traceback.format_stack()
            logging.warning(''.join(msg))
            # Keep retrying while the very same error message comes back.
            while e.args[0] == x:
                try:
                    return f(*args)
                except sqlite3.OperationalError, e:
                    pass
        raise
class SQLiteDatabaseManager(DatabaseManager):
"""This class manages a database on SQLite.
CAUTION: Make sure we never use statement journal files, as explained at
http://www.sqlite.org/tempfiles.html for more information.
In other words, temporary files (by default in /var/tmp !) must
never be used for small requests.
"""
def __init__(self, *args, **kw):
    """Initialize via the base class and open the SQLite database."""
    super(SQLiteDatabaseManager, self).__init__(*args, **kw)
    self._config = {}   # in-memory cache of the 'config' table
    self._connect()
def _parse(self, database):
    """The database string is simply the SQLite file path."""
    self.db = database
def close(self):
    """Close the underlying SQLite connection."""
    self.conn.close()
def _connect(self):
    """Open the SQLite database file (shared across threads)."""
    logging.info('connecting to SQLite database %r', self.db)
    self.conn = sqlite3.connect(self.db, check_same_thread=False)
def commit(self):
    """Commit the current transaction, retrying while the database
    file is locked by another process."""
    logging.debug('committing...')
    retry_if_locked(self.conn.commit)
# 'query' is chosen at class-definition time: a logging wrapper when
# LOG_QUERIES is set, otherwise a property exposing conn.execute
# directly (so callers may pass extra bind parameters).
if LOG_QUERIES:
    def query(self, query):
        # Log a printable, truncated (70 chars) form of the query.
        printable_char_list = []
        for c in query.split('\n', 1)[0][:70]:
            if c not in string.printable or c in '\t\x0b\x0c\r':
                c = '\\x%02x' % ord(c)
            printable_char_list.append(c)
        logging.debug('querying %s...', ''.join(printable_char_list))
        return self.conn.execute(query)
else:
    query = property(lambda self: self.conn.execute)
def setup(self, reset = 0):
    """Create the schema if needed; when ``reset`` is true, drop and
    recreate every table first.  Also reload the in-memory map of
    uncommitted data references from 'tobj'."""
    self._config.clear()
    q = self.query
    if reset:
        for t in 'config', 'pt', 'trans', 'obj', 'data', 'ttrans', 'tobj':
            q('DROP TABLE IF EXISTS ' + t)
    # The table "config" stores configuration parameters which affect the
    # persistent data.
    q("""CREATE TABLE IF NOT EXISTS config (
             name TEXT NOT NULL PRIMARY KEY,
             value TEXT)
      """)
    # The table "pt" stores a partition table.
    q("""CREATE TABLE IF NOT EXISTS pt (
             rid INTEGER NOT NULL,
             nid INTEGER NOT NULL,
             state INTEGER NOT NULL,
             PRIMARY KEY (rid, nid))
      """)
    # The table "trans" stores information on committed transactions.
    q("""CREATE TABLE IF NOT EXISTS trans (
             partition INTEGER NOT NULL,
             tid INTEGER NOT NULL,
             packed BOOLEAN NOT NULL,
             oids BLOB NOT NULL,
             user BLOB NOT NULL,
             description BLOB NOT NULL,
             ext BLOB NOT NULL,
             ttid INTEGER NOT NULL,
             PRIMARY KEY (partition, tid))
      """)
    # The table "obj" stores committed object metadata.
    q("""CREATE TABLE IF NOT EXISTS obj (
             partition INTEGER NOT NULL,
             oid INTEGER NOT NULL,
             tid INTEGER NOT NULL,
             data_id INTEGER,
             value_tid INTEGER,
             PRIMARY KEY (partition, tid, oid))
      """)
    q("""CREATE INDEX IF NOT EXISTS _obj_i1 ON
             obj(partition, oid, tid)
      """)
    q("""CREATE INDEX IF NOT EXISTS _obj_i2 ON
             obj(data_id)
      """)
    # The table "data" stores object data.
    q("""CREATE TABLE IF NOT EXISTS data (
             id INTEGER PRIMARY KEY AUTOINCREMENT,
             hash BLOB NOT NULL UNIQUE,
             compression INTEGER,
             value BLOB)
      """)
    # The table "ttrans" stores information on uncommitted transactions.
    q("""CREATE TABLE IF NOT EXISTS ttrans (
             partition INTEGER NOT NULL,
             tid INTEGER NOT NULL,
             packed BOOLEAN NOT NULL,
             oids BLOB NOT NULL,
             user BLOB NOT NULL,
             description BLOB NOT NULL,
             ext BLOB NOT NULL,
             ttid INTEGER NOT NULL)
      """)
    # The table "tobj" stores uncommitted object metadata.
    q("""CREATE TABLE IF NOT EXISTS tobj (
             partition INTEGER NOT NULL,
             oid INTEGER NOT NULL,
             tid INTEGER NOT NULL,
             data_id INTEGER,
             value_tid INTEGER,
             PRIMARY KEY (tid, oid))
      """)
    # Map data_id -> reference count for data rows still referenced only
    # by uncommitted objects (protects them from pruning).
    self._uncommitted_data = dict(q("SELECT data_id, count(*)"
        " FROM tobj WHERE data_id IS NOT NULL GROUP BY data_id"))
def getConfiguration(self, key):
    """Return the value stored for ``key`` in the 'config' table,
    None if absent.  Results (including misses) are cached in
    self._config."""
    try:
        return self._config[key]
    except KeyError:
        try:
            r = self.query("SELECT value FROM config WHERE name=?",
                           (key,)).fetchone()[0]
        except TypeError:
            # fetchone() returned None: no such row, cache the miss.
            r = None
        self._config[key] = r
        return r
def _setConfiguration(self, key, value):
    """Persist ``key``/``value`` in the 'config' table and update the
    cache; a None value deletes the row."""
    q = self.query
    self._config[key] = value
    if value is None:
        q("DELETE FROM config WHERE name=?", (key,))
    else:
        q("REPLACE INTO config VALUES (?,?)", (key, str(value)))
def _setPackTID(self, tid):
    """Record the TID of the last pack in the 'config' table."""
    self._setConfiguration('_pack_tid', tid)
def _getPackTID(self):
    """Return the TID of the last pack as an int, or -1 if the
    database was never packed (int(None) raises TypeError)."""
    try:
        return int(self.getConfiguration('_pack_tid'))
    except TypeError:
        return -1
def getPartitionTable(self):
    """Return all (rid, nid, state) rows of the partition table."""
    return self.query("SELECT * FROM pt")
def _getLastIDs(self, all=True):
    """Return (trans, obj, oid):
    - trans: dict partition -> last committed TID (packed); with
      ``all``, key None maps to the last TID in 'ttrans'.
    - obj: same for object records ('obj', and 'tobj' under key None).
    - oid: greatest OID seen (packed), or None if there is no object.
    """
    p64 = util.p64
    q = self.query
    trans = dict((partition, p64(tid))
        for partition, tid in q("SELECT partition, MAX(tid)"
                                " FROM trans GROUP BY partition"))
    obj = dict((partition, p64(tid))
        for partition, tid in q("SELECT partition, MAX(tid)"
                                " FROM obj GROUP BY partition"))
    oid = q("SELECT MAX(oid) FROM (SELECT MAX(oid) AS oid FROM obj"
            " GROUP BY partition) as t").next()[0]
    if all:
        tid = q("SELECT MAX(tid) FROM ttrans").next()[0]
        if tid is not None:
            trans[None] = p64(tid)
        tid, toid = q("SELECT MAX(tid), MAX(oid) FROM tobj").next()
        if tid is not None:
            obj[None] = p64(tid)
        # Fix: test for None *before* comparing. The previous order
        # (oid < toid or oid is None) only worked because Python 2
        # orders None below any integer; behavior is unchanged here
        # and the expression no longer relies on that quirk.
        if toid is not None and (oid is None or oid < toid):
            oid = toid
    return trans, obj, None if oid is None else p64(oid)
def getUnfinishedTIDList(self):
    """Return packed tids of all uncommitted transactions, collected
    from both the ttrans and tobj tables."""
    p64 = util.p64
    rows = self.query("SELECT tid FROM ttrans"
                      " UNION SELECT tid FROM tobj")
    return [p64(tid) for tid, in rows]
def objectPresent(self, oid, tid, all=True):
    """Tell whether a given object revision exists.

    Looks into the committed 'obj' table and, when *all* is true,
    also into the uncommitted 'tobj' table. The result is a truthy
    row when found, else a falsy value.
    """
    oid = util.u64(oid)
    tid = util.u64(tid)
    q = self.query
    found = q("SELECT 1 FROM obj WHERE partition=? AND oid=? AND tid=?",
              (self._getPartition(oid), oid, tid)).fetchone()
    if found:
        return found
    if all:
        return q("SELECT 1 FROM tobj WHERE tid=? AND oid=?",
                 (tid, oid)).fetchone()
    return all
def _getObject(self, oid, tid=None, before_tid=None):
q = self.query
partition = self._getPartition(oid)
sql = ('SELECT tid, compression, data.hash, value, value_tid'
' FROM obj LEFT JOIN data ON obj.data_id = data.id'
' WHERE partition=? AND oid=?')
if tid is not None:
r = q(sql + ' AND tid=?', (partition, oid, tid))
elif before_tid is not None:
r = q(sql + ' AND tid ORDER BY tid DESC LIMIT 1',
(partition, oid, before_tid))
else:
r = q(sql + ' ORDER BY tid DESC LIMIT 1', (partition, oid))
try:
serial, compression, checksum, data, value_serial = r.fetchone()
except TypeError:
return None
r = q("""SELECT tid FROM obj
WHERE partition=? AND oid=? AND tid>?
ORDER BY tid LIMIT 1""",
(partition, oid, serial)).fetchone()
if checksum:
checksum = str(checksum)
data = str(data)
return serial, r and r[0], compression, checksum, data, value_serial
def doSetPartitionTable(self, ptid, cell_list, reset):
    """Store (part of) the partition table in the 'pt' table.

    :param ptid: new partition table id, recorded via setPTID
    :param cell_list: iterable of (offset, nid, state) triplets
    :param reset: when true, wipe the whole table first
    """
    q = self.query
    if reset:
        q("DELETE FROM pt")
    for offset, nid, state in cell_list:
        # TODO: this logic should move out of database manager
        # add 'dropCells(cell_list)' to API and use one query
        # WKRD: Why does SQLite need a statement journal file
        # whereas we try to replace only 1 value ?
        # We don't want to remove the 'NOT NULL' constraint
        # so we must simulate a "REPLACE OR FAIL".
        q("DELETE FROM pt WHERE rid=? AND nid=?", (offset, nid))
        if state != CellStates.DISCARDED:
            # A DISCARDED cell is just removed, never rewritten.
            q("INSERT OR FAIL INTO pt VALUES (?,?,?)",
              (offset, nid, int(state)))
    self.setPTID(ptid)
def changePartitionTable(self, ptid, cell_list):
    """Apply an incremental partition table update (no reset)."""
    self.doSetPartitionTable(ptid, cell_list, False)
def setPartitionTable(self, ptid, cell_list):
    """Replace the whole stored partition table with *cell_list*."""
    self.doSetPartitionTable(ptid, cell_list, True)
def dropPartitions(self, offset_list):
    """Delete all objects and transactions of the given partitions,
    then prune data rows that are no longer referenced."""
    suffix = " WHERE partition=?"
    q = self.query
    for offset in offset_list:
        params = (offset,)
        referenced = q("SELECT DISTINCT data_id FROM obj" + suffix, params)
        data_id_list = [data_id for data_id, in referenced if data_id]
        q("DELETE FROM obj" + suffix, params)
        q("DELETE FROM trans" + suffix, params)
        self._pruneData(data_id_list)
def dropUnfinishedData(self):
    """Discard all uncommitted transactions and object metadata,
    releasing the corresponding locks on data rows."""
    q = self.query
    data_id_list = [data_id for data_id, in q("SELECT data_id FROM tobj")
                    if data_id]
    q("DELETE FROM tobj")
    q("DELETE FROM ttrans")
    self.unlockData(data_id_list, True)
def storeTransaction(self, tid, object_list, transaction, temporary=True):
    """Store a transaction and/or its objects, either in the temporary
    tables (ttrans/tobj, the default) or directly in the permanent ones.

    :param tid: packed transaction id
    :param object_list: iterable of (oid, data_id, value_serial)
    :param transaction: None, or (oid_list, user, desc, ext, packed, ttid)
    :param temporary: target the t* tables and commit immediately
    """
    u64 = util.u64
    tid = u64(tid)
    # Table name prefix: 't' for temporary tables, '' for permanent ones.
    T = 't' if temporary else ''
    obj_sql = "INSERT OR FAIL INTO %sobj VALUES (?,?,?,?,?)" % T
    q = self.query
    for oid, data_id, value_serial in object_list:
        oid = u64(oid)
        partition = self._getPartition(oid)
        if value_serial:
            # Deduplicated record: reuse the data_id of the revision
            # this one points back to.
            value_serial = u64(value_serial)
            (data_id,), = q("SELECT data_id FROM obj"
                " WHERE partition=? AND oid=? AND tid=?",
                (partition, oid, value_serial))
            if temporary:
                self.storeData(data_id)
        try:
            q(obj_sql, (partition, oid, tid, data_id, value_serial))
        except sqlite3.IntegrityError:
            # This may happen if a previous replication of 'obj' was
            # interrupted.
            if not T:
                r, = q("SELECT data_id, value_tid FROM obj"
                       " WHERE partition=? AND oid=? AND tid=?",
                       (partition, oid, tid))
                if r == (data_id, value_serial):
                    # Same row already present: the insert is a no-op.
                    continue
            raise
    if transaction:
        oid_list, user, desc, ext, packed, ttid = transaction
        partition = self._getPartition(tid)
        assert packed in (0, 1)
        q("INSERT OR FAIL INTO %strans VALUES (?,?,?,?,?,?,?,?)" % T,
          (partition, tid, packed, buffer(''.join(oid_list)),
           buffer(user), buffer(desc), buffer(ext), u64(ttid)))
    if temporary:
        self.commit()
def _pruneData(self, data_id_list):
data_id_list = set(data_id_list).difference(self._uncommitted_data)
if data_id_list:
q = self.query
data_id_list.difference_update(x for x, in q(
"SELECT DISTINCT data_id FROM obj WHERE data_id IN (%s)"
% ",".join(map(str, data_id_list))))
q("DELETE FROM data WHERE id IN (%s)"
% ",".join(map(str, data_id_list)))
def _storeData(self, checksum, data, compression):
    """Insert a data row, deduplicating on its checksum.

    :return: the id of the (possibly pre-existing) data row
    :raise sqlite3.IntegrityError: on a hash collision with a row
        holding different content
    """
    H = buffer(checksum)
    try:
        return self.query("INSERT INTO data VALUES (NULL,?,?,?)",
            (H, compression, buffer(data))).lastrowid
    except sqlite3.IntegrityError, e:
        # UNIQUE constraint on 'hash': if the existing row really holds
        # the same content, reuse its id instead of failing.
        if e.args[0] == 'column hash is not unique':
            (r, c, d), = self.query("SELECT id, compression, value"
                " FROM data WHERE hash=?", (H,))
            if c == compression and str(d) == data:
                return r
        raise
def _getDataTID(self, oid, tid=None, before_tid=None):
partition = self._getPartition(oid)
sql = 'SELECT tid, data_id, value_tid FROM obj' \
' WHERE partition=? AND oid=?'
if tid is not None:
r = self.query(sql + ' AND tid=?', (partition, oid, tid))
elif before_tid is not None:
r = self.query(sql + ' AND tid ORDER BY tid DESC LIMIT 1',
(partition, oid, before_tid))
else:
r = self.query(sql + ' ORDER BY tid DESC LIMIT 1',
(partition, oid))
r = r.fetchone()
if r:
serial, data_id, value_serial = r
if value_serial is None and data_id:
return serial, serial
return serial, value_serial
return None, None
def finishTransaction(self, tid):
    """Move a transaction from the temporary tables to the permanent
    ones, release the locks on its data rows and commit."""
    args = (util.u64(tid),)
    q = self.query
    sql = " FROM tobj WHERE tid=?"
    data_id_list = [data_id for data_id, in q("SELECT data_id" + sql, args)
                    if data_id]
    q("INSERT OR FAIL INTO obj SELECT *" + sql, args)
    q("DELETE FROM tobj WHERE tid=?", args)
    q("INSERT OR FAIL INTO trans SELECT * FROM ttrans WHERE tid=?", args)
    q("DELETE FROM ttrans WHERE tid=?", args)
    self.unlockData(data_id_list)
    self.commit()
def deleteTransaction(self, tid, oid_list=()):
    """Remove a transaction from both temporary and permanent tables.

    :param tid: packed transaction id
    :param oid_list: packed oids whose revisions at *tid* must also be
        removed from 'obj'
    """
    u64 = util.u64
    tid = u64(tid)
    getPartition = self._getPartition
    q = self.query
    sql = " FROM tobj WHERE tid=?"
    data_id_list = [x for x, in q("SELECT data_id" + sql, (tid,)) if x]
    # Release locks held for this (uncommitted) transaction.
    self.unlockData(data_id_list)
    q("DELETE" + sql, (tid,))
    q("DELETE FROM ttrans WHERE tid=?", (tid,))
    q("DELETE FROM trans WHERE partition=? AND tid=?",
      (getPartition(tid), tid))
    # delete from obj using indexes
    data_id_set = set()
    for oid in oid_list:
        oid = u64(oid)
        sql = " FROM obj WHERE partition=? AND oid=? AND tid=?"
        args = getPartition(oid), oid, tid
        data_id_set.update(*q("SELECT data_id" + sql, args))
        q("DELETE" + sql, args)
    data_id_set.discard(None)
    # Drop data rows no longer referenced anywhere.
    self._pruneData(data_id_set)
def deleteObject(self, oid, serial=None):
    """Remove every revision of an object, or only the one at *serial*
    if given, then prune data rows no longer referenced."""
    oid = util.u64(oid)
    where = " FROM obj WHERE partition=? AND oid=?"
    params = [self._getPartition(oid), oid]
    if serial:
        where += " AND tid=?"
        params.append(util.u64(serial))
    q = self.query
    data_id_list = [data_id for data_id, in
                    q("SELECT DISTINCT data_id" + where, params) if data_id]
    q("DELETE" + where, params)
    self._pruneData(data_id_list)
def _deleteRange(self, partition, min_tid=None, max_tid=None):
sql = " WHERE partition=?"
args = [partition]
if min_tid:
sql += " AND ? < tid"
args.append(util.u64(min_tid))
if max_tid:
sql += " AND tid <= ?"
args.append(util.u64(max_tid))
q = self.query
q("DELETE FROM trans" + sql, args)
sql = " FROM obj" + sql
data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql, args)
if x]
q("DELETE" + sql, args)
self._pruneData(data_id_list)
def getTransaction(self, tid, all=False):
    """Return the metadata of a committed transaction, optionally
    looking into the uncommitted table as well.

    :return: (oid_list, user, description, ext, packed, ttid) with
        packed values, or None when the transaction is unknown
    """
    tid = util.u64(tid)
    q = self.query
    r = q("SELECT oids, user, description, ext, packed, ttid"
          " FROM trans WHERE partition=? AND tid=?",
          (self._getPartition(tid), tid)).fetchone()
    if not r and all:
        # Fall back to the temporary table.
        r = q("SELECT oids, user, description, ext, packed, ttid"
              " FROM ttrans WHERE tid=?", (tid,)).fetchone()
    if r:
        oids, user, description, ext, packed, ttid = r
        return splitOIDField(tid, oids), str(user), \
            str(description), str(ext), packed, util.p64(ttid)
def _getObjectLength(self, oid, value_serial):
    """Follow value_tid indirections until actual data is found and
    return its length.

    :raise CreationUndone: when there is no data at all (the object
        creation was undone)
    """
    if value_serial is None:
        raise CreationUndone
    length, value_serial = self.query("""SELECT LENGTH(value), value_tid
        FROM obj LEFT JOIN data ON obj.data_id=data.id
        WHERE partition=? AND oid=? AND tid=?""",
        (self._getPartition(oid), oid, value_serial)).fetchone()
    if length is None:
        # The referenced revision is itself an indirection: recurse.
        logging.info("Multiple levels of indirection"
            " when searching for object data for oid %d at tid %d."
            " This causes suboptimal performance.", oid, value_serial)
        length = self._getObjectLength(oid, value_serial)
    return length
def getObjectHistory(self, oid, offset=0, length=1):
    """Return a list of (packed serial, data length) for revisions of
    *oid* at or after the last pack tid, newest first, sliced by
    [offset, offset+length[. Return None when nothing matches.
    """
    # FIXME: This method doesn't take client's current transaction id as
    # parameter, which means it can return transactions in the future of
    # client's transaction.
    p64 = util.p64
    oid = util.u64(oid)
    pack_tid = self._getPackTID()
    result = []
    append = result.append
    for serial, length, value_serial in self.query("""\
            SELECT tid, LENGTH(value), value_tid
            FROM obj LEFT JOIN data ON obj.data_id = data.id
            WHERE partition=? AND oid=? AND tid>=?
            ORDER BY tid DESC LIMIT ?,?""",
            (self._getPartition(oid), oid, pack_tid, offset, length)):
        if length is None:
            # Indirection: resolve it to get the real data length.
            try:
                length = self._getObjectLength(oid, value_serial)
            except CreationUndone:
                length = 0
        append((p64(serial), length))
    return result or None
# NOTE(review): this span was a corrupted splice of two methods (the SQL
# had an unbalanced parenthesis, a stray '% T' and the second method's
# header was missing). Both methods are reconstructed below — verify
# against upstream history before merging.
def getReplicationObjectList(self, min_tid, max_tid, length, partition,
        min_oid):
    """Return up to *length* (packed serial, packed oid) pairs of
    *partition*, ordered by (tid, oid), starting at (min_tid, min_oid)
    included and bounded by max_tid included."""
    u64 = util.u64
    p64 = util.p64
    min_tid = u64(min_tid)
    return [(p64(serial), p64(oid)) for serial, oid in self.query("""\
        SELECT tid, oid FROM obj
        WHERE partition=? AND tid<=?
          AND (tid=? AND ?<=oid OR ?<tid)
        ORDER BY tid ASC, oid ASC LIMIT ?""",
        (partition, u64(max_tid), min_tid, u64(min_oid), min_tid,
         -1 if length is None else length))]

def _updatePackFuture(self, oid, orig_serial, max_serial):
    """Redirect future references to a revision that is about to be
    deleted by pack.

    Any committed or uncommitted record whose value_tid points at
    *orig_serial* (at or after *max_serial*) is updated: the first one
    found keeps the data (its serial is returned), the others are
    repointed to it.

    :return: the serial now holding the data, or None if unreferenced
    """
    partition = self._getPartition(oid)
    value_serial = None
    q = self.query
    # Scan both the permanent ('obj') and temporary ('tobj') tables.
    for T in '', 't':
        update = ("UPDATE OR FAIL %sobj SET value_tid=?"
                  " WHERE partition=? AND oid=? AND tid=?") % T
        for serial, in q("SELECT tid FROM %sobj"
                " WHERE partition=? AND oid=? AND tid>=? AND value_tid=?"
                " ORDER BY tid ASC" % T,
                (partition, oid, max_serial, orig_serial)):
            q(update, (value_serial, partition, oid, serial))
            if value_serial is None:
                # First found, mark its serial for future reference.
                value_serial = serial
    return value_serial
def pack(self, tid, updateObjectDataForPack):
    """Drop all object revisions older than *tid*, keeping the last one
    of each object (and dropping even that one when it is a deletion).

    :param updateObjectDataForPack: callback notified for each dropped
        revision, with its possibly relocated data serial
    """
    # TODO: unit test (along with updatePackFuture)
    p64 = util.p64
    tid = util.u64(tid)
    updatePackFuture = self._updatePackFuture
    getPartition = self._getPartition
    q = self.query
    self._setPackTID(tid)
    for count, oid, max_serial in q("SELECT COUNT(*) - 1, oid, MAX(tid)"
                                    " FROM obj WHERE tid<=? GROUP BY oid",
                                    (tid,)):
        partition = getPartition(oid)
        if q("SELECT 1 FROM obj WHERE partition=?"
             " AND oid=? AND tid=? AND data_id IS NULL",
             (partition, oid, max_serial)).fetchone():
            # Last revision before the pack point is a deletion:
            # delete it as well.
            max_serial += 1
        elif not count:
            # Single, live revision: nothing to pack for this object.
            continue
        # There are things to delete for this object
        data_id_set = set()
        # BUG FIX: restore the lost '<?' placeholder ('AND tid' alone
        # left 3 bound parameters for only 2 markers).
        sql = " FROM obj WHERE partition=? AND oid=? AND tid<?"
        args = partition, oid, max_serial
        for serial, data_id in q("SELECT tid, data_id" + sql, args):
            data_id_set.add(data_id)
            new_serial = updatePackFuture(oid, serial, max_serial)
            if new_serial:
                new_serial = p64(new_serial)
            updateObjectDataForPack(p64(oid), p64(serial),
                                    new_serial, data_id)
        q("DELETE" + sql, args)
        data_id_set.discard(None)
        self._pruneData(data_id_set)
    self.commit()
def checkTIDRange(self, partition, length, min_tid, max_tid):
    """Return (count, sha1 of tid list, packed max tid) for up to
    *length* transactions of *partition* within [min_tid, max_tid].

    Used by replication checks to compare ranges between nodes.
    """
    # XXX: SQLite's GROUP_CONCAT is slow (looks like quadratic)
    count, tids, max_tid = self.query("""\
        SELECT COUNT(*), GROUP_CONCAT(tid), MAX(tid)
        FROM (SELECT tid FROM trans
              WHERE partition=? AND ?<=tid AND tid<=?
              ORDER BY tid ASC LIMIT ?) AS t""",
        (partition, util.u64(min_tid), util.u64(max_tid),
         -1 if length is None else length)).fetchone()
    if count:
        return count, sha1(tids).digest(), util.p64(max_tid)
    return 0, ZERO_HASH, ZERO_TID
def checkSerialRange(self, partition, length, min_tid, max_tid, min_oid):
    """Return (count, sha1 of serials, packed last serial, sha1 of
    oids, packed last oid) for up to *length* object records of
    *partition*, starting at (min_tid, min_oid) and up to max_tid.
    """
    u64 = util.u64
    # We don't ask SQLite to compute everything (like in checkTIDRange)
    # because it's difficult to get the last serial _for the last oid_.
    # We would need a function (that could be named 'LAST') that returns the
    # last grouped value, instead of the greatest one.
    min_tid = u64(min_tid)
    r = self.query("""\
        SELECT tid, oid
        FROM obj
        WHERE partition=? AND tid<=? AND (tid>? OR tid=? AND oid>=?)
        ORDER BY tid, oid LIMIT ?""",
        (partition, u64(max_tid), min_tid, min_tid, u64(min_oid),
         -1 if length is None else length)).fetchall()
    if r:
        p64 = util.p64
        return (len(r),
                sha1(','.join(str(x[0]) for x in r)).digest(),
                p64(r[-1][0]),
                sha1(','.join(str(x[1]) for x in r)).digest(),
                p64(r[-1][1]))
    return 0, ZERO_HASH, ZERO_TID, ZERO_HASH, ZERO_OID
neoppod-4fcd8ddc8caa405fd23a527f4a128644363b69c6-neo-storage/neo/storage/exception.py 0000664 0000000 0000000 00000001317 12014530641 0027255 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2010 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
class AlreadyPendingError(Exception):
    """Raised when an event is queued for a key that already has a
    pending event and the caller asked for duplicates to be rejected."""
neoppod-4fcd8ddc8caa405fd23a527f4a128644363b69c6-neo-storage/neo/storage/handlers/ 0000775 0000000 0000000 00000000000 12014530641 0026503 5 ustar 00root root 0000000 0000000 neoppod-4fcd8ddc8caa405fd23a527f4a128644363b69c6-neo-storage/neo/storage/handlers/__init__.py 0000664 0000000 0000000 00000004643 12014530641 0030623 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.handler import EventHandler
from neo.lib.exception import PrimaryFailure, OperationFailure
from neo.lib.protocol import uuid_str, NodeStates, NodeTypes
class BaseMasterHandler(EventHandler):
    """Common behaviour for handlers of the connection to the primary
    master: failure propagation and node/cluster state notifications."""

    def connectionLost(self, conn, new_state):
        if self.app.listening_conn: # if running
            self.app.master_node = None
            raise PrimaryFailure('connection lost')

    def stopOperation(self, conn):
        raise OperationFailure('operation stopped')

    def reelectPrimary(self, conn):
        raise PrimaryFailure('re-election occurs')

    def notifyClusterInformation(self, conn, state):
        self.app.changeClusterState(state)

    def notifyNodeInformation(self, conn, node_list):
        """Store information on nodes, only if this is sent by a primary
        master node."""
        self.app.nm.update(node_list)
        for node_type, addr, uuid, state in node_list:
            if uuid == self.app.uuid:
                # This is me, do what the master tell me
                logging.info("I was told I'm %s", state)
                if state in (NodeStates.DOWN, NodeStates.TEMPORARILY_DOWN,
                        NodeStates.BROKEN, NodeStates.UNKNOWN):
                    # Only a DOWN state requires erasing our data.
                    erase = state == NodeStates.DOWN
                    self.app.shutdown(erase=erase)
                elif state == NodeStates.HIDDEN:
                    raise OperationFailure
            elif node_type == NodeTypes.CLIENT and state != NodeStates.RUNNING:
                # A client node left: abort its ongoing transactions.
                logging.info('Notified of non-running client, abort (%s)',
                        uuid_str(uuid))
                self.app.tm.abortFor(uuid)

    def answerUnfinishedTransactions(self, conn, *args, **kw):
        self.app.replicator.setUnfinishedTIDList(*args, **kw)
neoppod-4fcd8ddc8caa405fd23a527f4a128644363b69c6-neo-storage/neo/storage/handlers/client.py 0000664 0000000 0000000 00000022067 12014530641 0030342 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.handler import EventHandler
from neo.lib.util import dump, makeChecksum
from neo.lib.protocol import Packets, LockState, Errors, ProtocolError, \
ZERO_HASH, INVALID_PARTITION
from ..transactions import ConflictError, DelayedError
from ..exception import AlreadyPendingError
import time
# Log stores taking (incl. lock delays) more than this many seconds.
# Set to None to disable.
SLOW_STORE = 2
class ClientOperationHandler(EventHandler):
    """Handler for connections from client nodes while the cluster is
    operational: object loading, stores, locks, undo and history."""

    def askTransactionInformation(self, conn, tid):
        # Answer transaction metadata, or TidNotFound.
        t = self.app.dm.getTransaction(tid)
        if t is None:
            p = Errors.TidNotFound('%s does not exist' % dump(tid))
        else:
            p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3],
                    t[4], t[0])
        conn.answer(p)

    def askObject(self, conn, oid, serial, tid):
        app = self.app
        if app.tm.loadLocked(oid):
            # Delay the response.
            app.queueEvent(self.askObject, conn, (oid, serial, tid))
            return
        o = app.dm.getObject(oid, serial, tid)
        # dm.getObject returns None for an unknown oid, False when the
        # oid exists but not at the requested serial/tid.
        if o is None:
            logging.debug('oid = %s does not exist', dump(oid))
            p = Errors.OidDoesNotExist(dump(oid))
        elif o is False:
            logging.debug('oid = %s not found', dump(oid))
            p = Errors.OidNotFound(dump(oid))
        else:
            serial, next_serial, compression, checksum, data, data_serial = o
            logging.debug('oid = %s, serial = %s, next_serial = %s',
                    dump(oid), dump(serial), dump(next_serial))
            if checksum is None:
                # No data (creation undone): answer a zero hash.
                checksum = ZERO_HASH
                data = ''
            p = Packets.AnswerObject(oid, serial, next_serial,
                compression, checksum, data, data_serial)
        conn.answer(p)

    def connectionLost(self, conn, new_state):
        uuid = conn.getUUID()
        node = self.app.nm.getByUUID(uuid)
        if self.app.listening_conn: # if running
            assert node is not None, conn
            self.app.nm.remove(node)

    def abortTransaction(self, conn, ttid):
        self.app.tm.abort(ttid)

    def askStoreTransaction(self, conn, ttid, user, desc, ext, oid_list):
        self.app.tm.register(conn.getUUID(), ttid)
        self.app.tm.storeTransaction(ttid, oid_list, user, desc, ext, False)
        conn.answer(Packets.AnswerStoreTransaction(ttid))

    def _askStoreObject(self, conn, oid, serial, compression, checksum, data,
            data_serial, ttid, unlock, request_time):
        """Perform the actual store; may be re-queued on lock conflicts."""
        if ttid not in self.app.tm:
            # transaction was aborted, cancel this event
            logging.info('Forget store of %s:%s by %s delayed by %s',
                    dump(oid), dump(serial), dump(ttid),
                    dump(self.app.tm.getLockingTID(oid)))
            # send an answer as the client side is waiting for it
            conn.answer(Packets.AnswerStoreObject(0, oid, serial))
            return
        try:
            self.app.tm.storeObject(ttid, serial, oid, compression,
                    checksum, data, data_serial, unlock)
        except ConflictError, err:
            # resolvable or not
            conn.answer(Packets.AnswerStoreObject(1, oid, err.getTID()))
        except DelayedError:
            # locked by a previous transaction, retry later
            # If we are unlocking, we want queueEvent to raise
            # AlreadyPendingError, to avoid making the client wait for an
            # unneeded response.
            try:
                self.app.queueEvent(self._askStoreObject, conn, (oid, serial,
                    compression, checksum, data, data_serial, ttid,
                    unlock, request_time), key=(oid, ttid),
                    raise_on_duplicate=unlock)
            except AlreadyPendingError:
                conn.answer(Errors.AlreadyPending(dump(oid)))
        else:
            if SLOW_STORE is not None:
                # Log stores that took abnormally long (lock delays).
                duration = time.time() - request_time
                if duration > SLOW_STORE:
                    logging.info('StoreObject delay: %.02fs', duration)
            conn.answer(Packets.AnswerStoreObject(0, oid, serial))

    def askStoreObject(self, conn, oid, serial,
            compression, checksum, data, data_serial, ttid, unlock):
        # register the transaction
        self.app.tm.register(conn.getUUID(), ttid)
        if data or checksum != ZERO_HASH:
            # TODO: return an appropriate error packet
            assert makeChecksum(data) == checksum
            assert data_serial is None
        else:
            checksum = data = None
        self._askStoreObject(conn, oid, serial, compression, checksum, data,
            data_serial, ttid, unlock, time.time())

    def askTIDsFrom(self, conn, min_tid, max_tid, length, partition):
        conn.answer(Packets.AnswerTIDsFrom(self.app.dm.getReplicationTIDList(
            min_tid, max_tid, length, partition)))

    def askTIDs(self, conn, first, last, partition):
        # This method is complicated, because I must return TIDs only
        # about usable partitions assigned to me.
        if first >= last:
            raise ProtocolError('invalid offsets')
        app = self.app
        if partition == INVALID_PARTITION:
            partition_list = app.pt.getAssignedPartitionList(app.uuid)
        else:
            partition_list = [partition]
        tid_list = app.dm.getTIDList(first, last - first, partition_list)
        conn.answer(Packets.AnswerTIDs(tid_list))

    def askObjectUndoSerial(self, conn, ttid, ltid, undone_tid, oid_list):
        app = self.app
        findUndoTID = app.dm.findUndoTID
        getObjectFromTransaction = app.tm.getObjectFromTransaction
        object_tid_dict = {}
        for oid in oid_list:
            current_serial, undo_serial, is_current = findUndoTID(oid, ttid,
                ltid, undone_tid, getObjectFromTransaction(ttid, oid))
            if current_serial is None:
                p = Errors.OidNotFound(dump(oid))
                break
            object_tid_dict[oid] = (current_serial, undo_serial, is_current)
        else:
            # Loop completed without break: all oids were resolved.
            p = Packets.AnswerObjectUndoSerial(object_tid_dict)
        conn.answer(p)

    def askHasLock(self, conn, ttid, oid):
        locking_tid = self.app.tm.getLockingTID(oid)
        logging.info('%r check lock of %r:%r', conn, dump(ttid), dump(oid))
        if locking_tid is None:
            state = LockState.NOT_LOCKED
        elif locking_tid is ttid:
            state = LockState.GRANTED
        else:
            state = LockState.GRANTED_TO_OTHER
        conn.answer(Packets.AnswerHasLock(oid, state))

    def askObjectHistory(self, conn, oid, first, last):
        if first >= last:
            raise ProtocolError('invalid offsets')
        app = self.app
        history_list = app.dm.getObjectHistory(oid, first, last - first)
        if history_list is None:
            p = Errors.OidNotFound(dump(oid))
        else:
            p = Packets.AnswerObjectHistory(oid, history_list)
        conn.answer(p)

    def askCheckCurrentSerial(self, conn, ttid, serial, oid):
        self._askCheckCurrentSerial(conn, ttid, serial, oid, time.time())

    def _askCheckCurrentSerial(self, conn, ttid, serial, oid, request_time):
        """Check that *serial* is still the current revision of *oid*."""
        if ttid not in self.app.tm:
            # transaction was aborted, cancel this event
            logging.info('Forget serial check of %s:%s by %s delayed by %s',
                    dump(oid), dump(serial), dump(ttid),
                    dump(self.app.tm.getLockingTID(oid)))
            # send an answer as the client side is waiting for it
            conn.answer(Packets.AnswerStoreObject(0, oid, serial))
            return
        try:
            self.app.tm.checkCurrentSerial(ttid, serial, oid)
        except ConflictError, err:
            # resolvable or not
            conn.answer(Packets.AnswerCheckCurrentSerial(1, oid,
                err.getTID()))
        except DelayedError:
            # locked by a previous transaction, retry later
            try:
                self.app.queueEvent(self._askCheckCurrentSerial, conn, (ttid,
                    serial, oid, request_time), key=(oid, ttid))
            except AlreadyPendingError:
                conn.answer(Errors.AlreadyPending(dump(oid)))
        else:
            if SLOW_STORE is not None:
                duration = time.time() - request_time
                if duration > SLOW_STORE:
                    logging.info('CheckCurrentSerial delay: %.02fs', duration)
            conn.answer(Packets.AnswerCheckCurrentSerial(0, oid, serial))
neoppod-4fcd8ddc8caa405fd23a527f4a128644363b69c6-neo-storage/neo/storage/handlers/hidden.py 0000664 0000000 0000000 00000003620 12014530641 0030311 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2011 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from . import BaseMasterHandler
from neo.lib import logging
from neo.lib.protocol import CellStates
class HiddenHandler(BaseMasterHandler):
    """This class implements a generic part of the event handlers."""

    def notifyPartitionChanges(self, conn, ptid, cell_list):
        """This is very similar to Send Partition Table, except that
        the information is only about changes from the previous."""
        app = self.app
        if ptid <= app.pt.getID():
            # Ignore this packet.
            logging.debug('ignoring older partition changes')
            return
        # update partition table in memory and the database
        app.pt.update(ptid, cell_list, app.nm)
        app.dm.changePartitionTable(ptid, cell_list)
        # Check changes for replications
        for offset, uuid, state in cell_list:
            if uuid == app.uuid and app.replicator is not None:
                # If this is for myself, this can affect replications.
                if state == CellStates.DISCARDED:
                    app.replicator.removePartition(offset)
                elif state == CellStates.OUT_OF_DATE:
                    app.replicator.addPartition(offset)

    def startOperation(self, conn):
        # The cluster became operational again: resume serving.
        self.app.operational = True
neoppod-4fcd8ddc8caa405fd23a527f4a128644363b69c6-neo-storage/neo/storage/handlers/identification.py 0000664 0000000 0000000 00000006470 12014530641 0032055 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.handler import EventHandler
from neo.lib.protocol import uuid_str, NodeTypes, NotReadyError, Packets
from neo.lib.protocol import ProtocolError, BrokenNodeDisallowedError
from .storage import StorageOperationHandler
from .client import ClientOperationHandler
class IdentificationHandler(EventHandler):
    """ Handler used for incoming connections during operation state """

    def connectionLost(self, conn, new_state):
        logging.warning('A connection was lost during identification')

    def requestIdentification(self, conn, node_type,
            uuid, address, name):
        """Identify an incoming node and install the proper handler.

        Anonymous (uuid-less) connections are only accepted from
        storage nodes (replication); identified nodes must be clients
        or known storage nodes.
        """
        self.checkClusterName(name)
        # reject any incoming connections if not ready
        if not self.app.ready:
            raise NotReadyError
        app = self.app
        if uuid is None:
            if node_type != NodeTypes.STORAGE:
                raise ProtocolError('reject anonymous non-storage node')
            handler = StorageOperationHandler(self.app)
            conn.setHandler(handler)
        else:
            if uuid == app.uuid:
                raise ProtocolError("uuid conflict or loopback connection")
            node = app.nm.getByUUID(uuid)
            # If this node is broken, reject it.
            if node is not None and node.isBroken():
                raise BrokenNodeDisallowedError
            # choose the handler according to the node type
            if node_type == NodeTypes.CLIENT:
                handler = ClientOperationHandler
                if node is None:
                    node = app.nm.createClient(uuid=uuid)
                elif node.isConnected():
                    # cut previous connection
                    node.getConnection().close()
                    assert not node.isConnected()
                node.setRunning()
            elif node_type == NodeTypes.STORAGE:
                if node is None:
                    logging.error('reject an unknown storage node %s',
                        uuid_str(uuid))
                    raise NotReadyError
                handler = StorageOperationHandler
            else:
                raise ProtocolError('reject non-client-or-storage node')
            # apply the handler and set up the connection
            handler = handler(self.app)
            conn.setHandler(handler)
            node.setConnection(conn, app.uuid < uuid)
        # accept the identification and trigger an event
        conn.answer(Packets.AcceptIdentification(NodeTypes.STORAGE, uuid and
            app.uuid, app.pt.getPartitions(), app.pt.getReplicas(), uuid,
            app.master_node.getAddress(), ()))
        handler.connectionCompleted(conn)
neoppod-4fcd8ddc8caa405fd23a527f4a128644363b69c6-neo-storage/neo/storage/handlers/initialization.py 0000664 0000000 0000000 00000005030 12014530641 0032102 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from . import BaseMasterHandler
from neo.lib import logging, protocol
class InitializationHandler(BaseMasterHandler):
    """Handler used while the storage node boots: loads the partition
    table and last ids from the primary master."""

    def answerNodeInformation(self, conn):
        pass

    def answerPartitionTable(self, conn, ptid, row_list):
        app = self.app
        pt = app.pt
        pt.load(ptid, row_list, self.app.nm)
        if not pt.filled():
            raise protocol.ProtocolError('Partial partition table received')
        logging.debug('Got the partition table:')
        self.app.pt.log()
        # Install the partition table into the database for persistency.
        cell_list = []
        num_partitions = app.pt.getPartitions()
        unassigned_set = set(xrange(num_partitions))
        for offset in xrange(num_partitions):
            for cell in pt.getCellList(offset):
                cell_list.append((offset, cell.getUUID(), cell.getState()))
                if cell.getUUID() == app.uuid:
                    unassigned_set.remove(offset)
        # delete objects database
        if unassigned_set:
            logging.debug('drop data for partitions %r', unassigned_set)
            app.dm.dropPartitions(unassigned_set)
        app.dm.setPartitionTable(ptid, cell_list)

    def answerLastIDs(self, conn, loid, ltid, lptid, backup_tid):
        # Only the backup tid is persisted here; other ids come from
        # the database itself.
        self.app.dm.setBackupTID(backup_tid)

    def notifyPartitionChanges(self, conn, ptid, cell_list):
        # XXX: This is safe to ignore those notifications because all of the
        # following applies:
        # - we first ask for node information, and *then* partition
        #   table content, so it is possible to get notifyPartitionChanges
        #   packets in between (or even before asking for node information).
        # - this handler will be changed after receiving answerPartitionTable
        #   and before handling the next packet
        logging.debug('ignoring notifyPartitionChanges during initialization')
neoppod-4fcd8ddc8caa405fd23a527f4a128644363b69c6-neo-storage/neo/storage/handlers/master.py 0000664 0000000 0000000 00000005507 12014530641 0030357 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.util import dump
from neo.lib.protocol import Packets, ProtocolError
from . import BaseMasterHandler
class MasterOperationHandler(BaseMasterHandler):
    """ This handler is used for the primary master """

    def notifyTransactionFinished(self, conn, *args, **kw):
        self.app.replicator.transactionFinished(*args, **kw)

    def notifyPartitionChanges(self, conn, ptid, cell_list):
        """This is very similar to Send Partition Table, except that
        the information is only about changes from the previous."""
        app = self.app
        if ptid <= app.pt.getID():
            # Ignore this packet.
            logging.debug('ignoring older partition changes')
            return
        # update partition table in memory and the database
        app.pt.update(ptid, cell_list, app.nm)
        app.dm.changePartitionTable(ptid, cell_list)
        # Check changes for replications
        app.replicator.notifyPartitionChanges(cell_list)

    def askLockInformation(self, conn, ttid, tid, oid_list):
        # Idiom fix: 'ttid not in' instead of 'not ttid in'.
        if ttid not in self.app.tm:
            raise ProtocolError('Unknown transaction')
        self.app.tm.lock(ttid, tid, oid_list)
        if not conn.isClosed():
            conn.answer(Packets.AnswerInformationLocked(ttid))

    def notifyUnlockInformation(self, conn, ttid):
        if ttid not in self.app.tm:
            raise ProtocolError('Unknown transaction')
        # TODO: send an answer
        self.app.tm.unlock(ttid)

    def askPack(self, conn, tid):
        app = self.app
        logging.info('Pack started, up to %s...', dump(tid))
        app.dm.pack(tid, app.tm.updateObjectDataForPack)
        logging.info('Pack finished.')
        if not conn.isClosed():
            conn.answer(Packets.AnswerPack(True))

    def replicate(self, conn, tid, upstream_name, source_dict):
        # Start/update backup replication; sources become
        # (address, upstream_name) pairs, or None when unset.
        self.app.replicator.backup(tid,
            dict((p, a and (a, upstream_name))
                 for p, a in source_dict.iteritems()))

    def truncate(self, conn, tid):
        self.app.replicator.cancel()
        self.app.dm.truncate(tid)
        conn.close()

    def checkPartition(self, conn, *args):
        self.app.checker(*args)
neoppod-4fcd8ddc8caa405fd23a527f4a128644363b69c6-neo-storage/neo/storage/handlers/storage.py 0000664 0000000 0000000 00000022761 12014530641 0030531 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import weakref
from functools import wraps
from neo.lib.connector import ConnectorConnectionClosedException
from neo.lib.handler import EventHandler
from neo.lib.protocol import Errors, NodeStates, Packets, ProtocolError, \
ZERO_HASH
def checkConnectionIsReplicatorConnection(func):
    """Decorator: assert that *conn* is the replicator's current connection
    before letting the wrapped handler method run."""
    @wraps(func)
    def wrapper(self, conn, *args, **kw):
        assert conn is self.app.replicator.getCurrentConnection()
        return func(self, conn, *args, **kw)
    return wrapper
def checkFeedingConnection(check):
    """Decorator factory for feeding-side handler methods.

    Verify that this node can serve the requested partition (readable cell,
    or merely not out-of-date when *check* is true); on failure answer the
    appropriate error, otherwise mark the connection as server and call the
    wrapped method.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(self, conn, partition, *args, **kw):
            app = self.app
            cell = app.pt.getCell(partition, app.uuid)
            unusable = cell is None or (
                cell.isOutOfDate() if check else not cell.isReadable())
            if unusable:
                error = Errors.CheckingError if check else \
                    Errors.ReplicationError
                return conn.answer(error("partition %u not readable"
                                         % partition))
            conn.asServer()
            return func(self, conn, partition, *args, **kw)
        return wrapper
    return decorator
class StorageOperationHandler(EventHandler):
    """This class handles events for replications."""
    # This handler covers both roles of storage-to-storage replication:
    # - client side (the node fetching data): answerFetch*, add*;
    # - server side (the node feeding data): askCheck*, askFetch*, which
    #   serve their answers from background tasks (generators).

    def connectionLost(self, conn, new_state):
        # A replication/check connection was lost: mark the peer node DOWN
        # and abort whatever was using this connection.
        app = self.app
        if app.listening_conn and conn.isClient():
            # XXX: Connection and Node should merged.
            uuid = conn.getUUID()
            if uuid:
                node = app.nm.getByUUID(uuid)
            else:
                # Peer not identified yet: look it up by address instead.
                node = app.nm.getByAddress(conn.getAddress())
            node.setState(NodeStates.DOWN)
            replicator = app.replicator
            if replicator.current_node is node:
                replicator.abort()
            app.checker.connectionLost(conn)

    # Client

    def connectionFailed(self, conn):
        # Could not reach the feeding node: abort so the replicator can
        # retry (possibly with another node).
        if self.app.listening_conn:
            self.app.replicator.abort()

    def _acceptIdentification(self, node, *args):
        # Identification to the feeding node succeeded: resume both the
        # replicator and the checker.
        self.app.replicator.connected(node)
        self.app.checker.connected(node)

    @checkConnectionIsReplicatorConnection
    def answerFetchTransactions(self, conn, pack_tid, next_tid, tid_list):
        # tid_list: transactions we have but the feeding node does not,
        # so they must be deleted locally.
        if tid_list:
            deleteTransaction = self.app.dm.deleteTransaction
            for tid in tid_list:
                deleteTransaction(tid)
        assert not pack_tid, "TODO"
        if next_tid:
            # More transaction chunks to fetch for this partition.
            self.app.replicator.fetchTransactions(next_tid)
        else:
            # Transaction phase done: switch to object replication.
            self.app.replicator.fetchObjects()

    @checkConnectionIsReplicatorConnection
    def addTransaction(self, conn, tid, user, desc, ext, packed, ttid,
                       oid_list):
        # Directly store the transaction.
        self.app.dm.storeTransaction(tid, (),
            (oid_list, user, desc, ext, packed, ttid), False)

    @checkConnectionIsReplicatorConnection
    def answerFetchObjects(self, conn, pack_tid, next_tid,
                           next_oid, object_dict):
        # object_dict: serial -> oids we have but the feeding node does
        # not; delete them locally, then persist.
        if object_dict:
            deleteObject = self.app.dm.deleteObject
            for serial, oid_list in object_dict.iteritems():
                for oid in oid_list:
                    deleteObject(oid, serial)
            self.app.dm.commit()
        assert not pack_tid, "TODO"
        if next_tid:
            # More object chunks to fetch.
            self.app.replicator.fetchObjects(next_tid, next_oid)
        else:
            # Partition fully replicated.
            self.app.replicator.finish()

    @checkConnectionIsReplicatorConnection
    def addObject(self, conn, oid, serial, compression,
                  checksum, data, data_serial):
        dm = self.app.dm
        # An empty record with a ZERO_HASH checksum carries no data at all
        # (e.g. a pure undo-pointer record).
        if data or checksum != ZERO_HASH:
            data_id = dm.storeData(checksum, data, compression)
        else:
            data_id = None
        # Directly store the transaction.
        obj = oid, data_id, data_serial
        dm.storeTransaction(serial, (obj,), None, False)

    @checkConnectionIsReplicatorConnection
    def replicationError(self, conn, message):
        self.app.replicator.abort('source message: ' + message)

    def checkingError(self, conn, message):
        # Make sure the client connection is closed even if the checker's
        # cleanup raises.
        try:
            self.app.checker.connectionLost(conn)
        finally:
            self.app.closeClient(conn)

    @property
    def answerCheckTIDRange(self):
        # Both range answers are dispatched to the same checker method.
        return self.app.checker.checkRange

    @property
    def answerCheckSerialRange(self):
        return self.app.checker.checkRange

    # Server (all methods must set connection as server so that it isn't closed
    # if client tasks are finished)

    @checkFeedingConnection(check=True)
    def askCheckTIDRange(self, conn, *args):
        # Heavy dm work is done in a background task; the weakref proxy
        # lets the task notice that the connection vanished meanwhile.
        msg_id = conn.getPeerId()
        conn = weakref.proxy(conn)
        def check():
            r = self.app.dm.checkTIDRange(*args)
            try:
                conn.answer(Packets.AnswerCheckTIDRange(*r), msg_id)
            except (weakref.ReferenceError, ConnectorConnectionClosedException):
                pass
            yield
        self.app.newTask(check())

    @checkFeedingConnection(check=True)
    def askCheckSerialRange(self, conn, *args):
        msg_id = conn.getPeerId()
        conn = weakref.proxy(conn)
        def check():
            r = self.app.dm.checkSerialRange(*args)
            try:
                conn.answer(Packets.AnswerCheckSerialRange(*r), msg_id)
            except (weakref.ReferenceError, ConnectorConnectionClosedException):
                pass
            yield
        self.app.newTask(check())

    @checkFeedingConnection(check=False)
    def askFetchTransactions(self, conn, partition, length, min_tid, max_tid,
                             tid_list):
        app = self.app
        if app.tm.isLockedTid(max_tid):
            # Wow, backup cluster is fast. Requested transactions are still in
            # ttrans/ttobj so wait a little.
            app.queueEvent(self.askFetchTransactions, conn,
                (partition, length, min_tid, max_tid, tid_list))
            return
        msg_id = conn.getPeerId()
        conn = weakref.proxy(conn)
        # tid_list: transactions the peer already has in this range.
        peer_tid_set = set(tid_list)
        dm = app.dm
        # Fetch one extra row to know whether there is a next chunk.
        tid_list = dm.getReplicationTIDList(min_tid, max_tid, length + 1,
            partition)
        next_tid = tid_list.pop() if length < len(tid_list) else None
        def push():
            # Background task: send one AddTransaction packet per missing
            # transaction, yielding between packets to stay cooperative.
            try:
                pack_tid = None # TODO
                for tid in tid_list:
                    if tid in peer_tid_set:
                        # Peer already has it: just acknowledge by removal;
                        # whatever remains in peer_tid_set must be deleted
                        # by the peer.
                        peer_tid_set.remove(tid)
                    else:
                        t = dm.getTransaction(tid)
                        if t is None:
                            # The partition vanished under us.
                            conn.answer(Errors.ReplicationError(
                                "partition %u dropped" % partition))
                            return
                        oid_list, user, desc, ext, packed, ttid = t
                        conn.notify(Packets.AddTransaction(
                            tid, user, desc, ext, packed, ttid, oid_list))
                    yield
                conn.answer(Packets.AnswerFetchTransactions(
                    pack_tid, next_tid, peer_tid_set), msg_id)
                yield
            except (weakref.ReferenceError, ConnectorConnectionClosedException):
                pass
        app.newTask(push())

    @checkFeedingConnection(check=False)
    def askFetchObjects(self, conn, partition, length, min_tid, max_tid,
                        min_oid, object_dict):
        app = self.app
        if app.tm.isLockedTid(max_tid):
            raise ProtocolError("transactions must be fetched before objects")
        msg_id = conn.getPeerId()
        conn = weakref.proxy(conn)
        dm = app.dm
        # Fetch one extra row to know whether there is a next chunk.
        object_list = dm.getReplicationObjectList(min_tid, max_tid, length + 1,
            partition, min_oid)
        if length < len(object_list):
            next_tid, next_oid = object_list.pop()
        else:
            next_tid = next_oid = None
        def push():
            # Background task: send one AddObject packet per missing object.
            try:
                pack_tid = None # TODO
                for serial, oid in object_list:
                    # object_dict: serial -> oids the peer already has.
                    oid_set = object_dict.get(serial)
                    if oid_set:
                        if type(oid_set) is list:
                            # Lazily convert peer lists to sets for O(1)
                            # membership tests and removal.
                            object_dict[serial] = oid_set = set(oid_set)
                        if oid in oid_set:
                            oid_set.remove(oid)
                            if not oid_set:
                                del object_dict[serial]
                            continue
                    object = dm.getObject(oid, serial)
                    if not object:
                        conn.answer(Errors.ReplicationError(
                            "partition %u dropped or truncated" % partition))
                        return
                    conn.notify(Packets.AddObject(oid, serial, *object[2:]))
                    yield
                # Whatever remains in object_dict must be deleted by the
                # peer.
                conn.answer(Packets.AnswerFetchObjects(
                    pack_tid, next_tid, next_oid, object_dict), msg_id)
                yield
            except (weakref.ReferenceError, ConnectorConnectionClosedException):
                pass
        app.newTask(push())
neoppod-4fcd8ddc8caa405fd23a527f4a128644363b69c6-neo-storage/neo/storage/handlers/verification.py 0000664 0000000 0000000 00000006037 12014530641 0031545 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
from . import BaseMasterHandler
from neo.lib import logging
from neo.lib.protocol import Packets, Errors, INVALID_TID
from neo.lib.util import dump
from neo.lib.exception import OperationFailure
class VerificationHandler(BaseMasterHandler):
    """This class deals with events for a verification phase."""

    def askLastIDs(self, conn):
        app = self.app
        last_tid, _, _, last_oid = app.dm.getLastIDs()
        conn.answer(Packets.AnswerLastIDs(
            last_oid,
            last_tid,
            app.pt.getID(),
            app.dm.getBackupTID()))

    def askPartitionTable(self, conn):
        table = self.app.pt
        conn.answer(Packets.AnswerPartitionTable(
            table.getID(), table.getRowList()))

    def notifyPartitionChanges(self, conn, ptid, cell_list):
        """This is very similar to Send Partition Table, except that
        the information is only about changes from the previous."""
        app = self.app
        if app.pt.getID() >= ptid:
            # Already known: nothing to apply.
            logging.debug('ignoring older partition changes')
            return
        # Apply the delta both in memory and in the database.
        app.pt.update(ptid, cell_list, app.nm)
        app.dm.changePartitionTable(ptid, cell_list)

    def startOperation(self, conn):
        self.app.operational = True

    def stopOperation(self, conn):
        raise OperationFailure('operation stopped')

    def askUnfinishedTransactions(self, conn):
        pending = self.app.dm.getUnfinishedTIDList()
        conn.answer(
            Packets.AnswerUnfinishedTransactions(INVALID_TID, pending))

    def askTransactionInformation(self, conn, tid):
        txn = self.app.dm.getTransaction(tid, all=True)
        if txn is None:
            packet = Errors.TidNotFound('%s does not exist' % dump(tid))
        else:
            packet = Packets.AnswerTransactionInformation(
                tid, txn[1], txn[2], txn[3], txn[4], txn[0])
        conn.answer(packet)

    def askObjectPresent(self, conn, oid, tid):
        if self.app.dm.objectPresent(oid, tid):
            conn.answer(Packets.AnswerObjectPresent(oid, tid))
        else:
            conn.answer(Errors.OidNotFound(
                '%s:%s do not exist' % (dump(oid), dump(tid))))

    def deleteTransaction(self, conn, tid, oid_list):
        self.app.dm.deleteTransaction(tid, oid_list)

    def commitTransaction(self, conn, tid):
        self.app.dm.finishTransaction(tid)
neoppod-4fcd8ddc8caa405fd23a527f4a128644363b69c6-neo-storage/neo/storage/replicator.py 0000664 0000000 0000000 00000036133 12014530641 0027427 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
"""
Replication algorithm
Purpose: replicate the content of a reference node into a replicating node,
bringing it up-to-date. This happens in the following cases:
- A new storage is added to an existing cluster.
- A node was separated from cluster and rejoins it.
- In a backup cluster, the master notifies a node that new data exists upstream
(note that in this case, the cell is always marked as UP_TO_DATE).
Replication happens per partition. Reference node can change between
partitions.
2 parts, done sequentially:
- Transaction (metadata) replication
- Object (data) replication
Both parts follow the same mechanism:
- The range of data to replicate is split into chunks of FETCH_COUNT items
(transaction or object).
- For every chunk, the requesting node sends to seeding node the list of items
it already has.
- Before answering, the seeding node sends 1 packet for every missing item.
- The seeding node finally answers with the list of items to delete (usually
empty).
Replication is partial, starting from the greatest stored tid in the partition:
- For transactions, this tid is excluded from replication.
- For objects, this tid is included unless the storage already knows it has
all oids for it.
There is no check that item values on both nodes matches.
TODO: Packing and replication currently fail when they happen at the same time.
"""
import random
from neo.lib import logging
from neo.lib.protocol import CellStates, NodeTypes, NodeStates, \
Packets, INVALID_TID, ZERO_TID, ZERO_OID
from neo.lib.connection import ClientConnection
from neo.lib.util import add64, dump
from .handlers.storage import StorageOperationHandler
FETCH_COUNT = 1000
class Partition(object):
    """Per-partition replication progress holder (see module docstring)."""
    __slots__ = 'next_trans', 'next_obj', 'max_ttid'

    def __repr__(self):
        # Only show slots that are actually set.
        attrs = ', '.join('%s=%r' % (name, getattr(self, name))
                          for name in self.__slots__ if hasattr(self, name))
        return '<%s(%s) at 0x%x>' % (self.__class__.__name__, attrs, id(self))
class Replicator(object):
    """Drive replication of this storage's partitions, one at a time.

    The source of a partition is a random readable cell for internal
    replication, or the upstream storage given by the master in backup mode.
    """

    # Node currently used as source, and partition currently replicated.
    current_node = None
    current_partition = None

    def __init__(self, app):
        self.app = app

    def getCurrentConnection(self):
        """Return the connection to the current source node, or None."""
        node = self.current_node
        if node is not None and node.isConnected():
            return node.getConnection()

    # XXX: We can't replicate unfinished transactions but do we need such
    #      complex code ? Backup mechanism does not rely on this: instead
    #      the upstream storage delays the answer. Maybe we can do the same
    #      for internal replication.

    def setUnfinishedTIDList(self, max_tid, ttid_list, offset_list):
        """This is a callback from MasterOperationHandler."""
        if ttid_list:
            self.ttid_set.update(ttid_list)
            max_ttid = max(ttid_list)
        else:
            max_ttid = None
        for offset in offset_list:
            self.partition_dict[offset].max_ttid = max_ttid
            self.replicate_dict[offset] = max_tid
        self._nextPartition()

    def transactionFinished(self, ttid, max_tid):
        """ Callback from MasterOperationHandler """
        self.ttid_set.remove(ttid)
        min_ttid = min(self.ttid_set) if self.ttid_set else INVALID_TID
        for offset, p in self.partition_dict.iteritems():
            if p.max_ttid and p.max_ttid < min_ttid:
                # No remaining unfinished transaction can block this
                # partition any more: let it replicate up to max_tid.
                p.max_ttid = None
                self.replicate_dict[offset] = max_tid
        self._nextPartition()

    def getBackupTID(self):
        """Return the tid up to which all non-outdated assigned partitions
        are replicated, or ZERO_TID if that cannot be determined."""
        outdated_set = set(self.app.pt.getOutdatedOffsetListFor(self.app.uuid))
        tid = INVALID_TID
        for offset, p in self.partition_dict.iteritems():
            if offset not in outdated_set:
                tid = min(tid, p.next_trans, p.next_obj)
        if ZERO_TID != tid != INVALID_TID:
            return add64(tid, -1)
        return ZERO_TID

    def updateBackupTID(self):
        # Persist the new backup tid if it changed. getBackupTID() returning
        # a false value means we are not in backup mode.
        dm = self.app.dm
        tid = dm.getBackupTID()
        if tid:
            new_tid = self.getBackupTID()
            if tid != new_tid:
                dm.setBackupTID(new_tid)

    def populate(self):
        """Build initial replication state from the partition table and
        the database, when the node starts operating."""
        app = self.app
        pt = app.pt
        uuid = app.uuid
        self.partition_dict = {}
        self.replicate_dict = {}
        self.source_dict = {}
        self.ttid_set = set()
        last_tid, last_trans_dict, last_obj_dict, _ = app.dm.getLastIDs()
        next_tid = app.dm.getBackupTID() or last_tid
        next_tid = add64(next_tid, 1) if next_tid else ZERO_TID
        outdated_list = []
        for offset in xrange(pt.getPartitions()):
            for cell in pt.getCellList(offset):
                if cell.getUUID() == uuid and not cell.isCorrupted():
                    self.partition_dict[offset] = p = Partition()
                    if cell.isOutOfDate():
                        outdated_list.append(offset)
                        # Resume from the last tid known locally for this
                        # partition, or from scratch.
                        try:
                            p.next_trans = add64(last_trans_dict[offset], 1)
                        except KeyError:
                            p.next_trans = ZERO_TID
                        p.next_obj = last_obj_dict.get(offset, ZERO_TID)
                        # Block replication until we know which unfinished
                        # transactions may still write to this partition.
                        p.max_ttid = INVALID_TID
                    else:
                        p.next_trans = p.next_obj = next_tid
                        p.max_ttid = None
        if outdated_list:
            self.app.master_conn.ask(Packets.AskUnfinishedTransactions(),
                offset_list=outdated_list)

    def notifyPartitionChanges(self, cell_list):
        """This is a callback from MasterOperationHandler."""
        abort = False
        added_list = []
        app = self.app
        last_tid, last_trans_dict, last_obj_dict, _ = app.dm.getLastIDs()
        for offset, uuid, state in cell_list:
            if uuid == app.uuid:
                if state in (CellStates.DISCARDED, CellStates.CORRUPTED):
                    # The cell is no longer ours: forget all replication
                    # state about it.
                    try:
                        del self.partition_dict[offset]
                    except KeyError:
                        continue
                    self.replicate_dict.pop(offset, None)
                    self.source_dict.pop(offset, None)
                    abort = abort or self.current_partition == offset
                elif state == CellStates.OUT_OF_DATE:
                    assert offset not in self.partition_dict
                    self.partition_dict[offset] = p = Partition()
                    # Resume from the last tid known locally, or from
                    # scratch if nothing is stored for this partition.
                    try:
                        p.next_trans = add64(last_trans_dict[offset], 1)
                    except KeyError:
                        p.next_trans = ZERO_TID
                    p.next_obj = last_obj_dict.get(offset, ZERO_TID)
                    p.max_ttid = INVALID_TID
                    added_list.append(offset)
        if added_list:
            self.app.master_conn.ask(Packets.AskUnfinishedTransactions(),
                offset_list=added_list)
        if abort:
            self.abort()

    def backup(self, tid, source_dict):
        """Callback from MasterOperationHandler, in backup mode."""
        next_tid = None
        for offset, source in source_dict.iteritems():
            if source:
                self.source_dict[offset] = source
                self.replicate_dict[offset] = tid
            elif offset != self.current_partition and \
                 offset not in self.replicate_dict:
                # The master did its best to avoid useless replication orders
                # but there may still be a few, and we may receive redundant
                # update notification of backup_tid.
                # So, we do nothing here if we are already replicating.
                p = self.partition_dict[offset]
                if not next_tid:
                    next_tid = add64(tid, 1)
                p.next_trans = p.next_obj = next_tid
        if next_tid:
            self.updateBackupTID()
        self._nextPartition()

    def _nextPartition(self):
        # XXX: One connection to another storage may remain open forever.
        #      All other previous connections are automatically closed
        #      after some time of inactivity.
        #      This should be improved in several ways:
        #      - Keeping connections open between 2 clusters (backup case)
        #        is quite a good thing because establishing a connection
        #        costs time/bandwidth and replication is actually never
        #        finished.
        #      - When all storages of a non-backup cluster are up-to-date,
        #        there's no reason to keep any connection open.
        if self.current_partition is not None or not self.replicate_dict:
            return
        app = self.app
        # Choose a partition with no unfinished transaction if possible.
        # If every candidate is blocked, 'offset' simply keeps the value of
        # the last iteration (deliberate loop-variable leak).
        # XXX: When leaving backup mode, we should only consider UP_TO_DATE
        #      cells.
        for offset in self.replicate_dict:
            if not self.partition_dict[offset].max_ttid:
                break
        try:
            addr, name = self.source_dict[offset]
        except KeyError:
            # Internal replication: pick a random running readable cell.
            assert app.pt.getCell(offset, app.uuid).isOutOfDate()
            node = random.choice([cell.getNode()
                for cell in app.pt.getCellList(offset, readable=True)
                if cell.getNodeState() == NodeStates.RUNNING])
            name = None
        else:
            # Backup: replicate from the upstream storage at 'addr'.
            node = app.nm.getByAddress(addr)
            if node is None:
                assert name, addr
                node = app.nm.createStorage(address=addr)
        self.current_partition = offset
        previous_node = self.current_node
        self.current_node = node
        if node.isConnected(connecting=True):
            if node.isIdentified():
                node.getConnection().asClient()
                self.fetchTransactions()
        else:
            assert name or node.getUUID() != app.uuid, "loopback connection"
            conn = ClientConnection(app.em, StorageOperationHandler(app),
                node=node, connector=app.connector_handler())
            conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE,
                None if name else app.uuid, app.server, name or app.name))
        if previous_node is not None and previous_node.isConnected():
            app.closeClient(previous_node.getConnection())

    def connected(self, node):
        """Called when the connection to the source node is identified."""
        if self.current_node is node and self.current_partition is not None:
            self.fetchTransactions()

    def fetchTransactions(self, min_tid=None):
        """Request the next chunk of transactions from the source node."""
        offset = self.current_partition
        p = self.partition_dict[offset]
        if min_tid:
            # Continuation of an already started replication.
            p.next_trans = min_tid
        else:
            try:
                addr, name = self.source_dict[offset]
            except KeyError:
                pass
            else:
                if addr != self.current_node.getAddress():
                    # Wrong source for a backup partition: start over.
                    return self.abort()
            min_tid = p.next_trans
            self.replicate_tid = self.replicate_dict.pop(offset)
            # BUG FIX: the original format string had a single '%r'
            # placeholder for 4 arguments, which broke this log statement.
            logging.debug("starting replication of <partition=%u"
                " min_tid=%s max_tid=%s> from %r", offset, dump(min_tid),
                dump(self.replicate_tid), self.current_node)
        max_tid = self.replicate_tid
        tid_list = self.app.dm.getReplicationTIDList(min_tid, max_tid,
            FETCH_COUNT, offset)
        self.current_node.getConnection().ask(Packets.AskFetchTransactions(
            offset, FETCH_COUNT, min_tid, max_tid, tid_list))

    def fetchObjects(self, min_tid=None, min_oid=ZERO_OID):
        """Request the next chunk of objects from the source node."""
        offset = self.current_partition
        p = self.partition_dict[offset]
        max_tid = self.replicate_tid
        if min_tid:
            # Continuation of the object phase.
            p.next_obj = min_tid
        else:
            # Transactions are fully replicated for this partition: start
            # the object phase where it previously stopped.
            min_tid = p.next_obj
            p.next_trans = add64(max_tid, 1)
        object_dict = {}
        for serial, oid in self.app.dm.getReplicationObjectList(min_tid,
                max_tid, FETCH_COUNT, offset, min_oid):
            try:
                object_dict[serial].append(oid)
            except KeyError:
                object_dict[serial] = [oid]
        self.current_node.getConnection().ask(Packets.AskFetchObjects(
            offset, FETCH_COUNT, min_tid, max_tid, min_oid, object_dict))

    def finish(self):
        """Current partition is replicated up to self.replicate_tid."""
        offset = self.current_partition
        tid = self.replicate_tid
        del self.current_partition, self.replicate_tid
        p = self.partition_dict[offset]
        p.next_obj = add64(tid, 1)
        self.updateBackupTID()
        if not p.max_ttid:
            p = Packets.NotifyReplicationDone(offset, tid)
            self.app.master_conn.notify(p)
        logging.debug("partition %u replicated up to %s from %r",
            offset, dump(tid), self.current_node)
        self._nextPartition()

    def abort(self, message=''):
        """Abort replication of the current partition, if any, and try to
        continue with another one."""
        offset = self.current_partition
        if offset is None:
            return
        del self.current_partition
        logging.warning('replication aborted for partition %u%s',
            offset, message and ' (%s)' % message)
        if offset in self.partition_dict:
            # XXX: Try another partition if possible, to increase probability
            #      to connect to another node. It would be better to
            #      explicitely search for another node instead.
            tid = self.replicate_dict.pop(offset, None) or self.replicate_tid
            if self.replicate_dict:
                # Re-queue this partition only after picking another one.
                self._nextPartition()
                self.replicate_dict[offset] = tid
            else:
                self.replicate_dict[offset] = tid
                self._nextPartition()
        else: # partition removed
            self._nextPartition()

    def cancel(self):
        """Cancel replication of the current partition and close the
        connection to its source node."""
        offset = self.current_partition
        if offset is not None:
            logging.info('cancel replication of partition %u', offset)
            del self.current_partition
            try:
                self.replicate_dict.setdefault(offset, self.replicate_tid)
                del self.replicate_tid
            except AttributeError:
                # replicate_tid not set: replication had not started yet.
                pass
            self.getCurrentConnection().close()

    def stop(self):
        # Close any open connection to an upstream storage,
        # possibly aborting current replication.
        node = self.current_node
        # Chained comparison: node exists AND node.getUUID() is None,
        # i.e. the source is an upstream storage (created without uuid
        # by createStorage in _nextPartition).
        if node is not None is node.getUUID():
            self.cancel()
        # Cancel all replication orders from upstream cluster.
        for offset in self.replicate_dict.keys():
            addr, name = self.source_dict.get(offset, (None, None))
            if name:
                tid = self.replicate_dict.pop(offset)
                logging.info('cancel replication of partition %u from %r'
                    ' up to %s', offset, addr, dump(tid))
        # Make UP_TO_DATE cells really UP_TO_DATE
        self._nextPartition()
neoppod-4fcd8ddc8caa405fd23a527f4a128644363b69c6-neo-storage/neo/storage/transactions.py 0000664 0000000 0000000 00000034452 12014530641 0027775 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2010 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
from time import time
from neo.lib import logging
from neo.lib.util import dump
from neo.lib.protocol import uuid_str, ZERO_TID
class ConflictError(Exception):
    """
    Raised when a resolvable conflict occurs
    Argument: tid of locking transaction or latest revision
    """

    def __init__(self, tid):
        super(ConflictError, self).__init__()
        self._tid = tid

    def getTID(self):
        return self._tid
class DelayedError(Exception):
    """Raised when an object is locked by a previous transaction."""
class Transaction(object):
    """
    Container for a pending transaction
    """
    # Definitive tid; stays None until the master assigns one via setTID().
    _tid = None

    def __init__(self, uuid, ttid):
        self._uuid = uuid
        self._ttid = ttid
        self._birth = time()
        self._locked = False
        self._transaction = None
        self._object_dict = {}
        self._checked_set = set()

    def __repr__(self):
        return "<%s(ttid=%r, tid=%r, uuid=%r, locked=%r, age=%.2fs) at 0x%x>" \
            % (self.__class__.__name__, dump(self._ttid), dump(self._tid),
               uuid_str(self._uuid), self.isLocked(), time() - self._birth,
               id(self))

    def addCheckedObject(self, oid):
        # An oid cannot be both stored and merely serial-checked.
        assert oid not in self._object_dict, dump(oid)
        self._checked_set.add(oid)

    def getTTID(self):
        return self._ttid

    def setTID(self, tid):
        # May be set only once, and never to None.
        assert self._tid is None, dump(self._tid)
        assert tid is not None
        self._tid = tid

    def getTID(self):
        return self._tid

    def getUUID(self):
        return self._uuid

    def lock(self):
        assert not self._locked
        self._locked = True

    def isLocked(self):
        return self._locked

    def prepare(self, oid_list, user, desc, ext, packed):
        """Record the transaction metadata received from the client."""
        # assert self._transaction is not None
        self._transaction = oid_list, user, desc, ext, packed, self._ttid

    def addObject(self, oid, data_id, value_serial):
        """Register a stored object in this transaction."""
        assert oid not in self._checked_set, dump(oid)
        self._object_dict[oid] = oid, data_id, value_serial

    def delObject(self, oid):
        # Return the data_id of a stored object, or drop a checked oid
        # (returning None in that case).
        entry = self._object_dict.pop(oid, None)
        if entry is None:
            self._checked_set.remove(oid)
        else:
            return entry[1]

    def getObject(self, oid):
        return self._object_dict[oid]

    def getObjectList(self):
        return self._object_dict.values()

    def getOIDList(self):
        return self._object_dict.keys()

    def getLockedOIDList(self):
        return list(self._object_dict) + list(self._checked_set)

    def getTransactionInformations(self):
        return self._transaction
class TransactionManager(object):
"""
Manage pending transaction and locks
"""
    def __init__(self, app):
        self._app = app
        # ttid -> Transaction, for every pending transaction
        self._transaction_dict = {}
        # oid -> ttid holding the write (store) lock
        self._store_lock_dict = {}
        # oid -> ttid holding the load lock (set when a transaction locks)
        self._load_lock_dict = {}
        # client uuid -> set of its Transaction objects
        self._uuid_dict = {}

    def __contains__(self, ttid):
        """
        Returns True if the TID is known by the manager
        """
        return ttid in self._transaction_dict

    def register(self, uuid, ttid):
        """
        Register a transaction, it may be already registered
        """
        logging.debug('Register TXN %s for %s', dump(ttid), uuid_str(uuid))
        transaction = self._transaction_dict.get(ttid, None)
        if transaction is None:
            # First store/check for this ttid: create the container and
            # index it by the client node's uuid as well.
            transaction = Transaction(uuid, ttid)
            self._uuid_dict.setdefault(uuid, set()).add(transaction)
            self._transaction_dict[ttid] = transaction
        return transaction

    def getObjectFromTransaction(self, ttid, oid):
        """
        Return object data for given running transaction.
        Return None if not found.
        """
        try:
            return self._transaction_dict[ttid].getObject(oid)
        except KeyError:
            return None

    def reset(self):
        """
        Reset the transaction manager
        """
        self._transaction_dict.clear()
        self._store_lock_dict.clear()
        self._load_lock_dict.clear()
        self._uuid_dict.clear()
    def lock(self, ttid, tid, oid_list):
        """
        Lock a transaction
        """
        logging.debug('Lock TXN %s (ttid=%s)', dump(tid), dump(ttid))
        transaction = self._transaction_dict[ttid]
        # remember that the transaction has been locked
        transaction.lock()
        for oid in transaction.getOIDList():
            self._load_lock_dict[oid] = ttid
        # check every object that should be locked
        uuid = transaction.getUUID()
        is_assigned = self._app.pt.isAssigned
        for oid in oid_list:
            if is_assigned(oid, uuid) and \
                    self._load_lock_dict.get(oid) != ttid:
                raise ValueError, 'Some locks are not held'
        object_list = transaction.getObjectList()
        # txn_info is None if the transaction information is not stored on
        # this storage.
        txn_info = transaction.getTransactionInformations()
        # store data from memory to temporary table
        self._app.dm.storeTransaction(tid, object_list, txn_info)
        # ...and remember its definitive TID
        transaction.setTID(tid)

    def getTIDFromTTID(self, ttid):
        # Map a temporary tid to the definitive one (None until locked).
        return self._transaction_dict[ttid].getTID()

    def unlock(self, ttid):
        """
        Unlock transaction
        """
        logging.debug('Unlock TXN %s', dump(ttid))
        # Make the transaction final in the database, then release all its
        # locks and forget it.
        self._app.dm.finishTransaction(self.getTIDFromTTID(ttid))
        self.abort(ttid, even_if_locked=True)
    def storeTransaction(self, ttid, oid_list, user, desc, ext, packed):
        """
        Store transaction information received from client node
        """
        assert ttid in self, "Transaction not registered"
        transaction = self._transaction_dict[ttid]
        transaction.prepare(oid_list, user, desc, ext, packed)

    def getLockingTID(self, oid):
        # Return the ttid holding the write lock on oid, or None.
        return self._store_lock_dict.get(oid)
    def lockObject(self, ttid, serial, oid, unlock=False):
        """
        Take a write lock on given object, checking that "serial" is
        current.
        Raises:
            DelayedError
            ConflictError
        """
        # check if the object is locked
        locking_tid = self._store_lock_dict.get(oid)
        if locking_tid == ttid and unlock:
            logging.info('Deadlock resolution on %r:%r', dump(oid), dump(ttid))
            # A duplicate store means client is resolving a deadlock, so
            # drop the lock it held on this object, and drop object data for
            # consistency.
            del self._store_lock_dict[oid]
            data_id = self._transaction_dict[ttid].delObject(oid)
            if data_id:
                self._app.dm.pruneData((data_id,))
            # Give a chance to pending events to take that lock now.
            self._app.executeQueuedEvents()
            # Attempt to acquire lock again.
            locking_tid = self._store_lock_dict.get(oid)
        if locking_tid is None:
            previous_serial = None
        elif locking_tid == ttid:
            # If previous store was an undo, next store must be based on
            # undo target.
            previous_serial = self._transaction_dict[ttid].getObject(oid)[2]
            if previous_serial is None:
                # XXX: use some special serial when previous store was not
                #      an undo ? Maybe it should just not happen.
                logging.info('Transaction %s storing %s more than once',
                    dump(ttid), dump(oid))
        elif locking_tid < ttid:
            # We have a bigger TTID than locking transaction, so we are
            # younger: enter waiting queue so we are handled when lock gets
            # released. We also want to delay (instead of conflict) if the
            # client is so faster that it is committing another transaction
            # before we processed UnlockInformation from the master.
            logging.info('Store delayed for %r:%r by %r', dump(oid),
                dump(ttid), dump(locking_tid))
            raise DelayedError
        else:
            # We have a smaller TTID than locking transaction, so we are
            # older: this is a possible deadlock case, as we might already
            # hold locks the younger transaction is waiting upon. Make
            # client release locks & reacquire them by notifying it of the
            # possible deadlock.
            logging.info('Possible deadlock on %r:%r with %r',
                dump(oid), dump(ttid), dump(locking_tid))
            raise ConflictError(ZERO_TID)
        if previous_serial is None:
            # Object not locked: check against the last committed serial.
            history_list = self._app.dm.getObjectHistory(oid)
            if history_list:
                previous_serial = history_list[0][0]
        if previous_serial is not None and previous_serial != serial:
            # The client's base serial is stale: let it resolve the
            # conflict against the current revision.
            logging.info('Resolvable conflict on %r:%r',
                dump(oid), dump(ttid))
            raise ConflictError(previous_serial)
        logging.debug('Transaction %s storing %s', dump(ttid), dump(oid))
        self._store_lock_dict[oid] = ttid
    def checkCurrentSerial(self, ttid, serial, oid):
        # Lock the object like a store would (unlock=True allows a second
        # call for the same ttid), but record it as merely checked.
        self.lockObject(ttid, serial, oid, unlock=True)
        assert ttid in self, "Transaction not registered"
        transaction = self._transaction_dict[ttid]
        transaction.addCheckedObject(oid)

    def storeObject(self, ttid, serial, oid, compression, checksum, data,
            value_serial, unlock=False):
        """
        Store an object received from client node
        """
        self.lockObject(ttid, serial, oid, unlock=unlock)
        # store object
        assert ttid in self, "Transaction not registered"
        # data is None for undo records which only point at value_serial.
        if data is None:
            data_id = None
        else:
            data_id = self._app.dm.storeData(checksum, data, compression)
        self._transaction_dict[ttid].addObject(oid, data_id, value_serial)
def abort(self, ttid, even_if_locked=False):
    """
    Abort a transaction

    Releases locks held on all transaction objects, deletes the
    Transaction instance, and executes queued events.

    :param ttid: temporary transaction id of the transaction to abort
    :param even_if_locked: when True, also drop a transaction that
        already holds its load lock (i.e. is in the second commit phase)
    Note: does not alter persistent content.
    """
    if ttid not in self._transaction_dict:
        # the tid may be unknown as the transaction is aborted on every node
        # of the partition, even if no data was received (eg. conflict on
        # another node)
        return
    logging.debug('Abort TXN %s', dump(ttid))
    transaction = self._transaction_dict[ttid]
    has_load_lock = transaction.isLocked()
    # if the transaction is locked, ensure we can drop it
    if has_load_lock:
        if not even_if_locked:
            # Locked transactions are only dropped on explicit request.
            return
    else:
        # The transaction never reached the locked stage: release the
        # reference the transaction took on each stored data row.
        # NOTE(review): presumably a refcount decrement in the database
        # manager -- confirm against the dm.unlockData API.
        self._app.dm.unlockData([data_id
            for oid, data_id, value_serial in transaction.getObjectList()
            if data_id], True)
    # unlock any object
    for oid in transaction.getLockedOIDList():
        if has_load_lock:
            # The load lock may not exist for every oid; tolerate None.
            lock_ttid = self._load_lock_dict.pop(oid, None)
            assert lock_ttid in (ttid, None), 'Transaction %s tried to ' \
                'release the lock on oid %s, but it was held by %s' % (
                dump(ttid), dump(oid), dump(lock_ttid))
        # The write lock, in contrast, must exist and belong to ttid.
        write_locking_tid = self._store_lock_dict.pop(oid)
        assert write_locking_tid == ttid, 'Inconsistent locking state: ' \
            'aborting %s:%s but %s has the lock.' % (dump(ttid), dump(oid),
            dump(write_locking_tid))
    # remove the transaction
    uuid = transaction.getUUID()
    self._uuid_dict[uuid].discard(transaction)
    # clean node index if there is no more current transactions
    if not self._uuid_dict[uuid]:
        del self._uuid_dict[uuid]
    del self._transaction_dict[ttid]
    # some locks were released, some pending locks may now succeed
    self._app.executeQueuedEvents()
def abortFor(self, uuid):
    """Abort every non-locked transaction belonging to a node.

    Locked transactions (second commit phase) are left untouched by
    abort() and therefore survive this call.
    """
    logging.debug('Abort for %s', uuid_str(uuid))
    # Snapshot the ttids first: abort() mutates _uuid_dict while we loop.
    ttid_list = [txn.getTTID() for txn in self._uuid_dict.get(uuid, [])]
    for ttid in ttid_list:
        self.abort(ttid)
    # Drop the node's index entry if nothing remains for it.
    remaining = self._uuid_dict.get(uuid)
    if remaining is not None and not remaining:
        del self._uuid_dict[uuid]
def isLockedTid(self, tid):
    """Return True if any locked transaction has a TID <= tid."""
    return any(txn.isLocked() and txn.getTID() <= tid
        for txn in self._transaction_dict.itervalues())
def loadLocked(self, oid):
    """Tell whether a load lock is currently held on the given oid."""
    is_locked = oid in self._load_lock_dict
    return is_locked
def log(self):
    """Dump current transactions and both lock tables to the log."""
    logging.info("Transactions:")
    for txn in self._transaction_dict.values():
        logging.info('  %r', txn)
    # Both lock tables map oid -> owning ttid; report them identically.
    for title, lock_dict in (
            ('  Read locks:', self._load_lock_dict),
            ('  Write locks:', self._store_lock_dict)):
        logging.info(title)
        for oid, ttid in lock_dict.items():
            logging.info('    %r by %r', dump(oid), dump(ttid))
def updateObjectDataForPack(self, oid, orig_serial, new_serial, data_id):
    """Redirect a pending store of *oid* after a pack moved its data.

    Only acts when a transaction currently holds the write lock on oid
    and its recorded entry still references orig_serial.
    """
    lock_tid = self.getLockingTID(oid)
    if lock_tid is not None:
        transaction = self._transaction_dict[lock_tid]
        # getObject(oid)[2] is presumably the value_serial of the pending
        # store -- TODO confirm against the Transaction.getObject layout.
        if transaction.getObject(oid)[2] == orig_serial:
            if new_serial:
                # The store now points at new_serial instead of raw data.
                data_id = None
            else:
                # NOTE(review): storeData is called elsewhere in this file
                # as storeData(checksum, data, compression); this single
                # data_id argument looks like a reference-count increment
                # on an existing data row -- confirm against the
                # DatabaseManager API before relying on it.
                self._app.dm.storeData(data_id)
            transaction.addObject(oid, data_id, new_serial)