Commit 01fd0972 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Make DatabaseManager abstract. Move away MySQL-specific stuff to...

Make DatabaseManager abstract. Move away MySQL-specific stuff to MySQLDatabaseManager. Fix some bugs in the protocol.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@47 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 9882a69e
......@@ -3,3 +3,4 @@ class ElectionFailure(NeoException): pass
class PrimaryFailure(NeoException): pass
class VerificationFailure(NeoException): pass
class OperationFailure(NeoException): pass
class DatabaseFailure(NeoException): pass
......@@ -370,7 +370,7 @@ class ServiceEventHandler(MasterEventHandler):
tid = app.getNextTID()
conn.addPacket(Packet().answerNewTID(packet.getId(), tid))
def handleAskNewOIDList(self, conn, packet, num_oids):
def handleAskNewOIDs(self, conn, packet, num_oids):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
......@@ -383,8 +383,8 @@ class ServiceEventHandler(MasterEventHandler):
self.handleUnexpectedPacket(conn, packet)
return
oid_list = app.getNextOIDList(num_oids)
conn.addPacket(Packet().answerNewOIDList(packet.getId(), oid_list))
oid_list = app.getNewOIDList(num_oids)
conn.addPacket(Packet().answerNewOIDs(packet.getId(), oid_list))
def handleFinishTransaction(self, conn, packet, oid_list, tid):
uuid = conn.getUUID()
......
......@@ -707,7 +707,7 @@ class Packet(object):
del cell_list[:]
except:
raise ProtocolError(self, 'invalid answer partition table')
return row_list
return ptid, row_list
decode_table[ANSWER_PARTITION_TABLE] = _decodeAnswerPartitionTable
def _decodeSendPartitionTable(self):
......@@ -727,7 +727,7 @@ class Packet(object):
del cell_list[:]
except:
raise ProtocolError(self, 'invalid send partition table')
return row_list
return ptid, row_list
decode_table[SEND_PARTITION_TABLE] = _decodeSendPartitionTable
def _decodeNotifyPartitionChanges(self):
......@@ -738,7 +738,7 @@ class Packet(object):
cell_list.append(cell)
except:
raise ProtocolError(self, 'invalid notify partition changes')
return cell_list
return ptid, cell_list
decode_table[NOTIFY_PARTITION_CHANGES] = _decodeNotifyPartitionChanges
def _decodeStartOperation(self):
......
......@@ -39,90 +39,24 @@ class Application(object):
# Internal attributes.
self.em = EventManager()
self.nm = NodeManager()
self.dm = MySQLDatabaseManager(config.getDatabase(), config.getUser(),
config.getPassword())
self.dm = MySQLDatabaseManager(database = config.getDatabase(),
user = config.getUser(),
password = config.getPassword())
self.pt = PartitionTable(self.num_partitions, 0)
self.primary_master_node = None
if reset:
self.dropTables()
self.createTables()
self.dm.setup(reset)
self.loadConfiguration()
self.loadPartitionTable()
def dropTables(self):
"""Drop all the tables, if any."""
q = self.dm.query
q("""DROP TABLE IF EXISTS config, pt, trans, obj, ttrans, tobj""")
def createTables(self):
"""Create all the tables, if not present."""
q = self.dm.query
# The table "config" stores configuration parameters which affect the
# persistent data.
q("""CREATE TABLE IF NOT EXISTS config (
name VARBINARY(16) NOT NULL PRIMARY KEY,
value VARBINARY(255) NOT NULL
) ENGINE = InnoDB""")
# The table "pt" stores a partition table.
q("""CREATE TABLE IF NOT EXISTS pt (
rid INT UNSIGNED NOT NULL,
uuid BINARY(16) NOT NULL,
state TINYINT UNSIGNED NOT NULL,
PRIMARY KEY (rid, uuid)
) ENGINE = InnoDB""")
# The table "trans" stores information on committed transactions.
q("""CREATE TABLE IF NOT EXISTS trans (
tid BINARY(8) NOT NULL PRIMARY KEY,
oids MEDIUMBLOB NOT NULL,
user BLOB NOT NULL,
desc BLOB NOT NULL,
ext BLOB NOT NULL
) ENGINE = InnoDB""")
# The table "obj" stores committed object data.
q("""CREATE TABLE IF NOT EXISTS obj (
oid BINARY(8) NOT NULL,
serial BINARY(8) NOT NULL,
checksum BINARY(4) NOT NULL,
compression TINYINT UNSIGNED NOT NULL,
value MEDIUMBLOB NOT NULL,
PRIMARY KEY (oid, serial)
) ENGINE = InnoDB""")
# The table "ttrans" stores information on uncommitted transactions.
q("""CREATE TABLE IF NOT EXISTS ttrans (
tid BINARY(8) NOT NULL,
oids MEDIUMBLOB NOT NULL,
user BLOB NOT NULL,
desc BLOB NOT NULL,
ext BLOB NOT NULL
) ENGINE = InnoDB""")
# The table "tobj" stores uncommitted object data.
q("""CREATE TABLE IF NOT EXISTS tobj (
oid BINARY(8) NOT NULL,
serial BINARY(8) NOT NULL,
checksum BINARY(4) NOT NULL,
compression TINYINT UNSIGNED NOT NULL,
value MEDIUMBLOB NOT NULL
) ENGINE = InnoDB""")
def loadConfiguration(self):
"""Load persistent configuration data from the database.
If data is not present, generate it."""
q = self.dm.query
e = self.dm.escape
dm = self.dm
r = q("""SELECT value FROM config WHERE name = 'uuid'""")
try:
self.uuid = r[0][0]
except IndexError:
self.uuid = dm.getUUID()
if self.uuid is None:
# XXX Generate an UUID for self. For now, just use a random string.
# Avoid an invalid UUID.
while 1:
......@@ -130,36 +64,32 @@ class Application(object):
if uuid != INVALID_UUID:
break
self.uuid = uuid
q("""INSERT config VALUES ('uuid', '%s')""" % e(uuid))
r = q("""SELECT value FROM config WHERE name = 'partitions'""")
try:
if self.num_partitions != int(r[0][0]):
raise RuntimeError('partitions do not match with the database')
except IndexError:
q("""INSERT config VALUES ('partitions', '%s')""" \
% e(str(self.num_replicas)))
r = q("""SELECT value FROM config WHERE name = 'name'""")
try:
if self.name != r[0][0]:
raise RuntimeError('name does not match with the database')
except IndexError:
q("""INSERT config VALUES ('name', '%s')""" % e(self.name))
r = q("""SELECT value FROM config WHERE name = 'ptid'""")
try:
self.ptid = r[0][0]
except IndexError:
dm.setUUID(uuid)
num_partitions = dm.getNumPartitions()
if num_partitions is None:
dm.setNumPartitions(self.num_partitions)
elif num_partitions != self.num_partitions:
raise RuntimeError('partitions do not match with the database')
name = dm.getName()
if name is None:
dm.setName(self.name)
elif name != self.name:
raise RuntimeError('name does not match with the database')
ptid = dm.getPTID()
if ptid is None:
self.ptid = INVALID_PTID
q("""INSERT config VALUES ('ptid', '%s')""" % e(INVALID_PTID))
dm.setPTID(self.ptid)
else:
self.ptid = ptid
def loadPartitionTable(self):
"""Load a partition table from the database."""
nm = self.nm
pt = self.pt
r = q("""SELECT rid, uuid, state FROM pt""")
for offset, uuid, state in r:
for offset, uuid, state in self.dm.getPartitionTable():
node = nm.getNodeByUUID(uuid)
if node is None:
node = StorageNode(uuid = uuid)
......
class DatabaseManager(object):
"""This class only describes an interface for database managers."""
def __init__(self, **kwargs):
"""Initialize the object."""
pass
def setup(self, reset = 0):
"""Set up a database. If reset is true, existing data must be
discarded."""
raise NotImplementedError('this method must be overridden')
def getUUID(self):
"""Load an UUID from a database. If not present, return None."""
raise NotImplementedError('this method must be overridden')
def setUUID(self, uuid):
"""Store an UUID into a database."""
raise NotImplementedError('this method must be overridden')
def getNumPartitions(self):
"""Load the number of partitions from a database. If not present,
return None."""
raise NotImplementedError('this method must be overridden')
def setNumPartitions(self, num_partitions):
"""Store the number of partitions into a database."""
raise NotImplementedError('this method must be overridden')
def getName(self):
"""Load a name from a database. If not present, return None."""
raise NotImplementedError('this method must be overridden')
def setName(self, name):
"""Store a name into a database."""
raise NotImplementedError('this method must be overridden')
def getPTID(self):
"""Load a Partition Table ID from a database. If not present,
return None."""
raise NotImplementedError('this method must be overridden')
def setPTID(self, ptid):
"""Store a Partition Table ID into a database."""
raise NotImplementedError('this method must be overridden')
def getPartitionTable(self):
"""Return a whole partition table as a tuple of rows. Each row
is again a tuple of an offset (row ID), an UUID of a storage
node, and a cell state."""
raise NotImplementedError('this method must be overridden')
def getLastOID(self, all = True):
"""Return the last OID in a database. If all is true,
unfinished transactions must be taken account into. If there
is no OID in the database, return None."""
raise NotImplementedError('this method must be overridden')
def getLastTID(self, all = True):
"""Return the last TID in a database. If all is true,
unfinished transactions must be taken account into. If there
is no TID in the database, return None."""
raise NotImplementedError('this method must be overridden')
def getUnfinishedTIDList(self):
"""Return a list of unfinished transaction's IDs."""
raise NotImplementedError('this method must be overridden')
def getOIDListByTID(self, tid, all = False):
"""Return a list of the IDs of objects belonging to a given
transaction. If such a transaction does not exist, return
None rather than an empty list. If all is true, the data must
be searched from unfinished transactions as well."""
raise NotImplementedError('this method must be overridden')
def objectPresent(self, oid, tid, all = True):
"""Return true iff an object specified by a given pair of an
object ID and a transaction ID is present in a database.
Otherwise, return false. If all is true, the object must be
search from unfinished transactions as well."""
raise NotImplementedError('this method must be overridden')
def changePartitionTable(self, ptid, cell_list):
"""Change a part of a partition table. The list of cells is
a tuple of tuples, each of which consists of an offset (row ID),
an UUID of a storage node, and a cell state. The Partition
Table ID must be stored as well."""
raise NotImplementedError('this method must be overridden')
def setPartitionTable(self, ptid, cell_list):
"""Set a whole partition table. The semantics is the same as
changePartitionTable, except that existing data must be
thrown away."""
raise NotImplementedError('this method must be overridden')
......@@ -3,17 +3,21 @@ from MySQLdb import OperationalError
from MySQLdb.constants.CR import SERVER_GONE_ERROR, SERVER_LOST
import logging
class DatabaseManager(object):
"""This class manages a database.
from neo.storage.database import DatabaseManager
from neo.exception import DatabaseFailure
from neo.util import dump
from neo.protocol import DISCARDED_STATE
For now, this implementation is limited to MySQL."""
class MySQLDatabaseManager(DatabaseManager):
"""This class manages a database on MySQL."""
def __init__(self, database, user, password = None):
self.db = database
self.user = user
self.passwd = password
def __init__(self, **kwargs):
self.db = kwargs['database']
self.user = kwargs['user']
self.passwd = kwargs.get('password')
self.conn = None
self.connect()
super(MySQLDatabaseManager, self).__init__(**kwargs)
def connect(self):
kwd = {'db' : self.db, 'user' : self.user}
......@@ -22,6 +26,16 @@ class DatabaseManager(object):
logging.info('connecting to MySQL on the database %s with user %s',
self.db, self.user)
self.conn = MySQLdb.connect(**kwd)
self.conn.autocommit(False)
def begin(self):
self.query("""BEGIN""")
def commit(self):
self.conn.commit()
def rollback(self):
self.conn.rollback()
def query(self, query):
"""Query data from a database."""
......@@ -37,9 +51,247 @@ class DatabaseManager(object):
logging.info('the MySQL server is gone; reconnecting')
self.connect()
return self.query(query)
raise
raise DatabaseFailure('MySQL error %d: %s' % (m[0], m[1]))
return r
def escape(self, s):
"""Escape special characters in a string."""
return self.conn.escape_string(s)
def setup(self, reset = 0):
q = self.query
if reset:
q("""DROP TABLE IF EXISTS config, pt, trans, obj, ttrans, tobj""")
# The table "config" stores configuration parameters which affect the
# persistent data.
q("""CREATE TABLE IF NOT EXISTS config (
name VARBINARY(16) NOT NULL PRIMARY KEY,
value VARBINARY(255) NOT NULL
) ENGINE = InnoDB""")
# The table "pt" stores a partition table.
q("""CREATE TABLE IF NOT EXISTS pt (
rid INT UNSIGNED NOT NULL,
uuid BINARY(16) NOT NULL,
state TINYINT UNSIGNED NOT NULL,
PRIMARY KEY (rid, uuid)
) ENGINE = InnoDB""")
# The table "trans" stores information on committed transactions.
q("""CREATE TABLE IF NOT EXISTS trans (
tid BINARY(8) NOT NULL PRIMARY KEY,
oids MEDIUMBLOB NOT NULL,
user BLOB NOT NULL,
desc BLOB NOT NULL,
ext BLOB NOT NULL
) ENGINE = InnoDB""")
# The table "obj" stores committed object data.
q("""CREATE TABLE IF NOT EXISTS obj (
oid BINARY(8) NOT NULL,
serial BINARY(8) NOT NULL,
checksum BINARY(4) NOT NULL,
compression TINYINT UNSIGNED NOT NULL,
value MEDIUMBLOB NOT NULL,
PRIMARY KEY (oid, serial)
) ENGINE = InnoDB""")
# The table "ttrans" stores information on uncommitted transactions.
q("""CREATE TABLE IF NOT EXISTS ttrans (
tid BINARY(8) NOT NULL,
oids MEDIUMBLOB NOT NULL,
user BLOB NOT NULL,
desc BLOB NOT NULL,
ext BLOB NOT NULL
) ENGINE = InnoDB""")
# The table "tobj" stores uncommitted object data.
q("""CREATE TABLE IF NOT EXISTS tobj (
oid BINARY(8) NOT NULL,
serial BINARY(8) NOT NULL,
checksum BINARY(4) NOT NULL,
compression TINYINT UNSIGNED NOT NULL,
value MEDIUMBLOB NOT NULL
) ENGINE = InnoDB""")
def getConfiguration(self, key):
q = self.query
e = self.escape
key = e(str(key))
r = q("""SELECT value FROM config WHERE name = '%s'""" % key)
try:
return r[0][0]
except IndexError:
return None
def setConfiguration(self, key, value):
q = self.query
e = self.escape
key = e(str(key))
value = e(str(value))
q("""INSERT config VALUES ('%s', '%s')""" % (key, value))
def getUUID(self):
return self.getConfiguration('uuid')
def setUUID(self, uuid):
self.begin()
try:
self.setConfiguration('uuid', uuid)
except:
self.rollback()
raise
self.commit()
def getNumPartitions(self):
n = self.getConfiguration('partitions')
if n is not None:
return int(n)
def setNumPartitions(self, num_partitions):
self.begin()
try:
self.setConfiguration('partitions', num_partitions)
except:
self.rollback()
raise
self.commit()
def getName(self):
return self.getConfiguration('name')
def setName(self, name):
self.begin()
try:
self.setConfiguration('name', name)
except:
self.rollback()
raise
self.commit()
def getPTID(self):
return self.getConfiguration('ptid')
def setPTID(self, ptid):
self.begin()
try:
self.setConfiguration('ptid', ptid)
except:
self.rollback()
raise
self.commit()
def getPartitionTable(self):
q = self.query
return q("""SELECT rid, uuid, state FROM pt""")
def getLastOID(self, all = True):
q = self.query
self.begin()
loid = q("""SELECT MAX(oid) FROM obj""")[0][0]
if all:
tmp_loid = q("""SELECT MAX(oid) FROM tobj""")[0][0]
if loid is None or (tmp_loid is not None and loid < tmp_loid):
loid = tmp_loid
self.commit()
return loid
def getLastTID(self, all = True):
# XXX this does not consider serials in obj.
# I am not sure if this is really harmful. For safety,
# check for tobj only at the moment. The reason why obj is
# not tested is that it is too slow to get the max serial
# from obj when it has a huge number of objects, because
# serial is the second part of the primary key, so the index
# is not used in this case. If doing it, it is better to
# make another index for serial, but I doubt the cost increase
# is worth.
q = self.query
self.begin()
ltid = q("""SELECT MAX(tid) FROM trans""")[0][0]
if all:
tmp_ltid = q("""SELECT MAX(tid) FROM ttrans""")[0][0]
if ltid is None or (tmp_ltid is not None and ltid < tmp_ltid):
ltid = tmp_ltid
tmp_serial = q("""SELECT MAX(serial) FROM tobj""")[0][0]
if ltid is None or (tmp_serial is not None and ltid < tmp_serial):
ltid = tmp_serial
self.commit()
return ltid
def getUnfinishedTIDList(self):
q = self.query
tid_set = set()
self.begin()
r = q("""SELECT tid FROM ttrans""")
tid_set.add((t[0] for t in r))
r = q("""SELECT serial FROM tobj""")
self.commit()
tid_set.add((t[0] for t in r))
return list(tid_set)
def getOIDListByTID(self, tid, all = False):
q = self.query
e = self.escape
tid = e(tid)
self.begin()
r = q("""SELECT oids FROM trans WHERE tid = '%s'""" % tid)
if not r and all:
r = q("""SELECT oids FROM ttrans WHERE tid = '%s'""" % tid)
self.commit()
if r:
oids = r[0][0]
if (len(oids) % 8) != 0 or len(oids) == 0:
raise DatabaseFailure('invalid oids for tid %s' % dump(tid))
oid_list = []
for i in xrange(0, len(oids), 8):
oid_list.append(oids[i:i+8])
return oid_list
return None
def objectPresent(self, oid, tid, all = True):
q = self.query
e = self.escape
oid = e(oid)
tid = e(tid)
self.begin()
r = q("""SELECT oid FROM obj WHERE oid = '%s' AND serial = '%s'""" \
% (oid, tid))
if not r and all:
r = q("""SELECT oid FROM tobj WHERE oid = '%s' AND serial = '%s'""" \
% (oid, tid))
self.commit()
if r:
return True
return False
def doSetPartitionTable(self, ptid, cell_list, reset):
q = self.query
e = self.escape
self.begin()
try:
if reset:
q("""TRUNCATE pt""")
for offset, uuid, state in cell_list:
uuid = e(uuid)
if state == DISCARDED_STATE:
q("""DELETE FROM pt WHERE offset = %d AND uuid = '%s'""" \
% (offset, uuid))
else:
q("""INSERT INTO pt VALUES (%d, '%s', %d)
ON DUPLICATE KEY UPDATE state = %d""" \
% (offset, uuid, state, state))
ptid = e(ptid)
q("""UPDATE config SET value = '%s' WHERE name = 'ptid'""" % ptid)
except:
self.rollback()
raise
self.commit()
def changePartitionTable(self, ptid, cell_list):
self.doSetPartitionTable(ptid, cell_list, True)
def setPartitionTable(self, ptid, cell_list):
self.doSetPartitionTable(ptid, cell_list, False)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment