Commit 2d5b3dff authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 5ad44db5
......@@ -330,6 +330,8 @@ class Application(ThreadedApplication):
# TODO:
# - rename parameters (here? and in handlers & packet definitions)
print 'QQQ client load oid: %r tid: %r before_tid: %r' % (oid, tid, before_tid)
acquire = self._cache_lock_acquire
release = self._cache_lock_release
# XXX: Consider using a more fine-grained lock.
......@@ -350,6 +352,7 @@ class Application(ThreadedApplication):
# Do not get something more recent than the last invalidation
# we got from master.
before_tid = p64(u64(self.last_tid) + 1)
print '\t.last_tid: %r' % self.last_tid
data, tid, next_tid, _ = self._loadFromStorage(oid, tid, before_tid)
acquire()
try:
......@@ -369,6 +372,7 @@ class Application(ThreadedApplication):
return data, tid, next_tid
def _loadFromStorage(self, oid, at_tid, before_tid):
print 'QQQ2 client loadFromStor oid: %r at_tid: %r before_tid: %r' % (oid, at_tid, before_tid)
packet = Packets.AskObject(oid, at_tid, before_tid)
for node, conn in self.cp.iterateForObject(oid, readable=True):
try:
......
......@@ -522,7 +522,6 @@ class Application(BaseApplication):
tid = txn.getTID()
transaction_node = txn.getNode()
invalidate_objects = Packets.InvalidateObjects(tid, txn.getOIDList())
# NOTE send invalidation to clients
for client_node in self.nm.getClientList(only_identified=True):
c = client_node.getConnection()
if client_node is transaction_node:
......
# -*- coding: utf-8 -*-
#
# Copyright (C) 2012-2016 Nexedi SA
#
......@@ -95,7 +96,7 @@ class BackupApplication(object):
bootstrap = BootstrapManager(self, self.name, NodeTypes.CLIENT)
# {offset -> node} (primary storage for offset which will be talking to upstream cluster)
self.primary_partition_dict = {}
# [[tid]]
# [[tid]] part -> []tid↑ (currently scheduled-for-sync txns)
self.tid_list = tuple([] for _ in xrange(pt.getPartitions()))
try:
while True:
......@@ -208,13 +209,12 @@ class BackupApplication(object):
except IndexError:
last_max_tid = prev_tid
if offset in partition_set:
self.tid_list[offset].append(tid)
self.tid_list[offset].append(tid) # XXX check tid is ↑
node_list = []
for cell in pt.getCellList(offset, readable=True):
node = cell.getNode()
assert node.isConnected(), node
if cell.backup_tid == prev_tid:
"""
# Given 4 TIDs t0,t1,t2,t3: if a cell is only
# modified by t0 & t3 and has all data for t0, 4 values
# are possible for its 'backup_tid' until it replicates
......@@ -229,8 +229,6 @@ class BackupApplication(object):
logging.debug(
"partition %u: updating backup_tid of %r to %s",
offset, cell, dump(cell.backup_tid))
"""
pass
else:
assert cell.backup_tid < last_max_tid, (
cell.backup_tid, last_max_tid, prev_tid, tid)
......@@ -301,13 +299,13 @@ class BackupApplication(object):
app = self.app
cell = app.pt.getCell(offset, node.getUUID())
tid_list = self.tid_list[offset]
if tid_list: # may be empty if the cell is out-of-date
if tid_list: # may be empty if the cell is out-of-date # XXX check how
# or if we're not fully initialized
if tid < tid_list[0]:
if tid < tid_list[0]: # XXX why this is possible?
cell.replicating = tid
else:
try:
tid = add64(tid_list[bisect(tid_list, tid)], -1)
tid = add64(tid_list[bisect(tid_list, tid)], -1) # XXX why -1 ?
except IndexError:
last_tid = app.getLastTransaction()
if tid < last_tid:
......@@ -316,9 +314,11 @@ class BackupApplication(object):
logging.debug("partition %u: updating backup_tid of %r to %s",
offset, cell, dump(tid))
cell.backup_tid = tid
# TODO provide invalidation feedback about new txns to read-only clients connected to backup cluster
# NOTE ^^^ not only here but also hooked to in-progress feedback from fetchObjects (storage)
# Forget tids we won't need anymore.
cell_list = app.pt.getCellList(offset, readable=True)
del tid_list[:bisect(tid_list, min(x.backup_tid for x in cell_list))]
del tid_list[:bisect(tid_list, min(x.backup_tid for x in cell_list))] # XXX not only for primary Sb ?
primary_node = self.primary_partition_dict.get(offset)
primary = primary_node is node
result = None if primary else app.pt.setUpToDate(node, offset)
......
......@@ -120,7 +120,6 @@ class ClientServiceHandler(MasterHandler):
# like ClientServiceHandler but read-only & only for tid <= backup_tid
# XXX naming -> (?) ClientReadOnlyHandler
class ClientROServiceHandler(ClientServiceHandler):
# XXX somehow make sure to propagate this to raiseReadOnlyError() on client ?
......@@ -139,6 +138,9 @@ class ClientROServiceHandler(ClientServiceHandler):
# like in MasterHandler but returns backup_tid instead of last_tid
def askLastTransaction(self, conn):
backup_tid = self.app.backup_tid
# XXX wrong vvv
backup_tid = self.app.backup_tid # FIXME this is not regularly updated - only on request from outside ?
# XXX yes -> see askRecovery in master/handlers/
assert backup_tid is not None # in BACKUPING mode it is always set
print '\n\n\nASK LAST_TID -> %r\n' % backup_tid
conn.answer(Packets.AnswerLastTransaction(backup_tid))
......@@ -231,7 +231,6 @@ class ClientOperationHandler(EventHandler):
# like ClientOperationHandler but read-only & only for tid <= backup_tid
# XXX naming -> ClientReadOnlyHandler ?
class ClientROOperationHandler(ClientOperationHandler):
def _readOnly(self, *args, **kw): raise NotReadyError('read-only access')
......@@ -255,8 +254,8 @@ class ClientROOperationHandler(ClientOperationHandler):
super(ClientROOperationHandler, self).askTransactionInformation(conn, tid)
def askObject(self, conn, oid, serial, tid):
print '\n\n\nASK OBJECT %r, %r, %r\n\n\n' % (oid, serial, tid)
backup_tid = self.app.dm.getBackupTID()
print '\n\n\nASK OBJECT %r, %r, %r (backup_tid: %r)\n\n\n' % (oid, serial, tid, backup_tid)
if serial and serial > backup_tid:
# obj lookup will find nothing, but return properly either
# OidDoesNotExist or OidNotFound
......
......@@ -105,6 +105,7 @@ class StorageOperationHandler(EventHandler):
self.app.dm.commit()
assert not pack_tid, "TODO"
if next_tid:
# TODO also provide feedback to master about current replication state (tid)
self.app.replicator.fetchObjects(next_tid, next_oid)
else:
self.app.replicator.finish()
......
......@@ -750,6 +750,7 @@ class NEOCluster(object):
self.enableStorageList(storage_list)
def newClient(self):
print '%r: newClient()' % self
return ClientApplication(name=self.name, master_nodes=self.master_nodes,
compress=self.compress, ssl=self.SSL)
......
......@@ -525,8 +525,8 @@ class ReplicationTests(NEOThreadedTest):
"""Check data can be read from backup cluster by clients"""
B = backup
U = B.upstream
S = U.getZODBStorage()
Sb = B.getZODBStorage()
Z = U.getZODBStorage()
#Zb = B.getZODBStorage()
oid_list = []
tid_list = []
......@@ -534,11 +534,11 @@ class ReplicationTests(NEOThreadedTest):
for i in xrange(10):
# store new data to U
txn = transaction.Transaction()
S.tpc_begin(txn)
oid = S.new_oid()
S.store(oid, None, '%s-%i' % (oid, i), '', txn)
S.tpc_vote(txn)
tid = S.tpc_finish(txn)
Z.tpc_begin(txn)
oid = Z.new_oid()
Z.store(oid, None, '%s-%i' % (oid, i), '', txn)
Z.tpc_vote(txn)
tid = Z.tpc_finish(txn)
oid_list.append(oid)
tid_list.append(tid)
......@@ -548,13 +548,20 @@ class ReplicationTests(NEOThreadedTest):
self.assertEqual(B.last_tid, U.last_tid)
self.assertEqual(1, self.checkBackup(B))
print '\n\n111 tid: %r, last_tid: %r, backup_tid: %r' % (tid, B.backup_tid, U.last_tid)
# try to read data from B
Sb._cache.clear()
# XXX we open a new storage every time because invalidations are not yet implemented in read-only mode.
Zb = B.getZODBStorage()
#Zb.sync()
#Zb._cache.clear()
for j, oid in enumerate(oid_list):
data = Sb.load(oid, '')
data = Zb.load(oid, '')
self.assertEqual(data, '%s-%s' % (oid, j))
#Sb.loadSerial(oid, tid)
#Sb.loadBefore(oid, tid)
#Zb.loadSerial(oid, tid)
#Zb.loadBefore(oid, tid)
# TODO close Zb / client
if __name__ == "__main__":
unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment