Commit 92e58a9c authored by Julien Muchembled's avatar Julien Muchembled

replication: use MD5 hash instead of XOR

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2804 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent c5c572a2
......@@ -481,13 +481,6 @@ class PNumber(PStructItem):
def __init__(self, name):
PStructItem.__init__(self, name, '!L')
class PChecksum(PStructItem):
"""
A checksum
"""
def __init__(self, name):
PStructItem.__init__(self, name, '!Q')
class PIndex(PStructItem):
"""
A big integer to defined indexes in a huge list.
......@@ -563,6 +556,7 @@ class PTID(PItem):
# same definition, for now
POID = PTID
PChecksum = PUUID # (md5 is same length as uuid)
# common definitions
......
......@@ -39,6 +39,11 @@ if sys.version_info < (2, 5):
return False
__builtin__.any = any
import md5, sha
sys.modules['hashlib'] = hashlib = imp.new_module('hashlib')
hashlib.md5 = md5.new
hashlib.sha1 = sha.new
import struct
class Struct(object):
......
......@@ -22,9 +22,10 @@ Not persistent ! (no data retained after process exit)
from BTrees.OOBTree import OOBTree as _OOBTree
import neo.lib
from hashlib import md5
from neo.storage.database import DatabaseManager
from neo.lib.protocol import CellStates
from neo.lib.protocol import CellStates, ZERO_OID, ZERO_TID
from neo.lib import util
# The only purpose of this value (and code using it) is to avoid creating
......@@ -672,50 +673,51 @@ class BTreeDatabaseManager(DatabaseManager):
batchDelete(self._obj, obj_callback, recycle_subtrees=True)
def checkTIDRange(self, min_tid, max_tid, length, num_partitions, partition):
# XXX: XOR is a lame checksum
count = 0
tid_checksum = 0
tid = 0
upper_bound = util.u64(max_tid)
max_tid = 0
for tid in safeIter(self._trans.keys, min=util.u64(min_tid),
max=upper_bound):
if tid % num_partitions == partition:
if count >= length:
break
max_tid = tid
tid_checksum ^= tid
count += 1
return count, tid_checksum, util.p64(max_tid)
if length:
tid_list = []
for tid in safeIter(self._trans.keys, min=util.u64(min_tid),
max=util.u64(max_tid)):
if tid % num_partitions == partition:
tid_list.append(tid)
if len(tid_list) >= length:
break
if tid_list:
return (len(tid_list),
md5(','.join(map(str, tid_list))).digest(),
util.p64(tid_list[-1]))
return 0, None, ZERO_TID
def checkSerialRange(self, min_oid, min_serial, max_tid, length,
num_partitions, partition):
# XXX: XOR is a lame checksum
u64 = util.u64
p64 = util.p64
min_oid = u64(min_oid)
count = 0
oid_checksum = serial_checksum = 0
max_oid = oid = max_serial = serial = 0
for oid, tserial in safeIter(self._obj.items, min=min_oid):
if oid % num_partitions == partition:
if oid == min_oid:
if length:
u64 = util.u64
min_oid = u64(min_oid)
max_tid = u64(max_tid)
oid_list = []
serial_list = []
for oid, tserial in safeIter(self._obj.items, min=min_oid):
if oid % num_partitions == partition:
try:
serial_iter = tserial.keys(min=u64(min_serial),
max=u64(max_tid))
if oid == min_oid:
tserial = tserial.keys(min=u64(min_serial),
max=max_tid)
else:
tserial = tserial.keys(max=max_tid)
except ValueError:
continue
else:
serial_iter = tserial.keys()
for serial in serial_iter:
if count >= length:
break
oid_checksum ^= oid
serial_checksum ^= serial
max_serial = serial
max_oid = oid
count += 1
if count >= length:
for serial in tserial:
oid_list.append(oid)
serial_list.append(serial)
if len(oid_list) >= length:
break
else:
continue
break
return count, oid_checksum, p64(max_oid), serial_checksum, p64(max_serial)
if oid_list:
p64 = util.p64
return (len(oid_list),
md5(','.join(map(str, oid_list))).digest(),
p64(oid_list[-1]),
md5(','.join(map(str, serial_list))).digest(),
p64(serial_list[-1]))
return 0, None, ZERO_OID, None, ZERO_TID
......@@ -15,6 +15,7 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from binascii import a2b_hex
import MySQLdb
from MySQLdb import OperationalError
from MySQLdb.constants.CR import SERVER_GONE_ERROR, SERVER_LOST
......@@ -811,9 +812,8 @@ class MySQLDatabaseManager(DatabaseManager):
self.commit()
def checkTIDRange(self, min_tid, max_tid, length, num_partitions, partition):
# XXX: XOR is a lame checksum
count, tid_checksum, max_tid = self.query('SELECT COUNT(*), '
'BIT_XOR(tid), MAX(tid) FROM ('
'MD5(GROUP_CONCAT(tid SEPARATOR ",")), MAX(tid) FROM ('
'SELECT tid FROM trans '
'WHERE partition = %(partition)s '
'AND tid >= %(min_tid)d '
......@@ -826,39 +826,37 @@ class MySQLDatabaseManager(DatabaseManager):
'length': length,
})[0]
if count == 0:
tid_checksum = 0
max_tid = ZERO_TID
else:
tid_checksum = a2b_hex(tid_checksum)
max_tid = util.p64(max_tid)
return count, tid_checksum, max_tid
def checkSerialRange(self, min_oid, min_serial, max_tid, length,
num_partitions, partition):
# XXX: XOR is a lame checksum
u64 = util.u64
p64 = util.p64
r = self.query('SELECT oid, serial FROM obj_short WHERE '
'partition = %(partition)s AND '
'serial <= %(max_tid)d AND '
'(oid > %(min_oid)d OR '
'(oid = %(min_oid)d AND serial >= %(min_serial)d)) '
'ORDER BY oid ASC, serial ASC LIMIT %(length)d' % {
count, oid_checksum, max_oid, serial_checksum, max_serial = self.query(
"""SELECT COUNT(*), MD5(GROUP_CONCAT(oid SEPARATOR ",")), MAX(oid),
MD5(GROUP_CONCAT(serial SEPARATOR ",")), MAX(serial)
FROM obj_short
WHERE partition = %(partition)s
AND serial <= %(max_tid)d
AND (oid > %(min_oid)d OR
oid = %(min_oid)d AND serial >= %(min_serial)d)
ORDER BY oid ASC, serial ASC LIMIT %(length)d""" % {
'min_oid': u64(min_oid),
'min_serial': u64(min_serial),
'max_tid': u64(max_tid),
'length': length,
'partition': partition,
})
count = len(r)
oid_checksum = serial_checksum = 0
if count == 0:
})[0]
if count:
oid_checksum = a2b_hex(oid_checksum)
serial_checksum = a2b_hex(serial_checksum)
max_oid = util.p64(max_oid)
max_serial = util.p64(max_serial)
else:
max_oid = ZERO_OID
max_serial = ZERO_TID
else:
for max_oid, max_serial in r:
oid_checksum ^= max_oid
serial_checksum ^= max_serial
max_oid = p64(max_oid)
max_serial = p64(max_serial)
return count, oid_checksum, max_oid, serial_checksum, max_serial
......@@ -149,7 +149,7 @@ class StorageStorageHandlerTests(NeoUnitTestBase):
def test_askCheckTIDRange(self):
count = 1
tid_checksum = 2
tid_checksum = self.getNewUUID()
min_tid = self.getNextTID()
num_partitions = 4
length = 5
......@@ -173,12 +173,12 @@ class StorageStorageHandlerTests(NeoUnitTestBase):
def test_askCheckSerialRange(self):
count = 1
oid_checksum = 2
oid_checksum = self.getNewUUID()
min_oid = self.getOID(1)
num_partitions = 4
length = 5
partition = 6
serial_checksum = 7
serial_checksum = self.getNewUUID()
min_serial = self.getNextTID()
max_serial = self.getNextTID()
max_oid = self.getOID(2)
......
......@@ -686,7 +686,7 @@ class ProtocolTests(NeoUnitTestBase):
min_tid = self.getNextTID()
length = 2
count = 1
tid_checksum = 42
tid_checksum = self.getNewUUID()
max_tid = self.getNextTID()
p = Packets.AnswerCheckTIDRange(min_tid, length, count, tid_checksum,
max_tid)
......@@ -717,9 +717,9 @@ class ProtocolTests(NeoUnitTestBase):
min_serial = self.getNextTID()
length = 2
count = 1
oid_checksum = 24
oid_checksum = self.getNewUUID()
max_oid = self.getOID(5)
tid_checksum = 42
tid_checksum = self.getNewUUID()
max_serial = self.getNextTID()
p = Packets.AnswerCheckSerialRange(min_oid, min_serial, length, count,
oid_checksum, max_oid, tid_checksum, max_serial)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment