Commit d228eacd authored by Kirill Smelkov

Merge remote-tracking branch 'origin/master' into t

* origin/master:
  client: kill .supportsTransactionalUndo()
  client: for read accesses, pick a random good node, connected or not
  storage: optimize storage layout of raw data for replication
  sqlite: remove useless AUTOINCREMENT for data.id (reuse of deleted ids is fine)
  storage: speed up reads by indexing 'obj' primarily by 'oid' (instead of 'tid')
  storage: pass schema of tables to migration methods
  storage: update backend version between each migration step
parents fa60a7c1 f95f336a
@@ -136,9 +136,6 @@ class Storage(BaseStorage.BaseStorage,
def supportsUndo(self):
return True
def supportsTransactionalUndo(self):
return True
def loadEx(self, oid, version):
try:
data, serial, _ = self.app.load(oid)
@@ -56,25 +56,28 @@ class ConnectionPool(object):
logging.info('%r not ready', node)
else:
logging.info('Connected %r', node)
# Make sure this node will be considered for the next reads
# even if there was a previous recent failure.
self.node_failure_dict.pop(node.getUUID(), None)
return conn
self.node_failure_dict[node.getUUID()] = time.time() + MAX_FAILURE_AGE
def getCellSortKey(self, cell, random=random.random):
# The use of 'random' shuffles cells to randomise the node to access.
uuid = cell.getUUID()
# First, prefer a connected node.
if uuid in self.connection_dict:
return random()
# Then one that didn't fail recently.
failure = self.node_failure_dict.get(uuid)
# Prefer a node that didn't fail recently.
failure = self.node_failure_dict.get(cell.getUUID())
if failure:
if time.time() < failure:
# At last, order by date of connection failure.
# Or order by date of connection failure.
return failure
# Do not use 'del' statement: we didn't lock, so another
# thread might have removed uuid from node_failure_dict.
self.node_failure_dict.pop(uuid, None)
return 1 + random()
self.node_failure_dict.pop(cell.getUUID(), None)
# A random one, connected or not, is a trivial and quite efficient way
# to distribute the load evenly. On write accesses, a client connects
# to all nodes of touched cells, but before that, or if a client is
# specialized to only do read-only accesses, it should not limit
# itself to using only the first connected nodes.
return random()
def getConnForNode(self, node):
"""Return a locked connection object to a given node
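The sort keys above are easy to reason about: a usable node, connected or not, gets a random key in [0, 1), while a recently failed node returns its failure-expiry timestamp, which is always far larger, so failed nodes are only tried last. A standalone sketch of that ordering (the toy class, uuids and the MAX_FAILURE_AGE value are illustrative, not the real ones):

```python
import random, time

MAX_FAILURE_AGE = 600  # assumed value of the real constant

class ToyPool(object):
    def __init__(self):
        self.node_failure_dict = {}  # uuid -> timestamp until which to shun

    def getCellSortKey(self, uuid, random=random.random):
        failure = self.node_failure_dict.get(uuid)
        if failure:
            if time.time() < failure:
                # a timestamp, hence >> 1: failed nodes sort after all others
                return failure
            self.node_failure_dict.pop(uuid, None)
        # any good node, connected or not: random key in [0, 1)
        return random()

pool = ToyPool()
pool.node_failure_dict['s1'] = time.time() + MAX_FAILURE_AGE
cells = ['s0', 's1', 's2']
cells.sort(key=pool.getCellSortKey)
assert cells[-1] == 's1'  # the failed node is always tried last
```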
@@ -406,7 +406,7 @@ class ImporterDatabaseManager(DatabaseManager):
if compression:
data = compressed_data
checksum = util.makeChecksum(data)
data_id = self.holdData(util.makeChecksum(data), data,
data_id = self.holdData(util.makeChecksum(data), oid, data,
compression)
data_id_list.append(data_id)
object_list.append((oid, data_id, data_tid))
@@ -161,11 +161,14 @@ class DatabaseManager(object):
"The database can not be upgraded because you have unfinished"
" transactions. Use an older version of NEO to verify them.")
def _getVersion(self):
def migrate(self, *args, **kw):
version = int(self.getConfiguration("version") or 0)
if self.VERSION < version:
raise DatabaseFailure("The database can not be downgraded.")
return version
while version < self.VERSION:
version += 1
getattr(self, '_migrate%s' % version)(*args, **kw)
self.setConfiguration("version", version)
def doOperation(self, app):
pass
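The new migrate() driver generalizes the old inline upgrade: one _migrate<N> method per schema version, applied in order, with the persisted "version" bumped after each step so that an interrupted upgrade resumes exactly where it stopped. A self-contained sketch of the contract (ToyManager and its step bodies are hypothetical):

```python
class ToyManager(object):
    VERSION = 2

    def __init__(self):
        self._config = {}

    def getConfiguration(self, name):
        return self._config.get(name)

    def setConfiguration(self, name, value):
        self._config[name] = value

    def migrate(self, *args, **kw):
        version = int(self.getConfiguration("version") or 0)
        if self.VERSION < version:
            raise RuntimeError("The database can not be downgraded.")
        while version < self.VERSION:
            version += 1
            getattr(self, '_migrate%s' % version)(*args, **kw)
            self.setConfiguration("version", version)  # checkpoint each step

    def _migrate1(self, schema_dict):
        pass  # e.g. drop the obsolete ttrans table

    def _migrate2(self, schema_dict):
        pass  # e.g. rebuild obj with the new primary key

m = ToyManager()
m.migrate({})
assert m.getConfiguration("version") == 2
```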
@@ -485,7 +488,11 @@ class DatabaseManager(object):
existing data is first thrown away.
"""
@requires(_changePartitionTable)
def _getDataLastId(self, partition):
"""
"""
@requires(_changePartitionTable, _getDataLastId)
def changePartitionTable(self, ptid, cell_list, reset=False):
readable_set = self._readable_set
if reset:
@@ -500,6 +507,10 @@ class DatabaseManager(object):
raise NonReadableCell
self._getPartition = _getPartition
self._getReadablePartition = _getReadablePartition
d = self._data_last_ids = []
for p in xrange(np):
i = self._getDataLastId(p)
d.append(p << 48 if i is None else i + 1)
me = self.getUUID()
for offset, nid, state in cell_list:
if nid == me:
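changePartitionTable() now also primes _data_last_ids with the next free data.id of each partition: ids are namespaced so that partition p owns [p << 48, (p + 1) << 48). A quick check of that arithmetic (np and the sample id are made up):

```python
np = 4  # assumed number of partitions

# Empty database: each counter starts at the base of its 48-bit range.
data_last_ids = [p << 48 for p in xrange(np)]
assert data_last_ids[1] == 1 << 48

# If _getDataLastId(1) had returned some id i, the next free id is i + 1
# and still falls inside partition 1's exclusive bound (2 << 48).
i = (1 << 48) + 41
data_last_ids[1] = i + 1
assert 1 << 48 <= data_last_ids[1] < 2 << 48
```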
@@ -567,7 +578,7 @@ class DatabaseManager(object):
"""
@abstract
def storeData(self, checksum, data, compression):
def storeData(self, checksum, oid, data, compression):
"""To be overridden by the backend to store object raw data
If same data was already stored, the storage only has to check there's
@@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from binascii import a2b_hex
from collections import OrderedDict
import MySQLdb
from MySQLdb import DataError, IntegrityError, \
OperationalError, ProgrammingError
@@ -49,7 +50,7 @@ def getPrintableQuery(query, max=70):
class MySQLDatabaseManager(DatabaseManager):
"""This class manages a database on MySQL."""
VERSION = 1
VERSION = 2
ENGINES = "InnoDB", "RocksDB", "TokuDB"
_engine = ENGINES[0] # default engine
@@ -176,41 +177,48 @@ class MySQLDatabaseManager(DatabaseManager):
if e.args[0] != NO_SUCH_TABLE:
raise
def _migrate1(self, _):
self._checkNoUnfinishedTransactions()
self.query("DROP TABLE IF EXISTS ttrans")
def _migrate2(self, schema_dict):
q = self.query
if self.nonempty('obj') is None:
if self.nonempty('new_obj') is None:
return
else:
q("DROP TABLE IF EXISTS new_obj")
q(schema_dict.pop('obj') % 'new_obj' + " SELECT * FROM obj")
q("DROP TABLE obj")
q("ALTER TABLE new_obj RENAME TO obj")
def _setup(self, dedup=False):
self._config.clear()
q = self.query
p = engine = self._engine
schema_dict = OrderedDict()
if self.nonempty("config") is None:
# The table "config" stores configuration
# parameters which affect the persistent data.
q("""CREATE TABLE config (
name VARBINARY(255) NOT NULL PRIMARY KEY,
value VARBINARY(255) NULL
) ENGINE=""" + engine)
else:
# Automatic migration.
version = self._getVersion()
if version < 1:
self._checkNoUnfinishedTransactions()
q("DROP TABLE IF EXISTS ttrans")
self._setConfiguration("version", self.VERSION)
# The table "config" stores configuration
# parameters which affect the persistent data.
schema_dict['config'] = """CREATE TABLE %s (
name VARBINARY(255) NOT NULL PRIMARY KEY,
value VARBINARY(255) NULL
) ENGINE=""" + engine
# The table "pt" stores a partition table.
q("""CREATE TABLE IF NOT EXISTS pt (
schema_dict['pt'] = """CREATE TABLE %s (
rid INT UNSIGNED NOT NULL,
nid INT NOT NULL,
state TINYINT UNSIGNED NOT NULL,
PRIMARY KEY (rid, nid)
) ENGINE=""" + engine)
) ENGINE=""" + engine
if self._use_partition:
p += """ PARTITION BY LIST (`partition`) (
PARTITION dummy VALUES IN (NULL))"""
# The table "trans" stores information on committed transactions.
q("""CREATE TABLE IF NOT EXISTS trans (
schema_dict['trans'] = """CREATE TABLE %s (
`partition` SMALLINT UNSIGNED NOT NULL,
tid BIGINT UNSIGNED NOT NULL,
packed BOOLEAN NOT NULL,
@@ -220,19 +228,19 @@ class MySQLDatabaseManager(DatabaseManager):
ext BLOB NOT NULL,
ttid BIGINT UNSIGNED NOT NULL,
PRIMARY KEY (`partition`, tid)
) ENGINE=""" + p)
) ENGINE=""" + p
# The table "obj" stores committed object metadata.
q("""CREATE TABLE IF NOT EXISTS obj (
schema_dict['obj'] = """CREATE TABLE %s (
`partition` SMALLINT UNSIGNED NOT NULL,
oid BIGINT UNSIGNED NOT NULL,
tid BIGINT UNSIGNED NOT NULL,
data_id BIGINT UNSIGNED NULL,
value_tid BIGINT UNSIGNED NULL,
PRIMARY KEY (`partition`, tid, oid),
KEY (`partition`, oid, tid),
PRIMARY KEY (`partition`, oid, tid),
KEY tid (`partition`, tid, oid),
KEY (data_id)
) ENGINE=""" + p)
) ENGINE=""" + p
if engine == "TokuDB":
engine += " compression='tokudb_uncompressed'"
@@ -240,21 +248,21 @@ class MySQLDatabaseManager(DatabaseManager):
# The table "data" stores object data.
# We'd like to have a partial index on the 'hash' column (e.g. hash(4))
# but a 'UNIQUE' constraint would not work as expected.
q("""CREATE TABLE IF NOT EXISTS data (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
schema_dict['data'] = """CREATE TABLE %%s (
id BIGINT UNSIGNED NOT NULL PRIMARY KEY,
hash BINARY(20) NOT NULL,
compression TINYINT UNSIGNED NULL,
value MEDIUMBLOB NOT NULL%s
) ENGINE=%s""" % (""",
UNIQUE (hash, compression)""" if dedup else "", engine))
UNIQUE (hash, compression)""" if dedup else "", engine)
q("""CREATE TABLE IF NOT EXISTS bigdata (
schema_dict['bigdata'] = """CREATE TABLE %s (
id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
value MEDIUMBLOB NOT NULL
) ENGINE=""" + engine)
) ENGINE=""" + engine
# The table "ttrans" stores information on uncommitted transactions.
q("""CREATE TABLE IF NOT EXISTS ttrans (
schema_dict['ttrans'] = """CREATE TABLE %s (
`partition` SMALLINT UNSIGNED NOT NULL,
tid BIGINT UNSIGNED,
packed BOOLEAN NOT NULL,
@@ -263,17 +271,26 @@ class MySQLDatabaseManager(DatabaseManager):
description BLOB NOT NULL,
ext BLOB NOT NULL,
ttid BIGINT UNSIGNED NOT NULL
) ENGINE=""" + engine)
) ENGINE=""" + engine
# The table "tobj" stores uncommitted object metadata.
q("""CREATE TABLE IF NOT EXISTS tobj (
schema_dict['tobj'] = """CREATE TABLE %s (
`partition` SMALLINT UNSIGNED NOT NULL,
oid BIGINT UNSIGNED NOT NULL,
tid BIGINT UNSIGNED NOT NULL,
data_id BIGINT UNSIGNED NULL,
value_tid BIGINT UNSIGNED NULL,
PRIMARY KEY (tid, oid)
) ENGINE=""" + engine)
) ENGINE=""" + engine
if self.nonempty('config') is None:
q(schema_dict.pop('config') % 'config')
self._setConfiguration('version', self.VERSION)
else:
self.migrate(schema_dict)
for table, schema in schema_dict.iteritems():
q(schema % ('IF NOT EXISTS ' + table))
self._uncommitted_data.update(q("SELECT data_id, count(*)"
" FROM tobj WHERE data_id IS NOT NULL GROUP BY data_id"))
@@ -335,18 +352,21 @@ class MySQLDatabaseManager(DatabaseManager):
offset_list = self._getAssignedPartitionList()
p64 = util.p64
q = self.query
sql = ("SELECT MAX(tid) FROM %s FORCE INDEX (PRIMARY)"
" WHERE `partition`=%s")
sql = "SELECT MAX(tid) FROM %s WHERE `partition`=%s"
trans, obj = ({partition: p64(tid)
for partition in offset_list
for tid, in q(sql % (t, partition))
if tid is not None}
for t in ('trans', 'obj'))
for t in ('trans FORCE INDEX (PRIMARY)', 'obj FORCE INDEX (tid)'))
oid = self._sqlmax(
"SELECT MAX(oid) FROM obj FORCE INDEX (`partition`)"
"SELECT MAX(oid) FROM obj FORCE INDEX (PRIMARY)"
" WHERE `partition`=%s", offset_list)
return trans, obj, None if oid is None else p64(oid)
def _getDataLastId(self, partition):
return self.query("SELECT MAX(id) FROM data WHERE %s <= id AND id < %s"
% (partition << 48, (partition + 1) << 48))[0][0]
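Rendered for partition 3, this recovery query scans exactly one 48-bit id range:

```python
p = 3
print("SELECT MAX(id) FROM data WHERE %s <= id AND id < %s"
      % (p << 48, (p + 1) << 48))
# SELECT MAX(id) FROM data WHERE 844424930131968 <= id AND id < 1125899906842624
```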
def _getUnfinishedTIDDict(self):
q = self.query
return q("SELECT ttid, tid FROM ttrans"), (ttid
@@ -363,7 +383,7 @@ class MySQLDatabaseManager(DatabaseManager):
def getLastObjectTID(self, oid):
oid = util.u64(oid)
r = self.query("SELECT tid FROM obj FORCE INDEX(`partition`)"
r = self.query("SELECT tid FROM obj FORCE INDEX(PRIMARY)"
" WHERE `partition`=%d AND oid=%d"
" ORDER BY tid DESC LIMIT 1"
% (self._getReadablePartition(oid), oid))
@@ -371,7 +391,7 @@ class MySQLDatabaseManager(DatabaseManager):
def _getNextTID(self, *args): # partition, oid, tid
r = self.query("SELECT tid FROM obj"
" FORCE INDEX(`partition`)"
" FORCE INDEX(PRIMARY)"
" WHERE `partition`=%d AND oid=%d AND tid>%d"
" ORDER BY tid LIMIT 1" % args)
return r[0][0] if r else None
@@ -380,7 +400,7 @@ class MySQLDatabaseManager(DatabaseManager):
q = self.query
partition = self._getReadablePartition(oid)
sql = ('SELECT tid, compression, data.hash, value, value_tid'
' FROM obj FORCE INDEX(`partition`)'
' FROM obj FORCE INDEX(PRIMARY)'
' LEFT JOIN data ON (obj.data_id = data.id)'
' WHERE `partition` = %d AND oid = %d') % (partition, oid)
if before_tid is not None:
@@ -437,9 +457,8 @@ class MySQLDatabaseManager(DatabaseManager):
for partition in offset_list:
where = " WHERE `partition`=%d" % partition
data_id_list = [x for x, in
q("SELECT DISTINCT data_id FROM obj FORCE INDEX(PRIMARY)"
+ where)
if x]
q("SELECT DISTINCT data_id FROM obj FORCE INDEX(tid)"
"%s AND data_id IS NOT NULL" % where)]
if not self._use_partition:
q("DELETE FROM obj" + where)
q("DELETE FROM trans" + where)
@@ -455,7 +474,8 @@ class MySQLDatabaseManager(DatabaseManager):
raise
def _getUnfinishedDataIdList(self):
return [x for x, in self.query("SELECT data_id FROM tobj") if x]
return [x for x, in self.query(
"SELECT data_id FROM tobj WHERE data_id IS NOT NULL")]
def dropPartitionsTemporary(self, offset_list=None):
where = "" if offset_list is None else \
@@ -491,7 +511,9 @@ class MySQLDatabaseManager(DatabaseManager):
else:
value_serial = 'NULL'
value = "(%s,%s,%s,%s,%s)," % (
partition, oid, tid, data_id or 'NULL', value_serial)
partition, oid, tid,
'NULL' if data_id is None else data_id,
value_serial)
values_size += len(value)
# actually: max_values < values_size + EXTRA - len(final comma)
# (test_max_allowed_packet checks that EXTRA == 2)
@@ -550,7 +572,7 @@ class MySQLDatabaseManager(DatabaseManager):
for i in xrange(bigdata_id,
bigdata_id + (length + 0x7fffff >> 23)))
def storeData(self, checksum, data, compression, _pack=_structLL.pack):
def storeData(self, checksum, oid, data, compression, _pack=_structLL.pack):
e = self.escape
checksum = e(checksum)
if 0x1000000 <= len(data): # 16M (MEDIUMBLOB limit)
@@ -577,9 +599,11 @@ class MySQLDatabaseManager(DatabaseManager):
i = bigdata_id = self.conn.insert_id()
i += 1
data = _pack(bigdata_id, length)
p = self._getPartition(util.u64(oid))
r = self._data_last_ids[p]
try:
self.query("INSERT INTO data VALUES (NULL, '%s', %d, '%s')" %
(checksum, compression, e(data)))
self.query("INSERT INTO data VALUES (%s, '%s', %d, '%s')" %
(r, checksum, compression, e(data)))
except IntegrityError as e:
if e.args[0] == DUP_ENTRY:
(r, d), = self.query("SELECT id, value FROM data"
@@ -588,7 +612,8 @@ class MySQLDatabaseManager(DatabaseManager):
if d == data:
return r
raise
return self.conn.insert_id()
self._data_last_ids[p] = r + 1
return r
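Behaviourally, storeData now draws ids from the oid's partition counter and only advances it after a successful insert; when dedup is enabled, a duplicate hash returns the existing row's id and leaves the counter untouched. A toy in-memory model of just that logic (no SQL, all names assumed):

```python
class ToyDataTable(object):
    def __init__(self, np):
        self.rows = {}                            # id -> (hash, data)
        self.by_hash = {}                         # models UNIQUE(hash, ...)
        self.last = [p << 48 for p in xrange(np)]

    def storeData(self, checksum, partition, data):
        r = self.last[partition]
        if checksum in self.by_hash:              # the DUP_ENTRY path
            return self.by_hash[checksum]         # counter not advanced
        self.rows[r] = checksum, data
        self.by_hash[checksum] = r
        self.last[partition] = r + 1
        return r

t = ToyDataTable(4)
a = t.storeData('h1', 2, 'foo')
b = t.storeData('h1', 2, 'foo')  # deduplicated
assert a == b == 2 << 48
```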
def loadData(self, data_id):
compression, hash, value = self.query(
@@ -602,7 +627,7 @@ class MySQLDatabaseManager(DatabaseManager):
del _structLL
def _getDataTID(self, oid, tid=None, before_tid=None):
sql = ('SELECT tid, value_tid FROM obj FORCE INDEX(`partition`)'
sql = ('SELECT tid, value_tid FROM obj FORCE INDEX(PRIMARY)'
' WHERE `partition` = %d AND oid = %d'
) % (self._getReadablePartition(oid), oid)
if tid is not None:
@@ -627,7 +652,8 @@ class MySQLDatabaseManager(DatabaseManager):
u64 = util.u64
tid = u64(tid)
sql = " FROM tobj WHERE tid=%d" % u64(ttid)
data_id_list = [x for x, in q("SELECT data_id" + sql) if x]
data_id_list = [x for x, in q("SELECT data_id%s AND data_id IS NOT NULL"
% sql)]
q("INSERT INTO obj SELECT `partition`, oid, %d, data_id, value_tid %s"
% (tid, sql))
q("DELETE" + sql)
@@ -654,7 +680,8 @@ class MySQLDatabaseManager(DatabaseManager):
if serial:
sql += ' AND tid=%d' % u64(serial)
q = self.query
data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql) if x]
data_id_list = [x for x, in q(
"SELECT DISTINCT data_id%s AND data_id IS NOT NULL" % sql)]
q("DELETE" + sql)
self._pruneData(data_id_list)
@@ -667,7 +694,8 @@ class MySQLDatabaseManager(DatabaseManager):
q = self.query
q("DELETE FROM trans" + sql)
sql = " FROM obj" + sql
data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql) if x]
data_id_list = [x for x, in q(
"SELECT DISTINCT data_id%s AND data_id IS NOT NULL" % sql)]
q("DELETE" + sql)
self._pruneData(data_id_list)
@@ -693,7 +721,7 @@ class MySQLDatabaseManager(DatabaseManager):
p64 = util.p64
r = self.query("SELECT tid, IF(compression < 128, LENGTH(value),"
" CAST(CONV(HEX(SUBSTR(value, 5, 4)), 16, 10) AS INT))"
" FROM obj FORCE INDEX(`partition`)"
" FROM obj FORCE INDEX(PRIMARY)"
" LEFT JOIN data ON (obj.data_id = data.id)"
" WHERE `partition` = %d AND oid = %d AND tid >= %d"
" ORDER BY tid DESC LIMIT %d, %d" %
@@ -722,7 +750,7 @@ class MySQLDatabaseManager(DatabaseManager):
u64 = util.u64
p64 = util.p64
min_tid = u64(min_tid)
r = self.query('SELECT tid, oid FROM obj FORCE INDEX(PRIMARY)'
r = self.query('SELECT tid, oid FROM obj FORCE INDEX(tid)'
' WHERE `partition` = %d AND tid <= %d'
' AND (tid = %d AND %d <= oid OR %d < tid)'
' ORDER BY tid ASC, oid ASC LIMIT %d' % (
@@ -787,7 +815,7 @@ class MySQLDatabaseManager(DatabaseManager):
q = self.query
self._setPackTID(tid)
for count, oid, max_serial in q("SELECT COUNT(*) - 1, oid, MAX(tid)"
" FROM obj FORCE INDEX(`partition`)"
" FROM obj FORCE INDEX(PRIMARY)"
" WHERE tid <= %d GROUP BY oid"
% tid):
partition = getPartition(oid)
@@ -838,7 +866,7 @@ class MySQLDatabaseManager(DatabaseManager):
# last grouped value, instead of the greatest one.
r = self.query(
"""SELECT tid, oid
FROM obj FORCE INDEX(PRIMARY)
FROM obj FORCE INDEX(tid)
WHERE `partition` = %(partition)s
AND tid <= %(max_tid)d
AND (tid > %(min_tid)d OR
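Taken together, the FORCE INDEX changes in this file follow one rule: object loads walk the new clustered primary key (partition, oid, tid), so all versions of an oid are adjacent, while replication and pack keep iterating in (partition, tid, oid) order through the secondary `tid` key. Schematically (queries abridged from the hunks above):

```python
# read path: the latest version of an oid is one backward step in the PK
LOAD = ("SELECT tid, compression, data.hash, value, value_tid"
        " FROM obj FORCE INDEX(PRIMARY)"
        " WHERE `partition`=%d AND oid=%d ORDER BY tid DESC LIMIT 1")

# replication path: tid ordering comes from the secondary index
REPLICATE = ("SELECT tid, oid FROM obj FORCE INDEX(tid)"
             " WHERE `partition`=%d AND tid<=%d"
             " ORDER BY tid ASC, oid ASC LIMIT %d")
```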
@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from collections import OrderedDict
import os
import sqlite3
from hashlib import sha1
@@ -67,7 +68,7 @@ class SQLiteDatabaseManager(DatabaseManager):
never be used for small requests.
"""
VERSION = 1
VERSION = 2
def _parse(self, database):
self.db = os.path.expanduser(database)
@@ -112,39 +113,51 @@ class SQLiteDatabaseManager(DatabaseManager):
if not e.args[0].startswith("no such table:"):
raise
def _migrate1(self, *_):
self._checkNoUnfinishedTransactions()
self.query("DROP TABLE IF EXISTS ttrans")
def _migrate2(self, schema_dict, index_dict):
# BBB: As explained in _setup, no transactional DDL
# so let's do the same dance as for MySQL.
q = self.query
if self.nonempty('obj') is None:
if self.nonempty('new_obj') is None:
return
else:
q("DROP TABLE IF EXISTS new_obj")
q(schema_dict.pop('obj') % 'new_obj')
q("INSERT INTO new_obj SELECT * FROM obj")
q("DROP TABLE obj")
q("ALTER TABLE new_obj RENAME TO obj")
def _setup(self, dedup=False):
# SQLite does support transactional Data Definition Language statements
# but unfortunately, the built-in Python binding automatically commits
# between such statements. This anti-feature causes this method to be
# relatively slow; unit tests enable the UNSAFE boolean flag.
# BBB: SQLite has transactional DDL but before Python 3.6,
# the binding automatically commits between such statements.
# This anti-feature causes this method to be relatively slow.
# Unit tests enable the UNSAFE boolean flag.
self._config.clear()
q = self.query
schema_dict = OrderedDict()
index_dict = {}
if self.nonempty("config") is None:
# The table "config" stores configuration
# parameters which affect the persistent data.
q("CREATE TABLE IF NOT EXISTS config ("
" name TEXT NOT NULL PRIMARY KEY,"
" value TEXT)")
else:
# Automatic migration.
version = self._getVersion()
if version < 1:
self._checkNoUnfinishedTransactions()
q("DROP TABLE IF EXISTS ttrans")
self._setConfiguration("version", self.VERSION)
# The table "config" stores configuration
# parameters which affect the persistent data.
schema_dict['config'] = """CREATE TABLE %s (
name TEXT NOT NULL PRIMARY KEY,
value TEXT)
"""
# The table "pt" stores a partition table.
q("""CREATE TABLE IF NOT EXISTS pt (
schema_dict['pt'] = """CREATE TABLE %s (
rid INTEGER NOT NULL,
nid INTEGER NOT NULL,
state INTEGER NOT NULL,
PRIMARY KEY (rid, nid))
""")
"""
# The table "trans" stores information on committed transactions.
q("""CREATE TABLE IF NOT EXISTS trans (
schema_dict['trans'] = """CREATE TABLE %s (
partition INTEGER NOT NULL,
tid INTEGER NOT NULL,
packed BOOLEAN NOT NULL,
@@ -154,38 +167,34 @@ class SQLiteDatabaseManager(DatabaseManager):
ext BLOB NOT NULL,
ttid INTEGER NOT NULL,
PRIMARY KEY (partition, tid))
""")
"""
# The table "obj" stores committed object metadata.
q("""CREATE TABLE IF NOT EXISTS obj (
schema_dict['obj'] = """CREATE TABLE %s (
partition INTEGER NOT NULL,
oid INTEGER NOT NULL,
tid INTEGER NOT NULL,
data_id INTEGER,
value_tid INTEGER,
PRIMARY KEY (partition, tid, oid))
""")
q("""CREATE INDEX IF NOT EXISTS _obj_i1 ON
obj(partition, oid, tid)
""")
q("""CREATE INDEX IF NOT EXISTS _obj_i2 ON
obj(data_id)
""")
PRIMARY KEY (partition, oid, tid))
"""
index_dict['obj'] = (
"CREATE INDEX %s ON %s(partition, tid, oid)",
"CREATE INDEX %s ON %s(data_id)")
# The table "data" stores object data.
q("""CREATE TABLE IF NOT EXISTS data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
schema_dict['data'] = """CREATE TABLE %s (
id INTEGER PRIMARY KEY,
hash BLOB NOT NULL,
compression INTEGER NOT NULL,
value BLOB NOT NULL)
""")
"""
if dedup:
q("""CREATE UNIQUE INDEX IF NOT EXISTS _data_i1 ON
data(hash, compression)
""")
index_dict['data'] = (
"CREATE UNIQUE INDEX %s ON %s(hash, compression)",)
# The table "ttrans" stores information on uncommitted transactions.
q("""CREATE TABLE IF NOT EXISTS ttrans (
schema_dict['ttrans'] = """CREATE TABLE %s (
partition INTEGER NOT NULL,
tid INTEGER,
packed BOOLEAN NOT NULL,
@@ -194,17 +203,28 @@ class SQLiteDatabaseManager(DatabaseManager):
description BLOB NOT NULL,
ext BLOB NOT NULL,
ttid INTEGER NOT NULL)
""")
"""
# The table "tobj" stores uncommitted object metadata.
q("""CREATE TABLE IF NOT EXISTS tobj (
schema_dict['tobj'] = """CREATE TABLE %s (
partition INTEGER NOT NULL,
oid INTEGER NOT NULL,
tid INTEGER NOT NULL,
data_id INTEGER,
value_tid INTEGER,
PRIMARY KEY (tid, oid))
""")
"""
if self.nonempty('config') is None:
q(schema_dict.pop('config') % 'config')
self._setConfiguration('version', self.VERSION)
else:
self.migrate(schema_dict, index_dict)
for table, schema in schema_dict.iteritems():
q(schema % ('IF NOT EXISTS ' + table))
for i, index in enumerate(index_dict.get(table, ()), 1):
q(index % ('IF NOT EXISTS _%s_i%s' % (table, i), table))
self._uncommitted_data.update(q("SELECT data_id, count(*)"
" FROM tobj WHERE data_id IS NOT NULL GROUP BY data_id"))
@@ -266,6 +286,10 @@ class SQLiteDatabaseManager(DatabaseManager):
" WHERE nid=? AND rid=partition", args).next()[0]
return trans, obj, None if oid is None else p64(oid)
def _getDataLastId(self, partition):
return self.query("SELECT MAX(id) FROM data WHERE %s <= id AND id < %s"
% (partition << 48, (partition + 1) << 48)).fetchone()[0]
def _getUnfinishedTIDDict(self):
q = self.query
return q("SELECT ttid, tid FROM ttrans"), (ttid
@@ -338,14 +362,16 @@ class SQLiteDatabaseManager(DatabaseManager):
q = self.query
for partition in offset_list:
args = partition,
data_id_list = [x for x, in
q("SELECT DISTINCT data_id FROM obj" + where, args) if x]
data_id_list = [x for x, in q(
"SELECT DISTINCT data_id FROM obj%s AND data_id IS NOT NULL"
% where, args)]
q("DELETE FROM obj" + where, args)
q("DELETE FROM trans" + where, args)
self._pruneData(data_id_list)
def _getUnfinishedDataIdList(self):
return [x for x, in self.query("SELECT data_id FROM tobj") if x]
return [x for x, in self.query(
"SELECT data_id FROM tobj WHERE data_id IS NOT NULL")]
def dropPartitionsTemporary(self, offset_list=None):
where = "" if offset_list is None else \
@@ -408,12 +434,14 @@ class SQLiteDatabaseManager(DatabaseManager):
return len(data_id_list)
return 0
def storeData(self, checksum, data, compression,
def storeData(self, checksum, oid, data, compression,
_dup=unique_constraint_message("data", "hash", "compression")):
H = buffer(checksum)
p = self._getPartition(util.u64(oid))
r = self._data_last_ids[p]
try:
return self.query("INSERT INTO data VALUES (NULL,?,?,?)",
(H, compression, buffer(data))).lastrowid
self.query("INSERT INTO data VALUES (?,?,?,?)",
(r, H, compression, buffer(data)))
except sqlite3.IntegrityError, e:
if e.args[0] == _dup:
(r, d), = self.query("SELECT id, value FROM data"
@@ -422,10 +450,12 @@ class SQLiteDatabaseManager(DatabaseManager):
if str(d) == data:
return r
raise
self._data_last_ids[p] = r + 1
return r
def loadData(self, data_id):
return self.query("SELECT compression, hash, value"
" FROM data where id=?", (data_id,)).fetchone()
" FROM data WHERE id=?", (data_id,)).fetchone()
def _getDataTID(self, oid, tid=None, before_tid=None):
partition = self._getReadablePartition(oid)
@@ -454,7 +484,8 @@ class SQLiteDatabaseManager(DatabaseManager):
tid = u64(tid)
ttid = u64(ttid)
sql = " FROM tobj WHERE tid=?"
data_id_list = [x for x, in q("SELECT data_id" + sql, (ttid,)) if x]
data_id_list = [x for x, in q("SELECT data_id%s AND data_id IS NOT NULL"
% sql, (ttid,))]
q("INSERT INTO obj SELECT partition, oid, ?, data_id, value_tid" + sql,
(tid, ttid))
q("DELETE" + sql, (ttid,))
@@ -481,8 +512,8 @@ class SQLiteDatabaseManager(DatabaseManager):
sql += " AND tid=?"
args.append(util.u64(serial))
q = self.query
data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql, args)
if x]
data_id_list = [x for x, in q(
"SELECT DISTINCT data_id%s AND data_id IS NOT NULL" % sql, args)]
q("DELETE" + sql, args)
self._pruneData(data_id_list)
@@ -498,8 +529,8 @@ class SQLiteDatabaseManager(DatabaseManager):
q = self.query
q("DELETE FROM trans" + sql, args)
sql = " FROM obj" + sql
data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql, args)
if x]
data_id_list = [x for x, in q(
"SELECT DISTINCT data_id%s AND data_id IS NOT NULL" % sql, args)]
q("DELETE" + sql, args)
self._pruneData(data_id_list)
@@ -113,7 +113,7 @@ class StorageOperationHandler(EventHandler):
checksum, data, data_serial):
dm = self.app.dm
if data or checksum != ZERO_HASH:
data_id = dm.storeData(checksum, data, compression)
data_id = dm.storeData(checksum, oid, data, compression)
else:
data_id = None
# Directly store the transaction.
@@ -470,7 +470,7 @@ class TransactionManager(EventQueue):
if data is None:
data_id = None
else:
data_id = self._app.dm.holdData(checksum, data, compression)
data_id = self._app.dm.holdData(checksum, oid, data, compression)
transaction.store(oid, data_id, value_serial)
def rebaseObject(self, ttid, oid):
@@ -104,6 +104,7 @@ class StorageDBTests(NeoUnitTestBase):
def test_getPartitionTable(self):
db = self.getDB()
db.setNumPartitions(3)
uuid1, uuid2 = self.getStorageUUID(), self.getStorageUUID()
cell1 = (0, uuid1, CellStates.OUT_OF_DATE)
cell2 = (1, uuid1, CellStates.UP_TO_DATE)
@@ -124,7 +125,7 @@ class StorageDBTests(NeoUnitTestBase):
self._last_ttid = ttid = add64(self._last_ttid, 1)
transaction = oid_list, 'user', 'desc', 'ext', False, ttid
H = "0" * 20
object_list = [(oid, self.db.holdData(H, '', 1), None)
object_list = [(oid, self.db.holdData(H, oid, '', 1), None)
for oid in oid_list]
return (transaction, object_list)
@@ -203,6 +204,7 @@ class StorageDBTests(NeoUnitTestBase):
def test_setPartitionTable(self):
db = self.getDB()
db.setNumPartitions(3)
ptid = 1
uuid = self.getStorageUUID()
cell1 = 0, uuid, CellStates.OUT_OF_DATE
@@ -452,8 +454,8 @@ class StorageDBTests(NeoUnitTestBase):
tid4 = self.getNextTID()
tid5 = self.getNextTID()
oid1 = p64(1)
foo = db.holdData("3" * 20, 'foo', 0)
bar = db.holdData("4" * 20, 'bar', 0)
foo = db.holdData("3" * 20, oid1, 'foo', 0)
bar = db.holdData("4" * 20, oid1, 'bar', 0)
db.releaseData((foo, bar))
db.storeTransaction(
tid1, (
@@ -19,6 +19,7 @@ from MySQLdb import NotSupportedError, OperationalError
from MySQLdb.constants.ER import UNKNOWN_STORAGE_ENGINE
from ..mock import Mock
from neo.lib.exception import DatabaseFailure
from neo.lib.protocol import ZERO_OID
from neo.lib.util import p64
from .. import DB_PREFIX, DB_SOCKET, DB_USER
from .testStorageDBTests import StorageDBTests
@@ -114,7 +115,7 @@ class StorageMySQLdbTests(StorageDBTests):
self.assertEqual(2, max(len(self.db.escape(chr(x)))
for x in xrange(256)))
self.assertEqual(2, len(self.db.escape('\0')))
self.db.storeData('\0' * 20, '\0' * (2**24-1), 0)
self.db.storeData('\0' * 20, ZERO_OID, '\0' * (2**24-1), 0)
size, = query_list
max_allowed = self.db.__class__._max_allowed_packet
self.assertTrue(max_allowed - 1024 < size <= max_allowed, size)
@@ -123,7 +124,7 @@ class StorageMySQLdbTests(StorageDBTests):
self.db._max_allowed_packet = max_allowed_packet
del query_list[:]
self.db.storeTransaction(p64(0),
((p64(1<<i),0,None) for i in xrange(10)), None)
((p64(1<<i),1234,None) for i in xrange(10)), None)
self.assertEqual(max(query_list), max_allowed_packet)
self.assertEqual(len(query_list), count)
@@ -57,7 +57,7 @@ class TransactionManagerTests(NeoUnitTestBase):
self.manager.storeObject(locking_serial, ram_serial, oid, 0, "3" * 20,
'bar', None)
holdData = self.app.dm.mockGetNamedCalls('holdData')
self.assertEqual(holdData.pop(0).params, ("3" * 20, 'bar', 0))
self.assertEqual(holdData.pop(0).params, ("3" * 20, oid, 'bar', 0))
orig_object = self.manager.getObjectFromTransaction(locking_serial,
oid)
self.manager.updateObjectDataForPack(oid, orig_serial, None, checksum)
@@ -480,17 +480,18 @@ class Test(NEOThreadedTest):
def test_notifyNodeInformation(self, cluster):
# translated from MasterNotificationsHandlerTests
# (neo.tests.client.testMasterHandler)
good = [1, 0].pop
if 1:
cluster.db # open DB
s0, s1 = cluster.client.nm.getStorageList()
conn = s0.getConnection()
self.assertFalse(conn.isClosed())
getCellSortKey = cluster.client.cp.getCellSortKey
self.assertEqual(getCellSortKey(s0, int), 0)
self.assertEqual(getCellSortKey(s0, good), 0)
cluster.neoctl.dropNode(s0.getUUID())
self.assertEqual([s1], cluster.client.nm.getStorageList())
self.assertTrue(conn.isClosed())
self.assertEqual(getCellSortKey(s0, int), 1)
self.assertEqual(getCellSortKey(s0, good), 1)
# XXX: the test originally checked that 'unregister' method
# was called (even if it's useless in this case),
# but we would need an API to do that easily.
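The good = [1, 0].pop trick deserves a note: bound to the list, each call pops from the right, so the stub yields 0, then 1. Since the rewritten getCellSortKey returns its random argument's value for any node that is not on the recent-failure branch, the two assertions check that s0 takes the plain random path both before and after being dropped:

```python
good = [1, 0].pop   # deterministic stand-in for random.random
assert good() == 0  # first call
assert good() == 1  # second call
```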
@@ -1439,7 +1440,7 @@ class Test(NEOThreadedTest):
bad = []
ok = []
def data_args(value):
return makeChecksum(value), value, 0
return makeChecksum(value), ZERO_OID, value, 0
node_list = []
for i, s in enumerate(cluster.storage_list):
node_list.append(s.uuid)