Commit 25cbbf97 authored by Julien Muchembled

Optimize invalidations

- do not notify about deleted oids, like ZEO
- exchange lists of partitions instead of oids when possible
parent 574b64df
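The optimization relies on many oids mapping to few partitions, so a backup client can be told which partitions changed instead of every modified oid. A rough standalone sketch, not NEO code, assuming only the usual oid-to-partition mapping (u64(oid) modulo the number of partitions):

from struct import pack, unpack

def p64(i):    # pack an integer into an 8-byte oid/tid
    return pack('!Q', i)

def u64(oid):  # unpack an 8-byte oid/tid into an integer
    return unpack('!Q', oid)[0]

def partitions_of(oid_list, num_partitions):
    # Collapse a possibly huge list of modified oids into the much
    # smaller set of partitions they belong to.
    return {u64(oid) % num_partitions for oid in oid_list}

oids = [p64(i) for i in range(10000)]
# 10000 modified oids collapse to at most num_partitions entries.
print(len(oids), sorted(partitions_of(oids, 12)))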
......@@ -523,6 +523,9 @@ class Application(ThreadedApplication):
compressed_data = ''
compression = 0
checksum = ZERO_HASH
if data_serial is None:
assert oid not in txn_context.resolved_dict, oid
txn_context.delete_list.append(oid)
else:
size, compression, compressed_data = self.compress(data)
checksum = makeChecksum(compressed_data)
......@@ -573,6 +576,7 @@ class Application(ThreadedApplication):
'Conflict resolution succeeded for %s@%s with %s',
dump(oid), dump(old_serial), dump(serial))
# Mark this conflict as resolved
assert oid not in txn_context.delete_list, oid
resolved_dict[oid] = serial
# Try to store again
self._store(txn_context, oid, serial, data)
......@@ -725,13 +729,22 @@ class Application(ThreadedApplication):
self.tpc_vote(transaction)
txn_context = txn_container.pop(transaction)
cache_dict = txn_context.cache_dict
checked_list = [oid for oid, data in cache_dict.iteritems()
if data is CHECKED_SERIAL]
for oid in checked_list:
getPartition = self.pt.getPartition
checked = set()
for oid, data in cache_dict.items():
if data is CHECKED_SERIAL:
del cache_dict[oid]
checked.add(getPartition(oid))
deleted = txn_context.delete_list
if deleted:
oids = set(cache_dict)
oids.difference_update(deleted)
deleted = map(getPartition, deleted)
else:
oids = list(cache_dict)
ttid = txn_context.ttid
p = Packets.AskFinishTransaction(ttid, list(cache_dict),
checked_list, txn_context.pack)
p = Packets.AskFinishTransaction(ttid, oids, deleted, checked,
txn_context.pack)
try:
tid = self._askPrimary(p, cache_dict=cache_dict, callback=f)
assert tid
......
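For reference, the new bookkeeping in tpc_finish above can be read as the following standalone sketch (illustrative names only, not the actual NEO client API): checked oids leave the cache dict and are reported only as partitions, and deleted oids are dropped from the list of oids to invalidate while their partitions are reported separately.

CHECKED_SERIAL = object()  # stand-in for the client's CHECKED_SERIAL marker

def split_finish_args(cache_dict, delete_list, get_partition):
    # Return (oids to invalidate, deleted partitions, checked partitions),
    # mirroring the arguments of AskFinishTransaction.
    checked = set()
    for oid in list(cache_dict):
        if cache_dict[oid] is CHECKED_SERIAL:   # checked only, not modified
            del cache_dict[oid]
            checked.add(get_partition(oid))
    if delete_list:
        oids = set(cache_dict)
        oids.difference_update(delete_list)     # deleted oids are never notified
        deleted = set(map(get_partition, delete_list))
    else:
        oids, deleted = list(cache_dict), set()
    return oids, deleted, checked

# e.g. split_finish_args({b'\x01': 'x', b'\x02': CHECKED_SERIAL}, [b'\x01'],
#                        lambda oid: ord(oid[-1:]) % 4)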
......@@ -38,6 +38,7 @@ class Transaction(object):
self.txn = txn
# data being stored
self.data_dict = {} # {oid: (value, serial, [node_id])}
self.delete_list = [] # [oid]
# data stored: this will go to the cache on tpc_finish
self.cache_dict = {} # {oid: value}
# conflicts to resolve
......
......@@ -497,7 +497,14 @@ class Packets(dict):
InvalidateObjects = notify("""
Notify about a new transaction modifying objects,
invalidating client caches.
invalidating client caches. Deleted objects are excluded.
:nodes: M -> C
""")
InvalidatePartitions = notify("""
Notify about a new transaction, listing partitions
with modified or deleted objects.
:nodes: M -> C
""")
......
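The two notifications can be pictured with the following payload shapes (an illustrative sketch only, not NEO's actual packet classes; field names follow the docstrings above and the protocol summary further down):

import collections

# Sent to regular clients: the modified oids, with deleted oids excluded.
InvalidateObjects = collections.namedtuple('InvalidateObjects', 'tid oid_list')

# Sent to clients identified as backups: only the numbers of the partitions
# that contain modified or deleted objects.
InvalidatePartitions = collections.namedtuple('InvalidatePartitions',
                                              'tid partition_list')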
......@@ -563,13 +563,17 @@ class Application(BaseApplication):
tid = txn.tid
transaction_node = txn.node
invalidate_objects = Packets.InvalidateObjects(tid, txn.oid_list)
invalidate_partitions = Packets.InvalidatePartitions(
tid, txn.partition_list)
client_list = self.nm.getClientList(only_identified=True)
for client_node in client_list:
if client_node is transaction_node:
client_node.send(Packets.AnswerTransactionFinished(ttid, tid),
msg_id=txn.msg_id)
else:
client_node.send(invalidate_objects)
client_node.send(invalidate_partitions
if client_node.extra.get('backup') else
invalidate_objects)
# Unlock Information to relevant storage nodes.
notify_unlock = Packets.NotifyUnlockInformation(ttid)
......
......@@ -63,13 +63,12 @@ class BackupHandler(EventHandler):
raise RuntimeError("upstream DB truncated")
app.ignore_invalidations = False
def invalidateObjects(self, conn, tid, oid_list):
def invalidatePartitions(self, conn, tid, partition_list):
app = self.app
if app.ignore_invalidations:
return
getPartition = app.app.pt.getPartition
partition_set = set(map(getPartition, oid_list))
partition_set.add(getPartition(tid))
partition_set = set(partition_list)
partition_set.add(app.app.pt.getPartition(tid))
prev_tid = app.app.getLastTransaction()
app.invalidatePartitions(tid, prev_tid, partition_set)
......
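In other words, the backup master now replicates the partitions announced by the upstream master, plus the partition derived from the tid itself, presumably the one holding the transaction metadata. A minimal sketch of that computation (get_partition stands in for the partition table's getPartition):

def partitions_to_replicate(tid, partition_list, get_partition):
    # Partitions whose objects were modified or deleted, as announced
    # by the upstream master via InvalidatePartitions...
    partition_set = set(partition_list)
    # ...plus the partition computed from the tid (assumed to be where the
    # transaction record itself is stored).
    partition_set.add(get_partition(tid))
    return partition_set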
......@@ -64,7 +64,8 @@ class ClientServiceHandler(MasterHandler):
conn.answer((Errors.Ack if app.tm.vote(app, *args) else
Errors.IncompleteTransaction)())
def askFinishTransaction(self, conn, ttid, oid_list, checked_list, pack):
def askFinishTransaction(self, conn, ttid, oid_list,
deleted, checked, pack):
app = self.app
if pack:
tid = pack[1]
......@@ -74,7 +75,8 @@ class ClientServiceHandler(MasterHandler):
app,
ttid,
oid_list,
checked_list,
deleted,
checked,
conn.getPeerId(),
)
if tid:
......
......@@ -44,7 +44,7 @@ class Transaction(object):
self._notification_set = set()
def __repr__(self):
return "<%s(client=%r, tid=%r, oids=%r, storages=%r, age=%.2fs) at %x>" % (
return "<%s(client=%r, tid=%r, invalidated=%r, storages=%r, age=%.2fs) at %x>" % (
self.__class__.__name__,
self.node,
dump(self.tid),
......@@ -67,9 +67,10 @@ class Transaction(object):
"""
return list(self._notification_set)
def prepare(self, tid, oid_list, involved, msg_id):
def prepare(self, tid, oid_list, partition_list, involved, msg_id):
self.tid = tid
self.oid_list = oid_list
self.partition_list = partition_list
self.msg_id = msg_id
self.involved = involved
self.locking = involved.copy()
......@@ -270,7 +271,7 @@ class TransactionManager(EventQueue):
txn.failed = failed
return True
def prepare(self, app, ttid, oid_list, checked_list, msg_id):
def prepare(self, app, ttid, oid_list, deleted, checked, msg_id):
"""
Prepare a transaction to be finished
"""
......@@ -283,8 +284,10 @@ class TransactionManager(EventQueue):
ready = app.getStorageReadySet(txn.storage_readiness)
getPartition = pt.getPartition
partition_set = set(map(getPartition, oid_list))
partition_set.update(map(getPartition, checked_list))
partition_set.update(deleted)
partition_list = list(partition_set)
partition_set.add(getPartition(ttid))
partition_set.update(checked)
node_list = []
involved = set()
for partition in partition_set:
......@@ -316,7 +319,7 @@ class TransactionManager(EventQueue):
self._queue.append(ttid)
logging.debug('Finish TXN %s for %s (was %s)',
dump(tid), txn.node, dump(ttid))
txn.prepare(tid, oid_list, involved, msg_id)
txn.prepare(tid, oid_list, partition_list, involved, msg_id)
# check if greater and foreign OID was stored
if oid_list:
self.setLastOID(max(oid_list))
......
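One subtlety in prepare() above: the partition list forwarded to backup clients is snapshotted before the ttid's partition and the partitions of merely checked oids are added, since those extra partitions only matter for choosing which storage nodes to involve, not for invalidation. A standalone sketch of that split (illustrative names, not the actual TransactionManager API):

def compute_partitions(ttid, oid_list, deleted, checked, get_partition):
    # Partitions carrying modified or deleted data: this becomes
    # txn.partition_list, later sent in InvalidatePartitions.
    partition_set = set(map(get_partition, oid_list))
    partition_set.update(deleted)
    partition_list = list(partition_set)
    # Storage involvement additionally needs the ttid's partition and the
    # partitions of checked oids.
    partition_set.add(get_partition(ttid))
    partition_set.update(checked)
    return partition_list, partition_set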
......@@ -46,7 +46,7 @@ AskClusterState()
AskFetchObjects(int,int,p64,p64,p64,{p64:[p64]})
AskFetchTransactions(int,int,p64,p64,[p64],bool)
AskFinalTID(p64)
AskFinishTransaction(p64,[p64],[p64],?(?[p64],p64))
AskFinishTransaction(p64,[p64],[int],[int],?(?[p64],p64))
AskLastIDs()
AskLastTransaction()
AskLockInformation(p64,p64,bool)
......@@ -76,6 +76,7 @@ CheckReplicas({int:?int},p64,?)
Error(int,bin)
FailedVote(p64,[int])
InvalidateObjects(p64,[p64])
InvalidatePartitions(p64,[int])
NotPrimaryMaster(?int,[(bin,int)])
NotifyClusterInformation(ClusterStates)
NotifyDeadlock(p64,p64)
......
......@@ -121,28 +121,6 @@ class Test(NEOThreadedTest):
self.assertFalse(cluster.storage.sqlCount('bigdata'))
self.assertFalse(cluster.storage.sqlCount('data'))
@with_cluster()
def testDeleteObject(self, cluster):
if 1:
storage = cluster.getZODBStorage()
for clear_cache in 0, 1:
for tst in 'a.', 'bcd.':
oid = storage.new_oid()
serial = None
for data in tst:
txn = transaction.Transaction()
storage.tpc_begin(txn)
if data == '.':
storage.deleteObject(oid, serial, txn)
else:
storage.store(oid, serial, data, '', txn)
storage.tpc_vote(txn)
serial = storage.tpc_finish(txn)
if clear_cache:
storage._cache.clear()
self.assertRaises(POSException.POSKeyError,
storage.load, oid, '')
@with_cluster(storage_count=3, replicas=1, partitions=5)
def testIterOIDs(self, cluster):
storage = cluster.getZODBStorage()
......
......@@ -266,7 +266,7 @@ class ReplicationTests(NEOThreadedTest):
def testBackupUpstreamStorageDead(self, backup):
upstream = backup.upstream
with ConnectionFilter() as f:
f.delayInvalidateObjects()
f.delayInvalidatePartitions()
upstream.importZODB()(1)
count = [0]
def _connect(orig, conn):
......@@ -1231,5 +1231,29 @@ class ReplicationTests(NEOThreadedTest):
self.assertEqual(1, self.checkBackup(backup))
@backup_test(3)
def testDeleteObject(self, backup):
upstream = backup.upstream
storage = upstream.getZODBStorage()
for clear_cache in 0, 1:
for tst in 'a.', 'bcd.':
oid = storage.new_oid()
serial = None
for data in tst:
txn = transaction.Transaction()
storage.tpc_begin(txn)
if data == '.':
storage.deleteObject(oid, serial, txn)
else:
storage.store(oid, serial, data, '', txn)
storage.tpc_vote(txn)
serial = storage.tpc_finish(txn)
self.tic()
self.assertEqual(3, self.checkBackup(backup))
if clear_cache:
storage._cache.clear()
self.assertRaises(POSKeyError, storage.load, oid, '')
if __name__ == "__main__":
unittest.main()