Commit 5d042452 authored by Julien Muchembled's avatar Julien Muchembled

MySQL: change schema of 'data' table to use autoincrement integer as primary key

This optimizes the storage layout on disk, because more recent entries
are accessed more often.

This will also simplify implementation of incremental backups.

Storage API is changed so that backends are not forced to use the checksum to
index data.
parent d4fac5c0
NEO 1.0
=======
The format of MySQL tables has changed in NEO 1.0 and there is no backward
compatibility or transparent migration, so you will have to use the following
SQL commands to migrate each storage from NEO 0.10.x::
-- make sure 'tobj' is empty first
CREATE TABLE new_data (id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, hash BINARY(20) NOT NULL UNIQUE, compression TINYINT UNSIGNED NULL, value LONGBLOB NULL) ENGINE = InnoDB SELECT DISTINCT obj.hash as hash, compression, value FROM obj, data WHERE obj.hash=data.hash ORDER BY serial;
DROP TABLE data;
RENAME TABLE new_data TO data;
CREATE TABLE new_obj (partition SMALLINT UNSIGNED NOT NULL, oid BIGINT UNSIGNED NOT NULL, serial BIGINT UNSIGNED NOT NULL, data_id BIGINT UNSIGNED NULL, value_serial BIGINT UNSIGNED NULL, PRIMARY KEY (partition, oid, serial), KEY (data_id)) ENGINE = InnoDB SELECT partition, oid, serial, data.id as data_id, value_serial FROM obj LEFT JOIN data ON (obj.hash=data.hash);
DROP TABLE obj;
RENAME TABLE new_obj TO obj;
ALTER TABLE tobj CHANGE hash data_id BIGINT UNSIGNED NULL;
NEO 0.10 NEO 0.10
======== ========
......
...@@ -347,6 +347,7 @@ class BTreeDatabaseManager(DatabaseManager): ...@@ -347,6 +347,7 @@ class BTreeDatabaseManager(DatabaseManager):
raise AssertionError("hash collision") raise AssertionError("hash collision")
except KeyError: except KeyError:
self._data[checksum] = compression, data, set() self._data[checksum] = compression, data, set()
return checksum
def finishTransaction(self, tid): def finishTransaction(self, tid):
tid = util.u64(tid) tid = util.u64(tid)
......
...@@ -68,7 +68,7 @@ class DatabaseManager(object): ...@@ -68,7 +68,7 @@ class DatabaseManager(object):
_uncommitted_data is a dict containing refcounts to data of _uncommitted_data is a dict containing refcounts to data of
write-locked objects, except in case of undo, where the refcount is write-locked objects, except in case of undo, where the refcount is
increased later, when the object is read-locked. increased later, when the object is read-locked.
Keys are checksums and values are number of references. Keys are data ids and values are number of references.
If reset is true, existing data must be discarded and If reset is true, existing data must be discarded and
self._uncommitted_data must be an empty dict. self._uncommitted_data must be an empty dict.
...@@ -294,19 +294,19 @@ class DatabaseManager(object): ...@@ -294,19 +294,19 @@ class DatabaseManager(object):
"""Store a transaction temporarily, if temporary is true. Note """Store a transaction temporarily, if temporary is true. Note
that this transaction is not finished yet. The list of objects that this transaction is not finished yet. The list of objects
contains tuples, each of which consists of an object ID, contains tuples, each of which consists of an object ID,
a checksum and object serial. a data_id and object serial.
The transaction is either None or a tuple of the list of OIDs, The transaction is either None or a tuple of the list of OIDs,
user information, a description, extension information and transaction user information, a description, extension information and transaction
pack state (True for packed).""" pack state (True for packed)."""
raise NotImplementedError raise NotImplementedError
def _pruneData(self, checksum_list): def _pruneData(self, data_id_list):
"""To be overridden by the backend to delete any unreferenced data """To be overridden by the backend to delete any unreferenced data
'unreferenced' means: 'unreferenced' means:
- not in self._uncommitted_data - not in self._uncommitted_data
- and not referenced by a fully-committed object (storage should have - and not referenced by a fully-committed object (storage should have
an index or a refcount of all data checksums of all objects) an index or a refcount of all data ids of all objects)
""" """
raise NotImplementedError raise NotImplementedError
...@@ -318,38 +318,39 @@ class DatabaseManager(object): ...@@ -318,38 +318,39 @@ class DatabaseManager(object):
""" """
raise NotImplementedError raise NotImplementedError
def storeData(self, checksum, data=None, compression=None): def storeData(self, checksum_or_id, data=None, compression=None):
"""Store object raw data """Store object raw data
'checksum' must be the result of neo.lib.util.makeChecksum(data) checksum must be the result of neo.lib.util.makeChecksum(data)
'compression' indicates if 'data' is compressed. 'compression' indicates if 'data' is compressed.
A volatile reference is set to this data until 'unlockData' is called A volatile reference is set to this data until 'unlockData' is called
with this checksum. with this checksum.
If called with only a checksum, it only increments the volatile If called with only an id, it only increments the volatile
reference to the data matching the checksum. reference to the data matching the id.
""" """
refcount = self._uncommitted_data refcount = self._uncommitted_data
refcount[checksum] = 1 + refcount.get(checksum, 0)
if data is not None: if data is not None:
self._storeData(checksum, data, compression) checksum_or_id = self._storeData(checksum_or_id, data, compression)
refcount[checksum_or_id] = 1 + refcount.get(checksum_or_id, 0)
return checksum_or_id
def unlockData(self, checksum_list, prune=False): def unlockData(self, data_id_list, prune=False):
"""Release 1 volatile reference to given list of checksums """Release 1 volatile reference to given list of checksums
If 'prune' is true, any data that is not referenced anymore (either by If 'prune' is true, any data that is not referenced anymore (either by
a volatile reference or by a fully-committed object) is deleted. a volatile reference or by a fully-committed object) is deleted.
""" """
refcount = self._uncommitted_data refcount = self._uncommitted_data
for checksum in checksum_list: for data_id in data_id_list:
count = refcount[checksum] - 1 count = refcount[data_id] - 1
if count: if count:
refcount[checksum] = count refcount[data_id] = count
else: else:
del refcount[checksum] del refcount[data_id]
if prune: if prune:
self.begin() self.begin()
try: try:
self._pruneData(checksum_list) self._pruneData(data_id_list)
except: except:
self.rollback() self.rollback()
raise raise
...@@ -379,8 +380,8 @@ class DatabaseManager(object): ...@@ -379,8 +380,8 @@ class DatabaseManager(object):
" of _getDataTID. It should be overriden by backend storage.") " of _getDataTID. It should be overriden by backend storage.")
r = self._getObject(oid, tid, before_tid) r = self._getObject(oid, tid, before_tid)
if r: if r:
serial, _, _, checksum, _, value_serial = r serial, _, _, data_id, _, value_serial = r
if value_serial is None and checksum: if value_serial is None and data_id:
return serial, serial return serial, serial
return serial, value_serial return serial, value_serial
return None, None return None, None
...@@ -524,7 +525,7 @@ class DatabaseManager(object): ...@@ -524,7 +525,7 @@ class DatabaseManager(object):
to it. to it.
- getObjectData function - getObjectData function
To call if value_serial is None and an object needs to be updated. To call if value_serial is None and an object needs to be updated.
Takes no parameter, returns a 3-tuple: compression, checksum, Takes no parameter, returns a 3-tuple: compression, data_id,
value value
""" """
raise NotImplementedError raise NotImplementedError
......
...@@ -83,6 +83,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -83,6 +83,7 @@ class MySQLDatabaseManager(DatabaseManager):
self.conn = MySQLdb.connect(**kwd) self.conn = MySQLdb.connect(**kwd)
self.conn.autocommit(False) self.conn.autocommit(False)
self.conn.query("SET SESSION group_concat_max_len = -1") self.conn.query("SET SESSION group_concat_max_len = -1")
self.conn.set_sql_mode("TRADITIONAL,NO_ENGINE_SUBSTITUTION")
def _begin(self): def _begin(self):
self.query("""BEGIN""") self.query("""BEGIN""")
...@@ -172,20 +173,23 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -172,20 +173,23 @@ class MySQLDatabaseManager(DatabaseManager):
PRIMARY KEY (partition, tid) PRIMARY KEY (partition, tid)
) ENGINE = InnoDB""" + p) ) ENGINE = InnoDB""" + p)
# The table "obj" stores committed object data. # The table "obj" stores committed object metadata.
q("""CREATE TABLE IF NOT EXISTS obj ( q("""CREATE TABLE IF NOT EXISTS obj (
partition SMALLINT UNSIGNED NOT NULL, partition SMALLINT UNSIGNED NOT NULL,
oid BIGINT UNSIGNED NOT NULL, oid BIGINT UNSIGNED NOT NULL,
serial BIGINT UNSIGNED NOT NULL, serial BIGINT UNSIGNED NOT NULL,
hash BINARY(20) NULL, data_id BIGINT UNSIGNED NULL,
value_serial BIGINT UNSIGNED NULL, value_serial BIGINT UNSIGNED NULL,
PRIMARY KEY (partition, oid, serial), PRIMARY KEY (partition, oid, serial),
KEY (hash(4)) KEY (data_id)
) ENGINE = InnoDB""" + p) ) ENGINE = InnoDB""" + p)
# # The table "data" stores object data.
# We'd like to have partial index on 'hash' colum (e.g. hash(4))
# but 'UNIQUE' constraint would not work as expected.
q("""CREATE TABLE IF NOT EXISTS data ( q("""CREATE TABLE IF NOT EXISTS data (
hash BINARY(20) NOT NULL PRIMARY KEY, id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
hash BINARY(20) NOT NULL UNIQUE,
compression TINYINT UNSIGNED NULL, compression TINYINT UNSIGNED NULL,
value LONGBLOB NULL value LONGBLOB NULL
) ENGINE = InnoDB""") ) ENGINE = InnoDB""")
...@@ -201,18 +205,18 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -201,18 +205,18 @@ class MySQLDatabaseManager(DatabaseManager):
ext BLOB NOT NULL ext BLOB NOT NULL
) ENGINE = InnoDB""") ) ENGINE = InnoDB""")
# The table "tobj" stores uncommitted object data. # The table "tobj" stores uncommitted object metadata.
q("""CREATE TABLE IF NOT EXISTS tobj ( q("""CREATE TABLE IF NOT EXISTS tobj (
partition SMALLINT UNSIGNED NOT NULL, partition SMALLINT UNSIGNED NOT NULL,
oid BIGINT UNSIGNED NOT NULL, oid BIGINT UNSIGNED NOT NULL,
serial BIGINT UNSIGNED NOT NULL, serial BIGINT UNSIGNED NOT NULL,
hash BINARY(20) NULL, data_id BIGINT UNSIGNED NULL,
value_serial BIGINT UNSIGNED NULL, value_serial BIGINT UNSIGNED NULL,
PRIMARY KEY (serial, oid) PRIMARY KEY (serial, oid)
) ENGINE = InnoDB""") ) ENGINE = InnoDB""")
self._uncommitted_data = dict(q("SELECT hash, count(*)" self._uncommitted_data = dict(q("SELECT data_id, count(*)"
" FROM tobj WHERE hash IS NOT NULL GROUP BY hash") or ()) " FROM tobj WHERE data_id IS NOT NULL GROUP BY data_id") or ())
def getConfiguration(self, key): def getConfiguration(self, key):
if key in self._config: if key in self._config:
...@@ -313,8 +317,8 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -313,8 +317,8 @@ class MySQLDatabaseManager(DatabaseManager):
def _getObject(self, oid, tid=None, before_tid=None): def _getObject(self, oid, tid=None, before_tid=None):
q = self.query q = self.query
partition = self._getPartition(oid) partition = self._getPartition(oid)
sql = ('SELECT serial, compression, obj.hash, value, value_serial' sql = ('SELECT serial, compression, data.hash, value, value_serial'
' FROM obj LEFT JOIN data ON (obj.hash = data.hash)' ' FROM obj LEFT JOIN data ON (obj.data_id = data.id)'
' WHERE partition = %d AND oid = %d') % (partition, oid) ' WHERE partition = %d AND oid = %d') % (partition, oid)
if tid is not None: if tid is not None:
sql += ' AND serial = %d' % tid sql += ' AND serial = %d' % tid
...@@ -389,12 +393,12 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -389,12 +393,12 @@ class MySQLDatabaseManager(DatabaseManager):
# delete. It should be done as an idle task, by chunks. # delete. It should be done as an idle task, by chunks.
for partition in offset_list: for partition in offset_list:
where = " WHERE partition=%d" % partition where = " WHERE partition=%d" % partition
checksum_list = [x for x, in data_id_list = [x for x, in
q("SELECT DISTINCT hash FROM obj" + where) if x] q("SELECT DISTINCT data_id FROM obj" + where) if x]
if not self._use_partition: if not self._use_partition:
q("DELETE FROM obj" + where) q("DELETE FROM obj" + where)
q("DELETE FROM trans" + where) q("DELETE FROM trans" + where)
self._pruneData(checksum_list) self._pruneData(data_id_list)
except: except:
self.rollback() self.rollback()
raise raise
...@@ -413,14 +417,14 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -413,14 +417,14 @@ class MySQLDatabaseManager(DatabaseManager):
q = self.query q = self.query
self.begin() self.begin()
try: try:
checksum_list = [x for x, in q("SELECT hash FROM tobj") if x] data_id_list = [x for x, in q("SELECT data_id FROM tobj") if x]
q("""TRUNCATE tobj""") q("""TRUNCATE tobj""")
q("""TRUNCATE ttrans""") q("""TRUNCATE ttrans""")
except: except:
self.rollback() self.rollback()
raise raise
self.commit() self.commit()
self.unlockData(checksum_list, True) self.unlockData(data_id_list, True)
def storeTransaction(self, tid, object_list, transaction, temporary = True): def storeTransaction(self, tid, object_list, transaction, temporary = True):
q = self.query q = self.query
...@@ -437,24 +441,20 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -437,24 +441,20 @@ class MySQLDatabaseManager(DatabaseManager):
self.begin() self.begin()
try: try:
for oid, checksum, value_serial in object_list: for oid, data_id, value_serial in object_list:
oid = u64(oid) oid = u64(oid)
partition = self._getPartition(oid) partition = self._getPartition(oid)
if value_serial: if value_serial:
value_serial = u64(value_serial) value_serial = u64(value_serial)
(checksum,), = q("SELECT hash FROM obj" (data_id,), = q("SELECT data_id FROM obj"
" WHERE partition=%d AND oid=%d AND serial=%d" " WHERE partition=%d AND oid=%d AND serial=%d"
% (partition, oid, value_serial)) % (partition, oid, value_serial))
if temporary: if temporary:
self.storeData(checksum) self.storeData(data_id)
else: else:
value_serial = 'NULL' value_serial = 'NULL'
if checksum: q("REPLACE INTO %s VALUES (%d, %d, %d, %s, %s)" % (obj_table,
checksum = "'%s'" % e(checksum) partition, oid, tid, data_id or 'NULL', value_serial))
else:
checksum = 'NULL'
q("REPLACE INTO %s VALUES (%d, %d, %d, %s, %s)" %
(obj_table, partition, oid, tid, checksum, value_serial))
if transaction is not None: if transaction is not None:
oid_list, user, desc, ext, packed = transaction oid_list, user, desc, ext, packed = transaction
...@@ -472,13 +472,13 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -472,13 +472,13 @@ class MySQLDatabaseManager(DatabaseManager):
raise raise
self.commit() self.commit()
def _pruneData(self, checksum_list): def _pruneData(self, data_id_list):
checksum_list = set(checksum_list).difference(self._uncommitted_data) data_id_list = set(data_id_list).difference(self._uncommitted_data)
if checksum_list: if data_id_list:
self.query("DELETE data FROM data" self.query("DELETE data FROM data"
" LEFT JOIN obj ON (data.hash = obj.hash)" " LEFT JOIN obj ON (id = data_id)"
" WHERE data.hash IN ('%s') AND obj.hash IS NULL" " WHERE id IN (%s) AND data_id IS NULL"
% "','".join(map(self.escape, checksum_list))) % ",".join(map(str, data_id_list)))
def _storeData(self, checksum, data, compression): def _storeData(self, checksum, data, compression):
e = self.escape e = self.escape
...@@ -486,22 +486,25 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -486,22 +486,25 @@ class MySQLDatabaseManager(DatabaseManager):
self.begin() self.begin()
try: try:
try: try:
self.query("INSERT INTO data VALUES ('%s', %d, '%s')" % self.query("INSERT INTO data VALUES (NULL, '%s', %d, '%s')" %
(checksum, compression, e(data))) (checksum, compression, e(data)))
except IntegrityError, (code, _): except IntegrityError, (code, _):
if code != DUP_ENTRY: if code != DUP_ENTRY:
raise raise
r, = self.query("SELECT compression, value FROM data" (r, c, d), = self.query("SELECT id, compression, value"
" WHERE hash='%s'" % checksum) " FROM data WHERE hash='%s'" % checksum)
if r != (compression, data): if c != compression or d != data:
raise raise
else:
r = self.conn.insert_id()
except: except:
self.rollback() self.rollback()
raise raise
self.commit() self.commit()
return r
def _getDataTID(self, oid, tid=None, before_tid=None): def _getDataTID(self, oid, tid=None, before_tid=None):
sql = ('SELECT serial, hash, value_serial FROM obj' sql = ('SELECT serial, data_id, value_serial FROM obj'
' WHERE partition = %d AND oid = %d' ' WHERE partition = %d AND oid = %d'
) % (self._getPartition(oid), oid) ) % (self._getPartition(oid), oid)
if tid is not None: if tid is not None:
...@@ -514,8 +517,8 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -514,8 +517,8 @@ class MySQLDatabaseManager(DatabaseManager):
sql += ' ORDER BY serial DESC LIMIT 1' sql += ' ORDER BY serial DESC LIMIT 1'
r = self.query(sql) r = self.query(sql)
if r: if r:
(serial, checksum, value_serial), = r (serial, data_id, value_serial), = r
if value_serial is None and checksum: if value_serial is None and data_id:
return serial, serial return serial, serial
return serial, value_serial return serial, value_serial
return None, None return None, None
...@@ -526,7 +529,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -526,7 +529,7 @@ class MySQLDatabaseManager(DatabaseManager):
self.begin() self.begin()
try: try:
sql = " FROM tobj WHERE serial=%d" % tid sql = " FROM tobj WHERE serial=%d" % tid
checksum_list = [x for x, in q("SELECT hash" + sql) if x] data_id_list = [x for x, in q("SELECT data_id" + sql) if x]
q("INSERT INTO obj SELECT *" + sql) q("INSERT INTO obj SELECT *" + sql)
q("DELETE FROM tobj WHERE serial=%d" % tid) q("DELETE FROM tobj WHERE serial=%d" % tid)
q("INSERT INTO trans SELECT * FROM ttrans WHERE tid=%d" % tid) q("INSERT INTO trans SELECT * FROM ttrans WHERE tid=%d" % tid)
...@@ -535,7 +538,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -535,7 +538,7 @@ class MySQLDatabaseManager(DatabaseManager):
self.rollback() self.rollback()
raise raise
self.commit() self.commit()
self.unlockData(checksum_list) self.unlockData(data_id_list)
def deleteTransaction(self, tid, oid_list=()): def deleteTransaction(self, tid, oid_list=()):
q = self.query q = self.query
...@@ -545,22 +548,22 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -545,22 +548,22 @@ class MySQLDatabaseManager(DatabaseManager):
self.begin() self.begin()
try: try:
sql = " FROM tobj WHERE serial=%d" % tid sql = " FROM tobj WHERE serial=%d" % tid
checksum_list = [x for x, in q("SELECT hash" + sql) if x] data_id_list = [x for x, in q("SELECT data_id" + sql) if x]
self.unlockData(checksum_list) self.unlockData(data_id_list)
q("DELETE" + sql) q("DELETE" + sql)
q("""DELETE FROM ttrans WHERE tid = %d""" % tid) q("""DELETE FROM ttrans WHERE tid = %d""" % tid)
q("""DELETE FROM trans WHERE partition = %d AND tid = %d""" % q("""DELETE FROM trans WHERE partition = %d AND tid = %d""" %
(getPartition(tid), tid)) (getPartition(tid), tid))
# delete from obj using indexes # delete from obj using indexes
checksum_set = set() data_id_set = set()
for oid in oid_list: for oid in oid_list:
oid = u64(oid) oid = u64(oid)
sql = " FROM obj WHERE partition=%d AND oid=%d AND serial=%d" \ sql = " FROM obj WHERE partition=%d AND oid=%d AND serial=%d" \
% (getPartition(oid), oid, tid) % (getPartition(oid), oid, tid)
checksum_set.update(*q("SELECT hash" + sql)) data_id_set.update(*q("SELECT data_id" + sql))
q("DELETE" + sql) q("DELETE" + sql)
checksum_set.discard(None) data_id_set.discard(None)
self._pruneData(checksum_set) self._pruneData(data_id_set)
except: except:
self.rollback() self.rollback()
raise raise
...@@ -590,9 +593,9 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -590,9 +593,9 @@ class MySQLDatabaseManager(DatabaseManager):
sql += ' AND serial=%d' % u64(serial) sql += ' AND serial=%d' % u64(serial)
self.begin() self.begin()
try: try:
checksum_list = [x for x, in q("SELECT DISTINCT hash" + sql) if x] data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql) if x]
q("DELETE" + sql) q("DELETE" + sql)
self._pruneData(checksum_list) self._pruneData(data_id_list)
except: except:
self.rollback() self.rollback()
raise raise
...@@ -607,9 +610,9 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -607,9 +610,9 @@ class MySQLDatabaseManager(DatabaseManager):
(partition, u64(max_tid), oid, oid, u64(serial))) (partition, u64(max_tid), oid, oid, u64(serial)))
self.begin() self.begin()
try: try:
checksum_list = [x for x, in q("SELECT DISTINCT hash" + sql) if x] data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql) if x]
q("DELETE" + sql) q("DELETE" + sql)
self._pruneData(checksum_list) self._pruneData(data_id_list)
except: except:
self.rollback() self.rollback()
raise raise
...@@ -637,7 +640,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -637,7 +640,7 @@ class MySQLDatabaseManager(DatabaseManager):
if value_serial is None: if value_serial is None:
raise CreationUndone raise CreationUndone
r = self.query("""SELECT LENGTH(value), value_serial r = self.query("""SELECT LENGTH(value), value_serial
FROM obj LEFT JOIN data ON (obj.hash = data.hash) FROM obj LEFT JOIN data ON (obj.data_id = data.id)
WHERE partition = %d AND oid = %d AND serial = %d""" % WHERE partition = %d AND oid = %d AND serial = %d""" %
(self._getPartition(oid), oid, value_serial)) (self._getPartition(oid), oid, value_serial))
length, value_serial = r[0] length, value_serial = r[0]
...@@ -656,7 +659,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -656,7 +659,7 @@ class MySQLDatabaseManager(DatabaseManager):
p64 = util.p64 p64 = util.p64
pack_tid = self._getPackTID() pack_tid = self._getPackTID()
r = self.query("""SELECT serial, LENGTH(value), value_serial r = self.query("""SELECT serial, LENGTH(value), value_serial
FROM obj LEFT JOIN data ON (obj.hash = data.hash) FROM obj LEFT JOIN data ON (obj.data_id = data.id)
WHERE partition = %d AND oid = %d AND serial >= %d WHERE partition = %d AND oid = %d AND serial >= %d
ORDER BY serial DESC LIMIT %d, %d""" \ ORDER BY serial DESC LIMIT %d, %d""" \
% (self._getPartition(oid), oid, pack_tid, offset, length)) % (self._getPartition(oid), oid, pack_tid, offset, length))
...@@ -768,25 +771,25 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -768,25 +771,25 @@ class MySQLDatabaseManager(DatabaseManager):
% tid): % tid):
partition = getPartition(oid) partition = getPartition(oid)
if q("SELECT 1 FROM obj WHERE partition = %d" if q("SELECT 1 FROM obj WHERE partition = %d"
" AND oid = %d AND serial = %d AND hash IS NULL" " AND oid = %d AND serial = %d AND data_id IS NULL"
% (partition, oid, max_serial)): % (partition, oid, max_serial)):
max_serial += 1 max_serial += 1
elif not count: elif not count:
continue continue
# There are things to delete for this object # There are things to delete for this object
checksum_set = set() data_id_set = set()
sql = ' FROM obj WHERE partition=%d AND oid=%d' \ sql = ' FROM obj WHERE partition=%d AND oid=%d' \
' AND serial<%d' % (partition, oid, max_serial) ' AND serial<%d' % (partition, oid, max_serial)
for serial, checksum in q('SELECT serial, hash' + sql): for serial, data_id in q('SELECT serial, data_id' + sql):
checksum_set.add(checksum) data_id_set.add(data_id)
new_serial = updatePackFuture(oid, serial, max_serial) new_serial = updatePackFuture(oid, serial, max_serial)
if new_serial: if new_serial:
new_serial = p64(new_serial) new_serial = p64(new_serial)
updateObjectDataForPack(p64(oid), p64(serial), updateObjectDataForPack(p64(oid), p64(serial),
new_serial, checksum) new_serial, data_id)
q('DELETE' + sql) q('DELETE' + sql)
checksum_set.discard(None) data_id_set.discard(None)
self._pruneData(checksum_set) self._pruneData(data_id_set)
except: except:
self.rollback() self.rollback()
raise raise
......
...@@ -173,11 +173,11 @@ class ReplicationHandler(EventHandler): ...@@ -173,11 +173,11 @@ class ReplicationHandler(EventHandler):
serial_end, compression, checksum, data, data_serial): serial_end, compression, checksum, data, data_serial):
dm = self.app.dm dm = self.app.dm
if data or checksum != ZERO_HASH: if data or checksum != ZERO_HASH:
dm.storeData(checksum, data, compression) data_id = dm.storeData(checksum, data, compression)
else: else:
checksum = None data_id = None
# Directly store the transaction. # Directly store the transaction.
obj = oid, checksum, data_serial obj = oid, data_id, data_serial
dm.storeTransaction(serial_start, [obj], None, False) dm.storeTransaction(serial_start, [obj], None, False)
def _doAskCheckSerialRange(self, min_oid, min_tid, max_tid, def _doAskCheckSerialRange(self, min_oid, min_tid, max_tid,
......
...@@ -98,12 +98,12 @@ class Transaction(object): ...@@ -98,12 +98,12 @@ class Transaction(object):
# assert self._transaction is not None # assert self._transaction is not None
self._transaction = (oid_list, user, desc, ext, packed) self._transaction = (oid_list, user, desc, ext, packed)
def addObject(self, oid, checksum, value_serial): def addObject(self, oid, data_id, value_serial):
""" """
Add an object to the transaction Add an object to the transaction
""" """
assert oid not in self._checked_set, dump(oid) assert oid not in self._checked_set, dump(oid)
self._object_dict[oid] = oid, checksum, value_serial self._object_dict[oid] = oid, data_id, value_serial
def delObject(self, oid): def delObject(self, oid):
try: try:
...@@ -241,9 +241,9 @@ class TransactionManager(object): ...@@ -241,9 +241,9 @@ class TransactionManager(object):
# drop the lock it held on this object, and drop object data for # drop the lock it held on this object, and drop object data for
# consistency. # consistency.
del self._store_lock_dict[oid] del self._store_lock_dict[oid]
checksum = self._transaction_dict[ttid].delObject(oid) data_id = self._transaction_dict[ttid].delObject(oid)
if checksum: if data_id:
self._app.dm.pruneData((checksum,)) self._app.dm.pruneData((data_id,))
# Give a chance to pending events to take that lock now. # Give a chance to pending events to take that lock now.
self._app.executeQueuedEvents() self._app.executeQueuedEvents()
# Attempt to acquire lock again. # Attempt to acquire lock again.
...@@ -303,10 +303,10 @@ class TransactionManager(object): ...@@ -303,10 +303,10 @@ class TransactionManager(object):
# store object # store object
assert ttid in self, "Transaction not registered" assert ttid in self, "Transaction not registered"
if data is None: if data is None:
checksum = None data_id = None
else: else:
self._app.dm.storeData(checksum, data, compression) data_id = self._app.dm.storeData(checksum, data, compression)
self._transaction_dict[ttid].addObject(oid, checksum, value_serial) self._transaction_dict[ttid].addObject(oid, data_id, value_serial)
def abort(self, ttid, even_if_locked=False): def abort(self, ttid, even_if_locked=False):
""" """
...@@ -328,9 +328,9 @@ class TransactionManager(object): ...@@ -328,9 +328,9 @@ class TransactionManager(object):
if not even_if_locked: if not even_if_locked:
return return
else: else:
self._app.dm.unlockData([checksum self._app.dm.unlockData([data_id
for oid, checksum, value_serial in transaction.getObjectList() for oid, data_id, value_serial in transaction.getObjectList()
if checksum], True) if data_id], True)
# unlock any object # unlock any object
for oid in transaction.getLockedOIDList(): for oid in transaction.getLockedOIDList():
if has_load_lock: if has_load_lock:
...@@ -379,13 +379,13 @@ class TransactionManager(object): ...@@ -379,13 +379,13 @@ class TransactionManager(object):
for oid, ttid in self._store_lock_dict.items(): for oid, ttid in self._store_lock_dict.items():
neo.lib.logging.info(' %r by %r', dump(oid), dump(ttid)) neo.lib.logging.info(' %r by %r', dump(oid), dump(ttid))
def updateObjectDataForPack(self, oid, orig_serial, new_serial, checksum): def updateObjectDataForPack(self, oid, orig_serial, new_serial, data_id):
lock_tid = self.getLockingTID(oid) lock_tid = self.getLockingTID(oid)
if lock_tid is not None: if lock_tid is not None:
transaction = self._transaction_dict[lock_tid] transaction = self._transaction_dict[lock_tid]
if transaction.getObject(oid)[2] == orig_serial: if transaction.getObject(oid)[2] == orig_serial:
if new_serial: if new_serial:
checksum = None data_id = None
else: else:
self._app.dm.storeData(checksum) self._app.dm.storeData(data_id)
transaction.addObject(oid, checksum, new_serial) transaction.addObject(oid, data_id, new_serial)
...@@ -271,6 +271,7 @@ class StorageReplicationHandlerTests(NeoUnitTestBase): ...@@ -271,6 +271,7 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
checksum = "0" * 20 checksum = "0" * 20
data = 'foo' data = 'foo'
data_serial = None data_serial = None
app.dm.mockAddReturnValues(storeData=checksum)
ReplicationHandler(app).answerObject(conn, oid, serial_start, ReplicationHandler(app).answerObject(conn, oid, serial_start,
serial_end, compression, checksum, data, data_serial) serial_end, compression, checksum, data, data_serial)
calls = app.dm.mockGetNamedCalls('storeTransaction') calls = app.dm.mockGetNamedCalls('storeTransaction')
......
...@@ -122,9 +122,8 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -122,9 +122,8 @@ class StorageDBTests(NeoUnitTestBase):
def getTransaction(self, oid_list): def getTransaction(self, oid_list):
transaction = (oid_list, 'user', 'desc', 'ext', False) transaction = (oid_list, 'user', 'desc', 'ext', False)
H = "0" * 20 H = "0" * 20
for _ in oid_list: object_list = [(oid, self.db.storeData(H, '', 1), None)
self.db.storeData(H, '', 1) for oid in oid_list]
object_list = [(oid, H, None) for oid in oid_list]
return (transaction, object_list) return (transaction, object_list)
def checkSet(self, list1, list2): def checkSet(self, list1, list2):
...@@ -584,10 +583,8 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -584,10 +583,8 @@ class StorageDBTests(NeoUnitTestBase):
tid4 = self.getNextTID() tid4 = self.getNextTID()
tid5 = self.getNextTID() tid5 = self.getNextTID()
oid1 = self.getOID(1) oid1 = self.getOID(1)
foo = "3" * 20 foo = db.storeData("3" * 20, 'foo', 0)
bar = "4" * 20 bar = db.storeData("4" * 20, 'bar', 0)
db.storeData(foo, 'foo', 0)
db.storeData(bar, 'bar', 0)
db.unlockData((foo, bar)) db.unlockData((foo, bar))
db.storeTransaction( db.storeTransaction(
tid1, ( tid1, (
......
...@@ -15,9 +15,9 @@ ...@@ -15,9 +15,9 @@
# along with this program; if not, write to the Free Software # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import random
import unittest import unittest
from mock import Mock from mock import Mock, ReturnValues
from .. import NeoUnitTestBase from .. import NeoUnitTestBase
from neo.storage.transactions import Transaction, TransactionManager from neo.storage.transactions import Transaction, TransactionManager
from neo.storage.transactions import ConflictError, DelayedError from neo.storage.transactions import ConflictError, DelayedError
...@@ -125,6 +125,8 @@ class TransactionManagerTests(NeoUnitTestBase): ...@@ -125,6 +125,8 @@ class TransactionManagerTests(NeoUnitTestBase):
def testSimpleCase(self): def testSimpleCase(self):
""" One node, one transaction, not abort """ """ One node, one transaction, not abort """
data_id_list = random.random(), random.random()
self.app.dm.mockAddReturnValues(storeData=ReturnValues(*data_id_list))
uuid = self.getNewUUID() uuid = self.getNewUUID()
ttid = self.getNextTID() ttid = self.getNextTID()
tid, txn = self._getTransaction() tid, txn = self._getTransaction()
...@@ -137,8 +139,8 @@ class TransactionManagerTests(NeoUnitTestBase): ...@@ -137,8 +139,8 @@ class TransactionManagerTests(NeoUnitTestBase):
self.assertTrue(ttid in self.manager) self.assertTrue(ttid in self.manager)
self.manager.lock(ttid, tid, txn[0]) self.manager.lock(ttid, tid, txn[0])
self._checkTransactionStored(tid, [ self._checkTransactionStored(tid, [
(object1[0], object1[2], object1[4]), (object1[0], data_id_list[0], object1[4]),
(object2[0], object2[2], object2[4]), (object2[0], data_id_list[1], object2[4]),
], txn) ], txn)
self.manager.unlock(ttid) self.manager.unlock(ttid)
self.assertFalse(ttid in self.manager) self.assertFalse(ttid in self.manager)
...@@ -331,6 +333,8 @@ class TransactionManagerTests(NeoUnitTestBase): ...@@ -331,6 +333,8 @@ class TransactionManagerTests(NeoUnitTestBase):
self.assertFalse(self.manager.loadLocked(oid)) self.assertFalse(self.manager.loadLocked(oid))
def test_getObjectFromTransaction(self): def test_getObjectFromTransaction(self):
data_id = random.random()
self.app.dm.mockAddReturnValues(storeData=ReturnValues(data_id))
uuid = self.getNewUUID() uuid = self.getNewUUID()
tid1, txn1 = self._getTransaction() tid1, txn1 = self._getTransaction()
tid2, txn2 = self._getTransaction() tid2, txn2 = self._getTransaction()
...@@ -343,7 +347,7 @@ class TransactionManagerTests(NeoUnitTestBase): ...@@ -343,7 +347,7 @@ class TransactionManagerTests(NeoUnitTestBase):
self.assertEqual(self.manager.getObjectFromTransaction(tid1, obj2[0]), self.assertEqual(self.manager.getObjectFromTransaction(tid1, obj2[0]),
None) None)
self.assertEqual(self.manager.getObjectFromTransaction(tid1, obj1[0]), self.assertEqual(self.manager.getObjectFromTransaction(tid1, obj1[0]),
(obj1[0], obj1[2], obj1[4])) (obj1[0], data_id, obj1[4]))
def test_getLockingTID(self): def test_getLockingTID(self):
uuid = self.getNewUUID() uuid = self.getNewUUID()
......
...@@ -308,13 +308,14 @@ class StorageApplication(ServerNode, neo.storage.app.Application): ...@@ -308,13 +308,14 @@ class StorageApplication(ServerNode, neo.storage.app.Application):
adapter = self._init_args[1]['getAdapter'] adapter = self._init_args[1]['getAdapter']
dm = self.dm dm = self.dm
if adapter == 'BTree': if adapter == 'BTree':
checksum_list = dm._data checksum_dict = dict((x, x) for x in dm._data)
elif adapter == 'MySQL': elif adapter == 'MySQL':
checksum_list = [x for x, in dm.query("SELECT hash FROM data")] checksum_dict = dict(dm.query("SELECT id, hash FROM data"))
else: else:
assert False assert False
assert set(dm._uncommitted_data).issubset(checksum_list) assert set(dm._uncommitted_data).issubset(checksum_dict)
return dict((x, dm._uncommitted_data.get(x, 0)) for x in checksum_list) get = dm._uncommitted_data.get
return dict((v, get(k, 0)) for k, v in checksum_dict.iteritems())
class ClientApplication(Node, neo.client.app.Application): class ClientApplication(Node, neo.client.app.Application):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment