Commit 9682722b by Julien Muchembled

Do not send invalidations for objects on which only readCurrent was called

This fixes invalid next_serial entries in cache,
and the following error for values not in cache:

  Traceback (most recent call last):
    File "ZODB/Connection.py", line 856, in setstate
      self._setstate(obj)
    File "ZODB/Connection.py", line 894, in _setstate
      self._load_before_or_conflict(obj)
    File "ZODB/Connection.py", line 922, in _load_before_or_conflict
      if not self._setstate_noncurrent(obj):
    File "ZODB/Connection.py", line 945, in _setstate_noncurrent
      assert end is not None
  AssertionError
parent b3522b1b
......@@ -45,7 +45,7 @@ from .pool import ConnectionPool
from neo.lib.util import p64, u64, parseMasterList
from neo.lib.debug import register as registerLiveDebugger
CHECKED_SERIAL = master.CHECKED_SERIAL
CHECKED_SERIAL = object()
try:
from Signals.Signals import SignalHandler
......@@ -664,13 +664,18 @@ class Application(ThreadedApplication):
txn_container = self._txn_container
if 'voted' not in txn_container.get(transaction):
self.tpc_vote(transaction, tryToResolveConflict)
checked_list = []
self._load_lock_acquire()
try:
# Call finish on master
txn_context = txn_container.pop(transaction)
cache_dict = txn_context['cache_dict']
checked_list = [oid for oid, data in cache_dict.iteritems()
if data is CHECKED_SERIAL]
for oid in checked_list:
del cache_dict[oid]
tid = self._askPrimary(Packets.AskFinishTransaction(
txn_context['ttid'], cache_dict),
txn_context['ttid'], cache_dict, checked_list),
cache_dict=cache_dict, callback=f)
assert tid
return tid
......
......@@ -22,7 +22,6 @@ from neo.lib.util import dump, add64
from . import AnswerBaseHandler
from ..exception import NEOStorageError
CHECKED_SERIAL = object()
class PrimaryBootstrapHandler(AnswerBaseHandler):
""" Bootstrap handler used when looking for the primary master """
......@@ -120,11 +119,6 @@ class PrimaryNotificationsHandler(MTEventHandler):
app._cache_lock_acquire()
try:
for oid, data in kw.pop('cache_dict').iteritems():
if data is CHECKED_SERIAL:
# this is just a remain of
# checkCurrentSerialInTransaction call, ignore (no data
# was modified).
continue
# Update ex-latest value in cache
cache.invalidate(oid, tid)
if data is not None:
......
......@@ -20,7 +20,7 @@ import traceback
from cStringIO import StringIO
from struct import Struct
PROTOCOL_VERSION = 3
PROTOCOL_VERSION = 4
# Size restrictions.
MIN_PACKET_SIZE = 10
......@@ -850,6 +850,9 @@ class FinishTransaction(Packet):
_fmt = PStruct('ask_finish_transaction',
PTID('tid'),
PFOidList,
PList('checked_list',
POID('oid'),
),
)
_answer = PStruct('answer_information_locked',
......
......@@ -54,12 +54,13 @@ class ClientServiceHandler(MasterHandler):
def askNewOIDs(self, conn, num_oids):
conn.answer(Packets.AnswerNewOIDs(self.app.tm.getNextOIDList(num_oids)))
def askFinishTransaction(self, conn, ttid, oid_list):
def askFinishTransaction(self, conn, ttid, oid_list, checked_list):
app = self.app
pt = app.pt
# Collect partitions related to this transaction.
partition_set = set(map(pt.getPartition, oid_list))
lock_oid_list = oid_list + checked_list
partition_set = set(map(pt.getPartition, lock_oid_list))
partition_set.add(pt.getPartition(ttid))
# Collect the UUIDs of nodes related to this transaction.
......@@ -84,7 +85,7 @@ class ClientServiceHandler(MasterHandler):
{x.getUUID() for x in identified_node_list},
conn.getPeerId(),
),
oid_list,
lock_oid_list,
)
for node in identified_node_list:
node.ask(p, timeout=60)
......
......@@ -124,18 +124,17 @@ class MasterClientHandlerTests(NeoUnitTestBase):
})
ttid = self.getNextTID()
service.askBeginTransaction(conn, ttid)
oid_list = []
conn = self.getFakeConnection(client_uuid, self.client_address)
self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn)
# No packet sent if storage node is not ready
self.assertFalse(self.app.isStorageReady(storage_uuid))
service.askFinishTransaction(conn, ttid, oid_list)
service.askFinishTransaction(conn, ttid, (), ())
self.checkNoPacketSent(storage_conn)
self.app.tm.abortFor(self.app.nm.getByUUID(client_uuid))
# ...but AskLockInformation is sent if it is ready
self.app.setStorageReady(storage_uuid)
self.assertTrue(self.app.isStorageReady(storage_uuid))
service.askFinishTransaction(conn, ttid, oid_list)
service.askFinishTransaction(conn, ttid, (), ())
self.checkAskLockInformation(storage_conn)
self.assertEqual(len(self.app.tm.registerForNotification(storage_uuid)), 1)
txn = self.app.tm[ttid]
......
......@@ -736,7 +736,8 @@ class NEOCluster(object):
if cell[1] == CellStates.OUT_OF_DATE]
def getZODBStorage(self, **kw):
return Storage.Storage(None, self.name, _app=self.client, **kw)
kw['_app'] = kw.pop('client', self.client)
return Storage.Storage(None, self.name, **kw)
def importZODB(self, dummy_zodb=None, random=random):
if dummy_zodb is None:
......
......@@ -22,7 +22,7 @@ import unittest
from thread import get_ident
from zlib import compress
from persistent import Persistent
from ZODB import POSException
from ZODB import DB, POSException
from neo.storage.transactions import TransactionManager, \
DelayedError, ConflictError
from neo.lib.connection import ConnectionClosed, MTClientConnection
......@@ -30,7 +30,7 @@ from neo.lib.protocol import CellStates, ClusterStates, NodeStates, Packets, \
ZERO_TID
from .. import expectedFailure, _UnexpectedSuccess, Patch
from . import NEOCluster, NEOThreadedTest
from neo.lib.util import add64, makeChecksum
from neo.lib.util import add64, makeChecksum, p64, u64
from neo.client.exception import NEOStorageError
from neo.client.pool import CELL_CONNECTED, CELL_GOOD
......@@ -690,6 +690,48 @@ class Test(NEOThreadedTest):
finally:
cluster.stop()
def testReadVerifyingStorage(self):
cluster = NEOCluster(storage_count=2, partitions=2)
try:
cluster.start()
t1, c1 = cluster.getTransaction()
c1.root()['x'] = x = PCounter()
t1.commit()
t1.begin()
x._p_deactivate()
# We need a second client for external invalidations.
t2 = transaction.TransactionManager()
db = DB(storage=cluster.getZODBStorage(client=cluster.newClient()))
try:
c2 = db.open(t2)
t2.begin()
r = c2.root()
r['y'] = None
r['x']._p_activate()
c2.readCurrent(r['x'])
# Force the new tid to be even, like the modified oid and
# unlike the oid on which we used readCurrent. Thus we check
# that the node containing only the partition 1 is also
# involved in tpc_finish.
with Patch(cluster.master.tm, begin=lambda orig, node, tid:
orig(node, p64(u64(x._p_serial) + 2 & ~1))):
t2.commit()
for storage in cluster.storage_list:
self.assertFalse(storage.tm._transaction_dict)
finally:
db.close()
# Clearing cache is the easiest way to check we did't get an
# invalidation, which would cause a failure in _setstate_noncurrent
c1._storage._cache.clear()
self.assertFalse(x.value)
t0, t1, t2 = c1._storage.iterator()
self.assertEqual(map(u64, t0.oid_list), [0])
self.assertEqual(map(u64, t1.oid_list), [0, 1])
# Check oid 1 is part of transaction metadata.
self.assertEqual(t2.oid_list, t1.oid_list)
finally:
cluster.stop()
def testClientReconnection(self):
conn = [None]
def getConnForNode(orig, node):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment