Commit 5d2baac5 authored by Kirill Smelkov

Merge branch 'master' into t

* master:
  client: fix item eviction from cache, which could break loading from storage
  Bump protocol version for new read-only mode in BACKUPING state
  backup: Teach cluster in BACKUPING state to also serve regular ZODB clients in read-only mode
  tests/threaded: Add handy shortcuts to NEOCluster to concisely check cluster properties in tests
parents 9c8502ed 4ef05b9e
@@ -102,6 +102,12 @@ class ClientCache(object):
             if item is head:
                 break

+    def _remove_from_oid_dict(self, item):
+        item_list = self._oid_dict[item.oid]
+        item_list.remove(item)
+        if not item_list:
+            del self._oid_dict[item.oid]
+
     def _add(self, item):
         level = item.level
         try:
@@ -126,10 +132,7 @@ class ClientCache(object):
                 self._history_size += 1
                 if self._max_history_size < self._history_size:
                     self._remove(head)
-                    item_list = self._oid_dict[head.oid]
-                    item_list.remove(head)
-                    if not item_list:
-                        del self._oid_dict[head.oid]
+                    self._remove_from_oid_dict(head)

     def _remove(self, item):
         level = item.level
@@ -165,7 +168,7 @@ class ClientCache(object):
                 if head.level or head.counter:
                     self._add(head)
                 else:
-                    self._oid_dict[head.oid].remove(head)
+                    self._remove_from_oid_dict(head)
                 break

     def _load(self, oid, before_tid=None):
@@ -247,7 +250,7 @@ class ClientCache(object):
                     head.level = 0
                     self._add(head)
                 else:
-                    self._oid_dict[head.oid].remove(head)
+                    self._remove_from_oid_dict(head)
             if self._size <= max_size:
                 return
             head = next
...
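The bug these hunks fix: eviction removed an item from its per-oid list but left the emptied list behind in _oid_dict, and such a stale entry could later break loading from storage. A minimal standalone sketch of old vs. new behaviour, using a toy Item and a plain dict in place of the real ClientCache internals:

# Toy model of ClientCache._oid_dict: oid -> list of cached items.
class Item(object):
    def __init__(self, oid):
        self.oid = oid

oid_dict = {}
item = Item(oid=1)
oid_dict.setdefault(item.oid, []).append(item)

# Old eviction: remove the item only, leaving an empty list under the key.
oid_dict[item.oid].remove(item)
assert oid_dict[1] == []    # stale entry: the oid still looks cached

# New eviction (_remove_from_oid_dict): also drop the key once its list is
# empty, so later lookups correctly fall through to loading from storage.
if not oid_dict[1]:
    del oid_dict[1]
assert 1 not in oid_dict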
@@ -20,7 +20,7 @@ import traceback
 from cStringIO import StringIO
 from struct import Struct

-PROTOCOL_VERSION = 6
+PROTOCOL_VERSION = 7

 # Size restrictions.
 MIN_PACKET_SIZE = 10
...
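The version bump keeps pre-read-only nodes from interoperating with upgraded ones: a peer that does not know the new read-only semantics is rejected at handshake instead of mis-parsing answers such as ReadOnlyAccess. A hypothetical sketch of the kind of check this guards (illustrative only, not NEO's actual handshake code):

PROTOCOL_VERSION = 7

def check_protocol(remote_version):
    # A node built before this change still announces version 6 and is
    # refused outright rather than failing later in protocol decoding.
    if remote_version != PROTOCOL_VERSION:
        raise ValueError('incompatible protocol version: %r (want %r)'
                         % (remote_version, PROTOCOL_VERSION))

check_protocol(7)   # ok: peer speaks the same protocol version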
@@ -103,7 +103,7 @@ class Application(BaseApplication):
             self)
         self.secondary_master_handler = secondary.SecondaryMasterHandler(self)
         self.client_service_handler = client.ClientServiceHandler(self)
-        self.client_ro_service_handler = client.ClientROServiceHandler(self)
+        self.client_ro_service_handler = client.ClientReadOnlyServiceHandler(self)
         self.storage_service_handler = storage.StorageServiceHandler(self)

         registerLiveDebugger(on_log=self.log)
...
@@ -120,12 +120,11 @@ class ClientServiceHandler(MasterHandler):

 # like ClientServiceHandler but read-only & only for tid <= backup_tid
-class ClientROServiceHandler(ClientServiceHandler):
+class ClientReadOnlyServiceHandler(ClientServiceHandler):

     def _readOnly(self, conn, *args, **kw):
-        p = Errors.ReadOnlyAccess('read-only access because cluster is in backuping mode')
-        conn.answer(p)
+        conn.answer(Errors.ReadOnlyAccess(
+            'read-only access because cluster is in backuping mode'))

     askBeginTransaction = _readOnly
     askNewOIDs = _readOnly
...
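Both read-only handler classes use the same pattern: a single refusal method, aliased in the class body over every mutating request, so the read-only policy lives in one place. A condensed sketch of the aliasing, with toy stand-ins for NEO's real conn and error packets:

class FakeConn(object):
    def answer(self, packet):
        print('answered: %s' % packet)

class WriteHandler(object):
    def askBeginTransaction(self, conn):
        conn.answer('new tid')          # normal read-write behaviour

class ReadOnlyHandler(WriteHandler):
    def _readOnly(self, conn, *args, **kw):
        conn.answer('ReadOnlyAccess: cluster is in backuping mode')

    # class-body assignment rebinds the attribute: every alias is the
    # same function object, so all write requests share one refusal.
    askBeginTransaction = _readOnly
    askNewOIDs = _readOnly

ReadOnlyHandler().askBeginTransaction(FakeConn())   # refusal, not a tid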
@@ -142,12 +142,10 @@ class ClientOperationHandler(EventHandler):
         else:
             partition_list = [partition]

-        tid_list = app.dm.getTIDList(first, last - first, partition_list)
-        return tid_list
+        return app.dm.getTIDList(first, last - first, partition_list)

-    def askTIDs(self, conn, first, last, partition):
-        tid_list = self._askTIDs(first, last, partition)
-        conn.answer(Packets.AnswerTIDs(tid_list))
+    def askTIDs(self, conn, *args):
+        conn.answer(Packets.AnswerTIDs(self._askTIDs(*args)))

     def askFinalTID(self, conn, ttid):
         conn.answer(Packets.AnswerFinalTID(self.app.tm.getFinalTID(ttid)))
@@ -228,50 +226,54 @@ class ClientOperationHandler(EventHandler):

 # like ClientOperationHandler but read-only & only for tid <= backup_tid
-class ClientROOperationHandler(ClientOperationHandler):
+class ClientReadOnlyOperationHandler(ClientOperationHandler):

     def _readOnly(self, conn, *args, **kw):
-        p = Errors.ReadOnlyAccess('read-only access because cluster is in backuping mode')
-        conn.answer(p)
+        conn.answer(Errors.ReadOnlyAccess(
+            'read-only access because cluster is in backuping mode'))

     abortTransaction = _readOnly
     askStoreTransaction = _readOnly
     askVoteTransaction = _readOnly
     askStoreObject = _readOnly
     askFinalTID = _readOnly
-    askCheckCurrentSerial = _readOnly # takes write lock & is only used when going to commit
+    # takes write lock & is only used when going to commit
+    askCheckCurrentSerial = _readOnly

     # below operations: like in ClientOperationHandler but cut tid <= backup_tid

     def askTransactionInformation(self, conn, tid):
         backup_tid = self.app.dm.getBackupTID()
         if tid > backup_tid:
-            p = Errors.TidNotFound('tids > %s are not fully fetched yet' % dump(backup_tid))
-            conn.answer(p)
+            conn.answer(Errors.TidNotFound(
+                'tids > %s are not fully fetched yet' % dump(backup_tid)))
             return
-        super(ClientROOperationHandler, self).askTransactionInformation(conn, tid)
+        super(ClientReadOnlyOperationHandler, self).askTransactionInformation(
+            conn, tid)

     def askObject(self, conn, oid, serial, tid):
         backup_tid = self.app.dm.getBackupTID()
-        if serial and serial > backup_tid:
-            # obj lookup will find nothing, but return properly either
-            # OidDoesNotExist or OidNotFound
-            serial = ZERO_TID
-        if tid:
+        if serial:
+            if serial > backup_tid:
+                # obj lookup will find nothing, but return properly either
+                # OidDoesNotExist or OidNotFound
+                serial = ZERO_TID
+        elif tid:
             tid = min(tid, add64(backup_tid, 1))
-        # limit "latest obj" query to tid <= backup_tid
-        if not serial and not tid:
+        else:
+            # limit "latest obj" query to tid <= backup_tid
             tid = add64(backup_tid, 1)
-        super(ClientROOperationHandler, self).askObject(conn, oid, serial, tid)
+        super(ClientReadOnlyOperationHandler, self).askObject(
+            conn, oid, serial, tid)

     def askTIDsFrom(self, conn, min_tid, max_tid, length, partition):
         backup_tid = self.app.dm.getBackupTID()
         max_tid = min(max_tid, backup_tid)
         # NOTE we don't need to adjust min_tid: if min_tid > max_tid
         # db.getReplicationTIDList will return empty [], which is correct
-        super(ClientROOperationHandler, self).askTIDsFrom(
+        super(ClientReadOnlyOperationHandler, self).askTIDsFrom(
             conn, min_tid, max_tid, length, partition)

     def askTIDs(self, conn, first, last, partition):
...
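The tid arithmetic in askObject above can be checked with concrete values. Here add64 is assumed to behave like neo.lib.util.add64, i.e. integer arithmetic on an 8-byte big-endian packed tid; a worked sketch:

from struct import pack, unpack

def add64(packed, offset):
    # assumed equivalent of neo.lib.util.add64
    return pack('>Q', unpack('>Q', packed)[0] + offset)

backup_tid = pack('>Q', 100)    # backup has fully fetched up to tid 100

# A "latest object" query (no serial, no tid) becomes a load strictly
# before backup_tid + 1, so nothing newer than the backup point leaks out:
tid = add64(backup_tid, 1)
assert unpack('>Q', tid)[0] == 101

# An explicit tid past the backup point is clamped the same way; packed
# big-endian tids of equal length compare like the integers they encode:
requested = pack('>Q', 500)
assert min(requested, add64(backup_tid, 1)) == add64(backup_tid, 1)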
@@ -19,7 +19,7 @@ from neo.lib.handler import EventHandler
 from neo.lib.protocol import uuid_str, NodeTypes, NotReadyError, Packets
 from neo.lib.protocol import ProtocolError, BrokenNodeDisallowedError
 from .storage import StorageOperationHandler
-from .client import ClientOperationHandler, ClientROOperationHandler
+from .client import ClientOperationHandler, ClientReadOnlyOperationHandler

 class IdentificationHandler(EventHandler):
     """ Handler used for incoming connections during operation state """
@@ -49,7 +49,7 @@ class IdentificationHandler(EventHandler):
         # choose the handler according to the node type
         if node_type == NodeTypes.CLIENT:
             if app.dm.getBackupTID():
-                handler = ClientROOperationHandler
+                handler = ClientReadOnlyOperationHandler
             else:
                 handler = ClientOperationHandler
             if node is None:
...
@@ -691,34 +691,34 @@ class NEOCluster(object):
         return admin

     ###
-    # A few handy shortcuts for tests
+    # More handy shortcuts for tests

     @property
     def ptid(self):
         return self.neoctl.getRecovery()[0]

     @property
     def backup_tid(self):
+        # TODO -> self.master.pt.getBackupTid() ?
         return self.neoctl.getRecovery()[1]

-    @property
-    def truncated_tid(self):
-        return self.neoctl.getRecovery()[2]
-
     @property
     def last_tid(self):
         return self.primary_master.getLastTransaction()

+    @property
+    def truncated_tid(self):
+        return self.neoctl.getRecovery()[2]
+
     @property
     def cluster_state(self):
         return self.neoctl.getClusterState()

+    ###
+
     @property
     def primary_master(self):
         master, = [master for master in self.master_list if master.primary]
         return master

-    ###
-
     def reset(self, clear_database=False):
         for node_type in 'master', 'storage', 'admin':
...
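These shortcuts hide the positional results of neoctl.getRecovery() behind named read-only properties, so a test reads cluster.backup_tid instead of indexing a tuple. A toy sketch of the pattern (FakeNeoctl stands in for the real neoctl client):

class FakeNeoctl(object):
    def getRecovery(self):
        # (ptid, backup_tid, truncated_tid), mirroring the order used above
        return (1, 2, None)

class Cluster(object):
    neoctl = FakeNeoctl()

    @property
    def ptid(self):
        return self.neoctl.getRecovery()[0]

    @property
    def backup_tid(self):
        return self.neoctl.getRecovery()[1]

cluster = Cluster()
assert (cluster.ptid, cluster.backup_tid) == (1, 2)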
@@ -37,9 +37,11 @@ from .. import Patch
 from . import ConnectionFilter, NEOCluster, NEOThreadedTest, predictable_random

 # dump log to stderr
+"""
 logging.backlog(max_size=None)
 del logging.default_root_handler.handle
 getLogger().setLevel(INFO)
+"""

 def backup_test(partitions=1, upstream_kw={}, backup_kw={}):
     def decorator(wrapped):
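The two added """ lines turn the dump-log-to-stderr block into a bare string literal, which is evaluated and discarded, so the statements inside no longer run; a quick way to switch a block off without deleting it:

"""
print('inside the string literal: never executed')
"""
print('still executed')   # only code outside the literal runs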
@@ -183,7 +185,8 @@ class ReplicationTests(NEOThreadedTest):
             self.assertEqual(backup.last_tid, upstream.last_tid)  # not-yet truncated
             self.tic()
             self.assertEqual(backup.cluster_state, ClusterStates.RUNNING)
-            self.assertEqual(backup.backup_tid, None)
+            self.assertEqual(np*nr, self.checkBackup(backup,
+                max_tid=backup.last_tid))
             self.assertEqual(backup.last_tid, u_last_tid0)  # truncated after recovery
             self.assertEqual(np*nr, self.checkBackup(backup, max_tid=backup.last_tid))
         finally:
@@ -213,7 +216,8 @@ class ReplicationTests(NEOThreadedTest):
             self.assertEqual(backup.last_tid, u_last_tid1)  # = B^.backup_tid
             self.tic()
             self.assertEqual(backup.cluster_state, ClusterStates.RUNNING)
-            self.assertEqual(backup.backup_tid, None)
+            self.assertEqual(np*nr, self.checkBackup(backup,
+                max_tid=backup.last_tid))
             self.assertEqual(backup.last_tid, u_last_tid1)  # truncated after recovery
             self.assertEqual(np*nr, self.checkBackup(backup, max_tid=backup.last_tid))
         finally:
@@ -520,7 +524,6 @@ class ReplicationTests(NEOThreadedTest):
             checker.CHECK_COUNT = CHECK_COUNT
             cluster.stop()

-
     @backup_test()
     def testBackupReadOnlyAccess(self, backup):
         """Check backup cluster can be used in read-only mode by ZODB clients"""
@@ -543,8 +546,9 @@ class ReplicationTests(NEOThreadedTest):
                 if i == cutoff:
                     f.add(delayReplication)
                 if i == recover:
-                    # removes the filter and retransmits all packets that were
-                    # queued once first filtered packed was detected on a connection.
+                    # .remove() removes the filter and retransmits all packets
+                    # that were queued once first filtered packet was detected
+                    # on a connection.
                     f.remove(delayReplication)

                 # commit new data to U
@@ -558,7 +562,7 @@ class ReplicationTests(NEOThreadedTest):
                 oid_list.append(oid)
                 tid_list.append(tid)

-                # make sure data propagated to B
+                # make sure data propagated to B  (depending on cutoff)
                 self.tic()
                 if cutoff <= i < recover:
                     self.assertLess(B.backup_tid, U.last_tid)
@@ -579,35 +583,38 @@ class ReplicationTests(NEOThreadedTest):
                     self.assertEqual(data, '%s-%s' % (oid, j))
                     self.assertEqual(serial, tid_list[j])

-                # verify how transaction log & friends behave under potentially not-yet-fully
-                # fetched backup state (transactions committed at [cutoff, recover) should
-                # not be there; otherwise transactions should be fully there)
+                # verify how transaction log & friends behave under potentially
+                # not-yet-fully fetched backup state (transactions committed at
+                # [cutoff, recover) should not be there; otherwise transactions
+                # should be fully there)
                 Zb = B.getZODBStorage()
                 Btxn_list = list(Zb.iterator())
-                self.assertEqual(len(Btxn_list), cutoff if cutoff <= i < recover else i+1)
+                self.assertEqual(len(Btxn_list), cutoff if cutoff <= i < recover
+                                                 else i+1)
                 for j, txn in enumerate(Btxn_list):
                     self.assertEqual(txn.tid, tid_list[j])
                     self.assertEqual(txn.description, 'test transaction %i' % j)
-                    obj_list = list(txn)
-                    self.assertEqual(len(obj_list), 1)
-                    obj = obj_list[0]
+                    obj, = txn
                     self.assertEqual(obj.oid, oid_list[j])
                     self.assertEqual(obj.data, '%s-%s' % (obj.oid, j))

                 # TODO test askObjectHistory once it is fixed

-                # try to commit something to backup storage and make sure it is really read-only
-                Zb._cache._max_size = 0  # make stores do work in sync way
+                # try to commit something to backup storage and make sure it is
+                # really read-only
+                Zb._cache._max_size = 0  # make store() do work in sync way
                 txn = transaction.Transaction()
                 self.assertRaises(ReadOnlyError, Zb.tpc_begin, txn)
                 self.assertRaises(ReadOnlyError, Zb.new_oid)
-                self.assertRaises(ReadOnlyError, Zb.store, oid_list[-1], tid_list[-1], 'somedata', '', txn)
-                # tpc_vote first checks whether there were store replies - thus not ReadOnlyError
+                self.assertRaises(ReadOnlyError, Zb.store, oid_list[-1],
+                                  tid_list[-1], 'somedata', '', txn)
+                # tpc_vote first checks whether there were store replies -
+                # thus not ReadOnlyError
                 self.assertRaises(NEOStorageError, Zb.tpc_vote, txn)

-                # close storage because client app is otherwise shared in threaded
-                # tests and we need to refresh last_tid on next run
-                # (see above about invalidations not working)
+                # close storage because client app is otherwise shared in
+                # threaded tests and we need to refresh last_tid on next run
+                # (XXX see above about invalidations not working)
                 Zb.close()
...