Commit fb8eaf3b authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent d58435ed
...@@ -119,9 +119,11 @@ class ClientServiceHandler(MasterHandler): ...@@ -119,9 +119,11 @@ class ClientServiceHandler(MasterHandler):
self.app.tm.abort(tid, conn.getUUID()) self.app.tm.abort(tid, conn.getUUID())
# like ClientServiceHandler but read-only & only up-to backup_tid # like ClientServiceHandler but read-only & only for tid <= backup_tid
# XXX naming -> (?) ClientReadOnlyHandler
class ClientROServiceHandler(ClientServiceHandler): class ClientROServiceHandler(ClientServiceHandler):
# XXX somehow make sure to propagate this to raiseReadOnlyError() on client ?
def _readOnly(self, *args, **kw): raise NotReadyError('read-only access') def _readOnly(self, *args, **kw): raise NotReadyError('read-only access')
askBeginTransaction = _readOnly askBeginTransaction = _readOnly
...@@ -131,5 +133,12 @@ class ClientROServiceHandler(ClientServiceHandler): ...@@ -131,5 +133,12 @@ class ClientROServiceHandler(ClientServiceHandler):
askPack = _readOnly askPack = _readOnly
abortTransaction = _readOnly abortTransaction = _readOnly
# XXX also override askLastIDs to return backup_tid as last_tid ? # XXX LastIDs is not used by client at all, and it requires work to determine
# XXX ----//---- askLastTransaction ? # last_oid up to backup_tid, so just make it non-functional for client.
askLastIDs = _readOnly
# like in MasterHandler but returns backup_tid instead of last_tid
def askLastTransaction(self, conn):
backup_tid = self.app.backup_tid
assert backup_tid is not None # in BACKUPING mode it is always set
conn.answer(Packets.AnswerLastTransaction(backup_tid))
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
from neo.lib import logging from neo.lib import logging
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.util import dump, makeChecksum from neo.lib.util import dump, makeChecksum, add64
from neo.lib.protocol import Packets, LockState, Errors, ProtocolError, \ from neo.lib.protocol import Packets, LockState, Errors, ProtocolError, \
ZERO_HASH, INVALID_PARTITION ZERO_HASH, INVALID_PARTITION
from ..transactions import ConflictError, DelayedError, NotRegisteredError from ..transactions import ConflictError, DelayedError, NotRegisteredError
...@@ -130,7 +130,7 @@ class ClientOperationHandler(EventHandler): ...@@ -130,7 +130,7 @@ class ClientOperationHandler(EventHandler):
conn.answer(Packets.AnswerTIDsFrom(self.app.dm.getReplicationTIDList( conn.answer(Packets.AnswerTIDsFrom(self.app.dm.getReplicationTIDList(
min_tid, max_tid, length, partition))) min_tid, max_tid, length, partition)))
def askTIDs(self, conn, first, last, partition): def _askTIDs(self, first, last, partition):
# This method is complicated, because I must return TIDs only # This method is complicated, because I must return TIDs only
# about usable partitions assigned to me. # about usable partitions assigned to me.
if first >= last: if first >= last:
...@@ -143,6 +143,10 @@ class ClientOperationHandler(EventHandler): ...@@ -143,6 +143,10 @@ class ClientOperationHandler(EventHandler):
partition_list = [partition] partition_list = [partition]
tid_list = app.dm.getTIDList(first, last - first, partition_list) tid_list = app.dm.getTIDList(first, last - first, partition_list)
return tid_list
def askTIDs(self, conn, first, last, partition):
tid_list = self._askTIDs(first, last, partition)
conn.answer(Packets.AnswerTIDs(tid_list)) conn.answer(Packets.AnswerTIDs(tid_list))
def askFinalTID(self, conn, ttid): def askFinalTID(self, conn, ttid):
...@@ -223,7 +227,7 @@ class ClientOperationHandler(EventHandler): ...@@ -223,7 +227,7 @@ class ClientOperationHandler(EventHandler):
conn.answer(Packets.AnswerCheckCurrentSerial(0, oid, serial)) conn.answer(Packets.AnswerCheckCurrentSerial(0, oid, serial))
# like ClientOperationHandler but read-only & only up-to backup_tid # like ClientOperationHandler but read-only & only for tid <= backup_tid
# XXX naming -> ClientReadOnlyHandler ? # XXX naming -> ClientReadOnlyHandler ?
class ClientROOperationHandler(ClientOperationHandler): class ClientROOperationHandler(ClientOperationHandler):
...@@ -234,9 +238,48 @@ class ClientROOperationHandler(ClientOperationHandler): ...@@ -234,9 +238,48 @@ class ClientROOperationHandler(ClientOperationHandler):
askVoteTransaction = _readOnly askVoteTransaction = _readOnly
askStoreObject = _readOnly askStoreObject = _readOnly
askFinalTID = _readOnly askFinalTID = _readOnly
# askObjectUndoSerial is used in undo() but itself is read-only query # askObjectUndoSerial is used in undo() but itself is read-only query XXX or cut <= backup_tid ?
askCheckCurrentSerial = _readOnly # takes write lock & is only used when going to commit askCheckCurrentSerial = _readOnly # takes write lock & is only used when going to commit
# XXX askTIDsFrom - cut tids in reply to backup_tid ? # below operations: like in ClientOperationHandler but cut tid <= backup_tid
# XXX askTIDs ----//---- ?
# XXX askObjectHistory ----//---- ? def askTransactionInformation(self, conn, tid):
backup_tid = self.app.dm.getBackupTID()
if tid > backup_tid:
p = Errors.TidNotFound('tids > %s are not fully fetched yet' % dump(backup_tid))
conn.answer(p)
return
super(ClientROOperationHandler, self).askTransactionInformation(conn, tid)
def askObject(self, conn, oid, serial, tid):
backup_tid = self.app.dm.getBackupTID()
if serial and serial > backup_tid:
# obj lookup will find nothing, but return properly either
# OidDoesNotExist or OidNotFound
serial = ZERO_TID
if tid:
tid = min(tid, add64(backup_tid, 1))
# limit "latest obj" query to tid <= backup_tid
if not serial and not tid:
tid = add64(backup_tid, 1)
super(ClientROOperationHandler, self).askObject(conn, oid, serial, tid)
def askTIDsFrom(self, conn, min_tid, max_tid, length, partition):
backup_tid = self.app.dm.getBackupTID()
max_tid = min(max_tid, backup_tid)
# NOTE we don't need to adjust min_tid: if min_tid > max_tid
# db.getReplicationTIDList will return empty [], which is correct
super(ClientROOperationHandler, self).askTIDsFrom(
conn, min_tid, max_tid, length, partition)
def askTIDs(self, conn, first, last, partition):
backup_tid = self.app.dm.getBackupTID()
tid_list = self._askTIDs(first, last, partition)
tid_list = filter(lambda tid: tid <= backup_tid, tid_list)
conn.answer(Packets.AnswerTIDs(tid_list))
# FIXME askObjectHistory to limit tid <= backup_tid
# TODO dm.getObjectHistory has to be first fixed for this
#def askObjectHistory(self, conn, oid, first, last):
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment