Commit edd10b60 authored by Vincent Pelletier's avatar Vincent Pelletier

No TID may be made available to backup tools until bootstrap is over.

 - Do not update _storage during bootstrap
 - Make bootstrap process check _transcient (requires a new accessor)
 - Add a test (requires new protocol command to check bootstrap status)


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@24541 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 69e95939
...@@ -179,6 +179,9 @@ class TIDServer(SocketServer.BaseRequestHandler): ...@@ -179,6 +179,9 @@ class TIDServer(SocketServer.BaseRequestHandler):
tid_dict = self._field_exchange.recv_dict() tid_dict = self._field_exchange.recv_dict()
self._tid_storage.commit(identifier, tid_dict) self._tid_storage.commit(identifier, tid_dict)
def bootstraped(self):
  # Tell the connected client whether server-side bootstrap has finished.
  # Reads the module-level `has_bootstraped` flag and sends 1 when truthy,
  # 0 otherwise (pre-Python-2.5 `and/or` idiom instead of a ternary).
  # NOTE(review): name keeps the historical spelling "bootstraped" because
  # it doubles as the wire-protocol command name — do not rename.
  self._field_exchange.send_int(has_bootstraped and 1 or 0)
def handle(self): def handle(self):
global tid_storage global tid_storage
self._tid_storage = tid_storage self._tid_storage = tid_storage
...@@ -187,7 +190,8 @@ class TIDServer(SocketServer.BaseRequestHandler): ...@@ -187,7 +190,8 @@ class TIDServer(SocketServer.BaseRequestHandler):
'begin': self.begin, 'begin': self.begin,
'abort': self.abort, 'abort': self.abort,
'commit': self.commit, 'commit': self.commit,
'dump': self.dump 'dump': self.dump,
'bootstraped': self.bootstraped,
} }
self.log('Connected') self.log('Connected')
try: try:
...@@ -327,38 +331,36 @@ class TIDStorage: ...@@ -327,38 +331,36 @@ class TIDStorage:
storage_id_set = self._storage_id_to_storage_id_set_dict[storage_id] storage_id_set = self._storage_id_to_storage_id_set_dict[storage_id]
# Raises if not found # Raises if not found
storage_id_set.remove(storage_id) storage_id_set.remove(storage_id)
if self._tid_file is not None: if has_bootstraped:
now = time.time() if self._tid_file is not None:
can_full_dump = has_bootstraped and (self._next_full_dump is not None) and (self._next_full_dump < now) now = time.time()
can_dump = (not can_full_dump) and (self._next_dump is not None) and (self._next_dump < now) can_full_dump = (self._next_full_dump is not None) and (self._next_full_dump < now)
record_for_dump = can_dump or (self._next_dump is not None) can_dump = (not can_full_dump) and (self._next_dump is not None) and (self._next_dump < now)
append_to_file = has_bootstraped and (can_dump or can_full_dump) record_for_dump = can_dump or (self._next_dump is not None)
else: append_to_file = (can_dump or can_full_dump)
append_to_file = record_for_dump = can_dump = can_full_dump = False
for key, value in self._storage_id_to_storage_id_set_dict.iteritems():
if len(value) == 0 and key in self._transcient:
self._storage[key] = self._transcient.pop(key)
if record_for_dump:
self._since_last_burst.add(key)
if append_to_file:
if can_full_dump:
to_dump_dict = self._storage
dump_code = 'f'
else: else:
to_dump_dict = dict([(key, self._storage[key]) for key in self._since_last_burst]) append_to_file = record_for_dump = can_dump = can_full_dump = False
dump_code = 'd' for key, value in self._storage_id_to_storage_id_set_dict.iteritems():
if len(to_dump_dict): if len(value) == 0 and key in self._transcient:
self._tid_file.write('%.02f %s %r\n' % (now, dump_code, to_dump_dict)) self._storage[key] = self._transcient.pop(key)
if record_for_dump:
self._since_last_burst.add(key)
if append_to_file:
if can_full_dump: if can_full_dump:
self._next_full_dump = now + self._full_dump_period to_dump_dict = self._storage
if self._next_dump is not None: dump_code = 'f'
self._next_dump = now + self._burst_period else:
self._since_last_burst.clear() to_dump_dict = dict([(key, self._storage[key]) for key in self._since_last_burst])
if not has_bootstraped: dump_code = 'd'
if len(to_dump_dict):
self._tid_file.write('%.02f %s %r\n' % (now, dump_code, to_dump_dict))
if can_full_dump:
self._next_full_dump = now + self._full_dump_period
if self._next_dump is not None:
self._next_dump = now + self._burst_period
self._since_last_burst.clear()
else:
doBootstrap() doBootstrap()
#if len(self._storage_id_to_transaction_id_list_dict) == 0:
# self._storage.update(self._transcient)
# self._transcient.clear()
finally: finally:
self._storage_id_lock.release() self._storage_id_lock.release()
...@@ -369,6 +371,13 @@ class TIDStorage: ...@@ -369,6 +371,13 @@ class TIDStorage:
finally: finally:
self._storage_id_lock.release() self._storage_id_lock.release()
def dump_transcient(self):
  """Return a snapshot copy of the transcient (not-yet-published) TID dict.

  The storage lock is held while copying so the caller gets a consistent
  view; the copy is returned after the lock is released.
  """
  self._storage_id_lock.acquire()
  try:
    snapshot = self._transcient.copy()
  finally:
    self._storage_id_lock.release()
  return snapshot
def begin(self, transaction_id, storage_id_list): def begin(self, transaction_id, storage_id_list):
self._storage_id_lock.acquire() self._storage_id_lock.acquire()
try: try:
...@@ -433,12 +442,12 @@ class BootstrapContent(threading.Thread): ...@@ -433,12 +442,12 @@ class BootstrapContent(threading.Thread):
else: else:
storage_id_to_object_path_dict[key] = mountpoint storage_id_to_object_path_dict[key] = mountpoint
target_storage_id_set = sets.ImmutableSet(storage_id_to_object_path_dict.keys()) target_storage_id_set = sets.ImmutableSet(storage_id_to_object_path_dict.keys())
known_storage_id_set = sets.ImmutableSet(tid_storage.dump().keys()) known_storage_id_set = sets.ImmutableSet(tid_storage.dump_transcient().keys())
to_check_storage_id_set = target_storage_id_set - known_storage_id_set to_check_storage_id_set = target_storage_id_set - known_storage_id_set
while len(to_check_storage_id_set) and can_bootstrap: while len(to_check_storage_id_set) and can_bootstrap:
serialize_url = None serialize_url = None
for storage_id in to_check_storage_id_set: for storage_id in to_check_storage_id_set:
if can_bootstrap and storage_id not in tid_storage.dump().keys(): if can_bootstrap and storage_id not in tid_storage.dump_transcient().keys():
serialize_url = base_url % (storage_id_to_object_path_dict[storage_id], ) serialize_url = base_url % (storage_id_to_object_path_dict[storage_id], )
try: try:
# Query a Zope, which will contact this process in return to store # Query a Zope, which will contact this process in return to store
...@@ -450,7 +459,7 @@ class BootstrapContent(threading.Thread): ...@@ -450,7 +459,7 @@ class BootstrapContent(threading.Thread):
log('Opened %r: %r' % (serialize_url, page.read())) log('Opened %r: %r' % (serialize_url, page.read()))
# Let some time for zope to contact TIDStorage back and fill the gaps. # Let some time for zope to contact TIDStorage back and fill the gaps.
time.sleep(5) time.sleep(5)
known_storage_id_set = sets.ImmutableSet(tid_storage.dump().keys()) known_storage_id_set = sets.ImmutableSet(tid_storage.dump_transcient().keys())
to_check_storage_id_set = target_storage_id_set - known_storage_id_set to_check_storage_id_set = target_storage_id_set - known_storage_id_set
if len(to_check_storage_id_set): if len(to_check_storage_id_set):
log('Bootstrap in progress... Mising storages: %r' % (to_check_storage_id_set, )) log('Bootstrap in progress... Mising storages: %r' % (to_check_storage_id_set, ))
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
from ExchangeProtocol import ExchangeProtocol from ExchangeProtocol import ExchangeProtocol
import traceback import traceback
import socket import socket
import time
import sys import sys
assert len(sys.argv) == 3, 'Requires exactly 2 arguments: <address> <port>' assert len(sys.argv) == 3, 'Requires exactly 2 arguments: <address> <port>'
...@@ -53,6 +54,14 @@ class TIDClient: ...@@ -53,6 +54,14 @@ class TIDClient:
internal_storage_tid_dict['%s_%s' % (test_id, key)] = value internal_storage_tid_dict['%s_%s' % (test_id, key)] = value
self._exchange_protocol.send_dict(internal_storage_tid_dict) self._exchange_protocol.send_dict(internal_storage_tid_dict)
def bootstraped(self):
  """Query the server's bootstrap status.

  Sends the 'bootstraped' command and returns the server's integer reply
  (non-zero once bootstrap has completed).
  """
  protocol = self._exchange_protocol
  protocol.send_field('bootstraped')
  status = protocol.recv_int()
  return status
def waitForBootstrap(self):
  """Block until the server reports that bootstrap has completed.

  Polls the server every 0.1 s via bootstraped(); returns once the
  reported status is non-zero.
  """
  while True:
    if self.bootstraped():
      break
    time.sleep(0.1)
class TestTIDServerV2: class TestTIDServerV2:
def __init__(self, address, port): def __init__(self, address, port):
self._client = TIDClient((address, port)) self._client = TIDClient((address, port))
...@@ -60,13 +69,35 @@ class TestTIDServerV2: ...@@ -60,13 +69,35 @@ class TestTIDServerV2:
def assertEqual(self, value, target): def assertEqual(self, value, target):
assert value == target, 'value %r does not match target %r' % (value, target) assert value == target, 'value %r does not match target %r' % (value, target)
def testInitialValue(self, test_id): def test_01_InitialValue(self, test_id):
""" """
Check that the storage is empty Check that the storage is empty
""" """
self.assertEqual(self._client.dump_all(), {}) self.assertEqual(self._client.dump_all(), {})
def test_02_Bootstrap(self, test_id):
  """
  Trigger bootstrap and check that no value is visible until bootstrap is
  done. A commit made during bootstrap must stay hidden; values only
  become visible after bootstrap completes AND one more transaction
  commits.
  """
  t1_storage_tid_dict = {'s0': 1}
  t2_storage_tid_dict = {'s1': 1}
  self.assertEqual(self._client.dump(test_id), {})
  self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
  self.assertEqual(self._client.dump(test_id), {})
  self._client.commit(test_id, 't1', t1_storage_tid_dict)
  # Bootstrap is running on the server, nothing is visible yet.
  self.assertEqual(self._client.dump(test_id), {})
  self._client.waitForBootstrap()
  # Nothing is available yet, we need one more transaction to happen.
  self.assertEqual(self._client.dump(test_id), {})
  self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
  self.assertEqual(self._client.dump(test_id), {})
  self._client.commit(test_id, 't2', t2_storage_tid_dict)
  # Now everything must be available.
  self.assertEqual(self._client.dump(test_id), {'s0': 1, 's1': 1})
def testScenario1(self, test_id): def test_03_Scenario1(self, test_id):
""" """
Simple begin - commit case. Simple begin - commit case.
""" """
...@@ -77,7 +108,7 @@ class TestTIDServerV2: ...@@ -77,7 +108,7 @@ class TestTIDServerV2:
self._client.commit(test_id, 't1', storage_tid_dict) self._client.commit(test_id, 't1', storage_tid_dict)
self.assertEqual(self._client.dump(test_id), storage_tid_dict) self.assertEqual(self._client.dump(test_id), storage_tid_dict)
def testScenario2(self, test_id): def test_04_Scenario2(self, test_id):
""" """
Simple begin - abort case. Simple begin - abort case.
""" """
...@@ -88,7 +119,7 @@ class TestTIDServerV2: ...@@ -88,7 +119,7 @@ class TestTIDServerV2:
self._client.abort(test_id, 't1') self._client.abort(test_id, 't1')
self.assertEqual(self._client.dump(test_id), {}) self.assertEqual(self._client.dump(test_id), {})
def testScenario3(self, test_id): def test_05_Scenario3(self, test_id):
""" """
2 concurent transactions impacting a common storage. 2 concurent transactions impacting a common storage.
Second transaction begins after first does, and commits before Second transaction begins after first does, and commits before
...@@ -106,7 +137,7 @@ class TestTIDServerV2: ...@@ -106,7 +137,7 @@ class TestTIDServerV2:
self._client.commit(test_id, 't1', t1_storage_tid_dict) self._client.commit(test_id, 't1', t1_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 1, 's3': 1}) self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 1, 's3': 1})
def testScenario4(self, test_id): def test_06_Scenario4(self, test_id):
""" """
3 concurent transactions. 3 concurent transactions.
Transactions 1 and 2 impact same storage s1. Transactions 1 and 2 impact same storage s1.
...@@ -132,7 +163,7 @@ class TestTIDServerV2: ...@@ -132,7 +163,7 @@ class TestTIDServerV2:
self._client.commit(test_id, 't1', t1_storage_tid_dict) self._client.commit(test_id, 't1', t1_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 2, 's3': 1}) self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 2, 's3': 1})
def testScenario4bis(self, test_id): def test_07_Scenario4bis(self, test_id):
""" """
3 concurent transactions. 3 concurent transactions.
Transactions 1 and 2 impact same storage s1. Transactions 1 and 2 impact same storage s1.
...@@ -162,7 +193,7 @@ class TestTIDServerV2: ...@@ -162,7 +193,7 @@ class TestTIDServerV2:
self._client.abort(test_id, 't1') self._client.abort(test_id, 't1')
self.assertEqual(self._client.dump(test_id), {'s1': 2, 's3': 1}) self.assertEqual(self._client.dump(test_id), {'s1': 2, 's3': 1})
def testScenario5(self, test_id): def test_08_Scenario5(self, test_id):
""" """
2 concurent transactions impacting a common storage. 2 concurent transactions impacting a common storage.
Second transaction begins after first does, and commits after Second transaction begins after first does, and commits after
...@@ -180,7 +211,7 @@ class TestTIDServerV2: ...@@ -180,7 +211,7 @@ class TestTIDServerV2:
self._client.commit(test_id, 't2', t2_storage_tid_dict) self._client.commit(test_id, 't2', t2_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 1}) self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 1})
def testScenario6(self, test_id): def test_09_Scenario6(self, test_id):
""" """
2 concurent transactions impacting separate sets of storages. 2 concurent transactions impacting separate sets of storages.
Check that the first commit impacts dump data immediately. Check that the first commit impacts dump data immediately.
...@@ -197,7 +228,7 @@ class TestTIDServerV2: ...@@ -197,7 +228,7 @@ class TestTIDServerV2:
self._client.commit(test_id, 't2', t2_storage_tid_dict) self._client.commit(test_id, 't2', t2_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {'s1': 1, 's2': 1}) self.assertEqual(self._client.dump(test_id), {'s1': 1, 's2': 1})
def testScenario7(self, test_id): def test_10_Scenario7(self, test_id):
""" """
3 concurent transactions. 3 concurent transactions.
t1 and t2 impact a set of different storages. t1 and t2 impact a set of different storages.
...@@ -222,7 +253,7 @@ class TestTIDServerV2: ...@@ -222,7 +253,7 @@ class TestTIDServerV2:
self._client.commit(test_id, 't3', t3_storage_tid_dict) self._client.commit(test_id, 't3', t3_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 2}) self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 2})
def testScenario8(self, test_id): def test_11_Scenario8(self, test_id):
""" """
Simple increase case. Simple increase case.
""" """
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment