Commit 6a328e13 authored by Vincent Pelletier's avatar Vincent Pelletier

New table obj_short with fewer columns than obj table.

This should improve replication & pack performance.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2415 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent b38f1439
...@@ -129,7 +129,8 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -129,7 +129,8 @@ class MySQLDatabaseManager(DatabaseManager):
q = self.query q = self.query
if reset: if reset:
q("""DROP TABLE IF EXISTS config, pt, trans, obj, ttrans, tobj""") q('DROP TABLE IF EXISTS config, pt, trans, obj, obj_short, '
'ttrans, tobj')
# The table "config" stores configuration parameters which affect the # The table "config" stores configuration parameters which affect the
# persistent data. # persistent data.
...@@ -170,6 +171,16 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -170,6 +171,16 @@ class MySQLDatabaseManager(DatabaseManager):
PRIMARY KEY (partition, oid, serial) PRIMARY KEY (partition, oid, serial)
) ENGINE = InnoDB""") ) ENGINE = InnoDB""")
# The table "obj_short" contains columns which are accessed in queries
# which don't need to access object data. This is needed because InnoDB
# loads a whole row even when it only needs columns in primary key.
q('CREATE TABLE IF NOT EXISTS obj_short ('
'partition SMALLINT UNSIGNED NOT NULL,'
'oid BIGINT UNSIGNED NOT NULL,'
'serial BIGINT UNSIGNED NOT NULL,'
'PRIMARY KEY (partition, oid, serial)'
') ENGINE = InnoDB')
# The table "ttrans" stores information on uncommitted transactions. # The table "ttrans" stores information on uncommitted transactions.
q("""CREATE TABLE IF NOT EXISTS ttrans ( q("""CREATE TABLE IF NOT EXISTS ttrans (
partition SMALLINT UNSIGNED NOT NULL, partition SMALLINT UNSIGNED NOT NULL,
...@@ -192,6 +203,16 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -192,6 +203,16 @@ class MySQLDatabaseManager(DatabaseManager):
value_serial BIGINT UNSIGNED NULL value_serial BIGINT UNSIGNED NULL
) ENGINE = InnoDB""") ) ENGINE = InnoDB""")
def objQuery(self, query):
"""
Execute given query for both obj and obj_short tables.
query: query string, must contain "%(table)s" where obj table name is
needed.
"""
q = self.query
for table in ('obj', 'obj_short'):
q(query % {'table': table})
def getConfiguration(self, key): def getConfiguration(self, key):
if key in self._config: if key in self._config:
return self._config[key] return self._config[key]
...@@ -278,8 +299,8 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -278,8 +299,8 @@ class MySQLDatabaseManager(DatabaseManager):
tid = util.u64(tid) tid = util.u64(tid)
partition = self._getPartition(oid) partition = self._getPartition(oid)
self.begin() self.begin()
r = q("SELECT oid FROM obj WHERE partition=%d AND oid=%d AND serial=%d" r = q("SELECT oid FROM obj_short WHERE partition=%d AND oid=%d AND "
% (partition, oid, tid)) "serial=%d" % (partition, oid, tid))
if not r and all: if not r and all:
r = q("""SELECT oid FROM tobj WHERE oid = %d AND serial = %d""" \ r = q("""SELECT oid FROM tobj WHERE oid = %d AND serial = %d""" \
% (oid, tid)) % (oid, tid))
...@@ -334,7 +355,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -334,7 +355,7 @@ class MySQLDatabaseManager(DatabaseManager):
serial, compression, checksum, data, value_serial = r[0] serial, compression, checksum, data, value_serial = r[0]
except IndexError: except IndexError:
return None return None
r = q("""SELECT serial FROM obj r = q("""SELECT serial FROM obj_short
WHERE partition = %d WHERE partition = %d
AND oid = %d AND serial >= %d AND oid = %d AND serial >= %d
ORDER BY serial LIMIT 1""" \ ORDER BY serial LIMIT 1""" \
...@@ -395,7 +416,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -395,7 +416,7 @@ class MySQLDatabaseManager(DatabaseManager):
offset_list = ', '.join((str(i) for i in offset_list)) offset_list = ', '.join((str(i) for i in offset_list))
self.begin() self.begin()
try: try:
q("""DELETE FROM obj WHERE partition IN (%s)""" % self.objQuery('DELETE FROM %%(table)s WHERE partition IN (%s)' %
(offset_list, )) (offset_list, ))
q("""DELETE FROM trans WHERE partition IN (%s)""" % q("""DELETE FROM trans WHERE partition IN (%s)""" %
(offset_list, )) (offset_list, ))
...@@ -450,6 +471,10 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -450,6 +471,10 @@ class MySQLDatabaseManager(DatabaseManager):
q("""REPLACE INTO %s VALUES (%d, %d, %d, %s, %s, %s, %s)""" \ q("""REPLACE INTO %s VALUES (%d, %d, %d, %s, %s, %s, %s)""" \
% (obj_table, partition, oid, tid, compression, checksum, % (obj_table, partition, oid, tid, compression, checksum,
data, value_serial)) data, value_serial))
if obj_table == 'obj':
# Update obj_short too
q('REPLACE INTO obj_short VALUES (%d, %d, %d)' % (
partition, oid, tid))
if transaction is not None: if transaction is not None:
oid_list, user, desc, ext, packed = transaction oid_list, user, desc, ext, packed = transaction
...@@ -493,6 +518,8 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -493,6 +518,8 @@ class MySQLDatabaseManager(DatabaseManager):
try: try:
q("""INSERT INTO obj SELECT * FROM tobj WHERE tobj.serial = %d""" \ q("""INSERT INTO obj SELECT * FROM tobj WHERE tobj.serial = %d""" \
% tid) % tid)
q('INSERT INTO obj_short SELECT partition, oid, serial FROM tobj'
' WHERE tobj.serial = %d' % (tid, ))
q("""DELETE FROM tobj WHERE serial = %d""" % tid) q("""DELETE FROM tobj WHERE serial = %d""" % tid)
q("""INSERT INTO trans SELECT * FROM ttrans WHERE ttrans.tid = %d""" q("""INSERT INTO trans SELECT * FROM ttrans WHERE ttrans.tid = %d"""
% tid) % tid)
...@@ -504,6 +531,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -504,6 +531,7 @@ class MySQLDatabaseManager(DatabaseManager):
def deleteTransaction(self, tid, oid_list=()): def deleteTransaction(self, tid, oid_list=()):
q = self.query q = self.query
objQuery = self.objQuery
u64 = util.u64 u64 = util.u64
tid = u64(tid) tid = u64(tid)
getPartition = self._getPartition getPartition = self._getPartition
...@@ -517,8 +545,9 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -517,8 +545,9 @@ class MySQLDatabaseManager(DatabaseManager):
for oid in oid_list: for oid in oid_list:
oid = u64(oid) oid = u64(oid)
partition = getPartition(oid) partition = getPartition(oid)
q("DELETE FROM obj WHERE partition=%(partition)d " objQuery('DELETE FROM %%(table)s WHERE '
"AND oid = %(oid)d AND serial = %(serial)d" % { 'partition=%(partition)d '
'AND oid = %(oid)d AND serial = %(serial)d' % {
'partition': partition, 'partition': partition,
'oid': oid, 'oid': oid,
'serial': tid, 'serial': tid,
...@@ -535,14 +564,14 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -535,14 +564,14 @@ class MySQLDatabaseManager(DatabaseManager):
'partition': self._getPartition(oid), 'partition': self._getPartition(oid),
'oid': oid, 'oid': oid,
} }
query_fmt = """DELETE FROM obj WHERE partition = %(partition)d query_fmt = 'DELETE FROM %%(table)s WHERE ' \
AND oid = %(oid)d""" 'partition = %(partition)d AND oid = %(oid)d'
if serial is not None: if serial is not None:
query_param_dict['serial'] = u64(serial) query_param_dict['serial'] = u64(serial)
query_fmt = query_fmt + ' AND serial = %(serial)d' query_fmt = query_fmt + ' AND serial = %(serial)d'
self.begin() self.begin()
try: try:
self.query(query_fmt % query_param_dict) self.objQuery(query_fmt % query_param_dict)
except: except:
self.rollback() self.rollback()
raise raise
...@@ -613,7 +642,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -613,7 +642,7 @@ class MySQLDatabaseManager(DatabaseManager):
min_oid = u64(min_oid) min_oid = u64(min_oid)
min_serial = u64(min_serial) min_serial = u64(min_serial)
max_serial = u64(max_serial) max_serial = u64(max_serial)
r = q('SELECT oid, serial FROM obj ' r = q('SELECT oid, serial FROM obj_short '
'WHERE partition = %(partition)s ' 'WHERE partition = %(partition)s '
'AND serial <= %(max_serial)d ' 'AND serial <= %(max_serial)d '
'AND ((oid = %(min_oid)d AND serial >= %(min_serial)d) ' 'AND ((oid = %(min_oid)d AND serial >= %(min_serial)d) '
...@@ -723,6 +752,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -723,6 +752,7 @@ class MySQLDatabaseManager(DatabaseManager):
def pack(self, tid, updateObjectDataForPack): def pack(self, tid, updateObjectDataForPack):
# TODO: unit test (along with updatePackFuture) # TODO: unit test (along with updatePackFuture)
q = self.query q = self.query
objQuery = self.objQuery
tid = util.u64(tid) tid = util.u64(tid)
updatePackFuture = self._updatePackFuture updatePackFuture = self._updatePackFuture
getPartition = self._getPartition getPartition = self._getPartition
...@@ -730,7 +760,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -730,7 +760,7 @@ class MySQLDatabaseManager(DatabaseManager):
try: try:
self._setPackTID(tid) self._setPackTID(tid)
for count, oid, max_serial in q('SELECT COUNT(*) - 1, oid, ' for count, oid, max_serial in q('SELECT COUNT(*) - 1, oid, '
'MAX(serial) FROM obj WHERE serial <= %(tid)d ' 'MAX(serial) FROM obj_short WHERE serial <= %(tid)d '
'GROUP BY oid' % {'tid': tid}): 'GROUP BY oid' % {'tid': tid}):
if q('SELECT LENGTH(value) FROM obj WHERE partition =' if q('SELECT LENGTH(value) FROM obj WHERE partition ='
'%(partition)s AND oid = %(oid)d AND ' '%(partition)s AND oid = %(oid)d AND '
...@@ -743,7 +773,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -743,7 +773,7 @@ class MySQLDatabaseManager(DatabaseManager):
max_serial += 1 max_serial += 1
if count: if count:
# There are things to delete for this object # There are things to delete for this object
for (serial, ) in q('SELECT serial FROM obj WHERE ' for (serial, ) in q('SELECT serial FROM obj_short WHERE '
'partition=%(partition)d AND oid=%(oid)d AND ' 'partition=%(partition)d AND oid=%(oid)d AND '
'serial < %(max_serial)d' % { 'serial < %(max_serial)d' % {
'oid': oid, 'oid': oid,
...@@ -752,7 +782,8 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -752,7 +782,8 @@ class MySQLDatabaseManager(DatabaseManager):
}): }):
updatePackFuture(oid, serial, max_serial, updatePackFuture(oid, serial, max_serial,
updateObjectDataForPack) updateObjectDataForPack)
q('DELETE FROM obj WHERE partition=%(partition)d ' objQuery('DELETE FROM %%(table)s WHERE '
'partition=%(partition)d '
'AND oid=%(oid)d AND serial=%(serial)d' % { 'AND oid=%(oid)d AND serial=%(serial)d' % {
'partition': getPartition(oid), 'partition': getPartition(oid),
'oid': oid, 'oid': oid,
...@@ -788,7 +819,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -788,7 +819,7 @@ class MySQLDatabaseManager(DatabaseManager):
# XXX: XOR is a lame checksum # XXX: XOR is a lame checksum
u64 = util.u64 u64 = util.u64
p64 = util.p64 p64 = util.p64
r = self.query('SELECT oid, serial FROM obj WHERE ' r = self.query('SELECT oid, serial FROM obj_short WHERE '
'partition = %(partition)s AND ' 'partition = %(partition)s AND '
'(oid > %(min_oid)d OR ' '(oid > %(min_oid)d OR '
'(oid = %(min_oid)d AND serial >= %(min_serial)d)) ' '(oid = %(min_oid)d AND serial >= %(min_serial)d)) '
......
...@@ -266,6 +266,7 @@ class NEOCluster(object): ...@@ -266,6 +266,7 @@ class NEOCluster(object):
cursor.execute('rename table %s to tmp' % (table, )) cursor.execute('rename table %s to tmp' % (table, ))
cursor.execute('rename table t%s to %s' % (table, table)) cursor.execute('rename table t%s to %s' % (table, table))
cursor.execute('rename table tmp to t%s' % (table, )) cursor.execute('rename table tmp to t%s' % (table, ))
cursor.execute('truncate table obj_short')
sql_connection.commit() sql_connection.commit()
sql_connection.close() sql_connection.close()
......
...@@ -133,16 +133,20 @@ class StorageMySQSLdbTests(NeoUnitTestBase): ...@@ -133,16 +133,20 @@ class StorageMySQSLdbTests(NeoUnitTestBase):
self.assertEquals(self.db.escape("a'b"), "a\\'b") self.assertEquals(self.db.escape("a'b"), "a\\'b")
def test_setup(self): def test_setup(self):
# XXX: this test verifies irrelevant symptoms. It should instead check that
# - setup, store, setup, load -> data still there
# - setup, store, setup(reset=True), load -> data not found
# create all tables # create all tables
self.db.conn = Mock() self.db.conn = Mock()
self.db.setup() self.db.setup()
calls = self.db.conn.mockGetNamedCalls('query') calls = self.db.conn.mockGetNamedCalls('query')
self.assertEquals(len(calls), 6) self.assertEquals(len(calls), 7)
# create all tables but drop them first # create all tables but drop them first
self.db.conn = Mock() self.db.conn = Mock()
self.db.setup(reset=True) self.db.setup(reset=True)
calls = self.db.conn.mockGetNamedCalls('query') calls = self.db.conn.mockGetNamedCalls('query')
self.assertEquals(len(calls), 7) self.assertEquals(len(calls), 8)
def test_configuration(self): def test_configuration(self):
# check if a configuration entry is well written # check if a configuration entry is well written
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment