Commit d3cb8888 authored by Julien Muchembled's avatar Julien Muchembled

qa: rewrite testReplicationBlockedByUnfinished as a threaded test

It is extended to check that the storage is only notified about the
transactions that existed at the time it asked for them. Otherwise,
Replicator.transactionFinished would be called more than once, and
`self.ttid_set.remove(ttid)` would raise KeyError.

The functional version also contained an annoying 'sleep(10)'.
parent 5b66a6a7
......@@ -54,8 +54,8 @@ class StorageServiceHandler(BaseServiceHandler):
pending_list = ()
else:
# This can't be app.tm.getLastTID() for imported transactions,
# because outdated cells must at least wait that they're locked.
# For normal transactions, it would not matter.
# because outdated cells must at least wait that they're locked
# at source side. For normal transactions, it would not matter.
last_tid = app.getLastTransaction()
pending_list = app.tm.registerForNotification(conn.getUUID())
p = Packets.AnswerUnfinishedTransactions(last_tid, pending_list)
......
......@@ -14,14 +14,12 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import unittest
import transaction
from persistent import Persistent
from . import NEOCluster, NEOFunctionalTest
from neo.lib.protocol import ClusterStates, NodeStates
from ZODB.tests.StorageTestBase import zodb_pickle
class PObject(Persistent):
......@@ -421,47 +419,5 @@ class StorageTests(NEOFunctionalTest):
self.neo.expectClusterRunning()
self.neo.expectOudatedCells(number=0)
def testReplicationBlockedByUnfinished(self):
# start a cluster with 1 of 2 storages and a replica
(started, stopped) = self.__setup(storage_number=2, replicas=1,
pending_number=1, partitions=10)
self.neo.expectRunning(started[0])
self.neo.expectStorageNotKnown(stopped[0])
self.neo.expectOudatedCells(number=0)
self.neo.expectClusterRunning()
self.__populate()
self.neo.expectOudatedCells(number=0)
# start a transaction that will block the end of the replication
db, conn = self.neo.getZODBConnection()
st = conn._storage
t = transaction.Transaction()
t.user = 'user'
t.description = 'desc'
oid = st.new_oid()
rev = '\0' * 8
data = zodb_pickle(PObject(42))
st.tpc_begin(t)
st.store(oid, rev, data, '', t)
# start the outdated storage
stopped[0].start()
self.neo.expectPending(stopped[0])
self.neo.neoctl.enableStorageList([stopped[0].getUUID()])
self.neo.neoctl.tweakPartitionTable()
self.neo.expectRunning(stopped[0])
self.neo.expectClusterRunning()
self.neo.expectAssignedCells(started[0], 10)
self.neo.expectAssignedCells(stopped[0], 10)
# wait a bit, replication must not happen. This hack is required
# because we cannot gather informations directly from the storages
time.sleep(10)
self.neo.expectOudatedCells(number=10)
# finish the transaction, the replication must happen and finish
st.tpc_vote(t)
st.tpc_finish(t)
self.neo.expectOudatedCells(number=0, timeout=10)
if __name__ == "__main__":
unittest.main()
......@@ -494,6 +494,40 @@ class ReplicationTests(NEOThreadedTest):
finally:
cluster.stop()
def testReplicationBlockedByUnfinished(self):
cluster = NEOCluster(replicas=1)
try:
s0, s1 = cluster.storage_list
cluster.start(storage_list=(s0,))
storage = cluster.getZODBStorage()
oid = storage.new_oid()
tid = None
for n in 1, 0:
# On first iteration, the transaction will block replication
# until tpc_finish.
# We do a second iteration as a quick check that the cluster
# remains functional after such a scenario.
txn = transaction.Transaction()
storage.tpc_begin(txn)
tid = storage.store(oid, tid, 'foo', '', txn)
if n:
# Start the outdated storage.
s1.start()
self.tic()
cluster.enableStorageList((s1,))
cluster.neoctl.tweakPartitionTable()
self.tic()
self.assertEqual(n, len(cluster.getOutdatedCells()))
storage.tpc_vote(txn)
self.assertEqual(n, len(cluster.getOutdatedCells()))
tid = storage.tpc_finish(txn)
self.tic() # replication resumes and ends
self.assertFalse(cluster.getOutdatedCells())
self.assertEqual(cluster.neoctl.getClusterState(),
ClusterStates.RUNNING)
finally:
cluster.stop()
def testCheckReplicas(self):
from neo.storage import checker
def corrupt(offset):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment