Commit 787586e6 authored by Julien Muchembled's avatar Julien Muchembled

Do not ask storage to send oid/serial back on store/check

parent 3a93658b
......@@ -460,7 +460,8 @@ class Application(ThreadedApplication):
checksum, compressed_data, data_serial, ttid, unlock)
for node, conn in self.cp.iterateForObject(oid):
try:
conn.ask(packet, on_timeout=on_timeout, queue=queue)
conn.ask(packet, on_timeout=on_timeout, queue=queue,
oid=oid, serial=serial)
add_involved_nodes(node)
except ConnectionClosed:
continue
......@@ -1001,7 +1002,7 @@ class Application(ThreadedApplication):
packet = Packets.AskCheckCurrentSerial(ttid, serial, oid)
for node, conn in self.cp.iterateForObject(oid):
try:
conn.ask(packet, queue=queue)
conn.ask(packet, queue=queue, oid=oid, serial=serial)
except ConnectionClosed:
continue
checked_nodes.add(node)
......
......@@ -62,28 +62,28 @@ class StorageAnswersHandler(AnswerBaseHandler):
def answerObject(self, conn, oid, *args):
self.app.setHandlerData(args)
def answerStoreObject(self, conn, conflicting, oid, serial):
def answerStoreObject(self, conn, conflict, oid, serial):
txn_context = self.app.getHandlerData()
object_stored_counter_dict = txn_context[
'object_stored_counter_dict'][oid]
if conflicting:
if conflict:
# Warning: if a storage (S1) is much faster than another (S2), then
# we may process entirely a conflict with S1 (i.e. we received the
# answer to the store of the resolved object on S1) before we
# receive the conflict answer from the first store on S2.
logging.info('%r report a conflict for %r with %r',
conn, dump(oid), dump(serial))
conn, dump(oid), dump(conflict))
# If this conflict is not already resolved, mark it for
# resolution.
if serial not in txn_context[
if conflict not in txn_context[
'resolved_conflict_serial_dict'].get(oid, ()):
if serial in object_stored_counter_dict and serial != ZERO_TID:
if ZERO_TID != conflict in object_stored_counter_dict:
raise NEOStorageError('Storages %s accepted object %s'
' for serial %s but %s reports a conflict for it.' % (
map(dump, object_stored_counter_dict[serial]),
dump(oid), dump(serial), dump(conn.getUUID())))
map(dump, object_stored_counter_dict[conflict]),
dump(oid), dump(conflict), dump(conn.getUUID())))
conflict_serial_dict = txn_context['conflict_serial_dict']
conflict_serial_dict.setdefault(oid, set()).add(serial)
conflict_serial_dict.setdefault(oid, set()).add(conflict)
else:
uuid_set = object_stored_counter_dict.get(serial)
if uuid_set is None: # store to first storage node
......
......@@ -964,9 +964,7 @@ class StoreObject(Packet):
)
_answer = PStruct('answer_store_object',
PBoolean('conflicting'),
POID('oid'),
PTID('serial'),
PTID('conflict'),
)
class AbortTransaction(Packet):
......@@ -1274,11 +1272,7 @@ class CheckCurrentSerial(Packet):
POID('oid'),
)
_answer = PStruct('answer_store_object',
PBoolean('conflicting'),
POID('oid'),
PTID('serial'),
)
_answer = StoreObject._answer
class Pack(Packet):
"""
......
......@@ -77,7 +77,7 @@ class ClientOperationHandler(EventHandler):
checksum, data, data_serial, unlock)
except ConflictError, err:
# resolvable or not
conn.answer(Packets.AnswerStoreObject(1, oid, err.tid))
conn.answer(Packets.AnswerStoreObject(err.tid))
except DelayedError:
# locked by a previous transaction, retry later
# If we are unlocking, we want queueEvent to raise
......@@ -96,13 +96,13 @@ class ClientOperationHandler(EventHandler):
dump(oid), dump(serial), dump(ttid),
dump(self.app.tm.getLockingTID(oid)))
# send an answer as the client side is waiting for it
conn.answer(Packets.AnswerStoreObject(0, oid, serial))
conn.answer(Packets.AnswerStoreObject(None))
else:
if SLOW_STORE is not None:
duration = time.time() - request_time
if duration > SLOW_STORE:
logging.info('StoreObject delay: %.02fs', duration)
conn.answer(Packets.AnswerStoreObject(0, oid, serial))
conn.answer(Packets.AnswerStoreObject(None))
def askStoreObject(self, conn, oid, serial,
compression, checksum, data, data_serial, ttid, unlock):
......@@ -194,7 +194,7 @@ class ClientOperationHandler(EventHandler):
self.app.tm.checkCurrentSerial(ttid, serial, oid)
except ConflictError, err:
# resolvable or not
conn.answer(Packets.AnswerCheckCurrentSerial(1, oid, err.tid))
conn.answer(Packets.AnswerCheckCurrentSerial(err.tid))
except DelayedError:
# locked by a previous transaction, retry later
try:
......@@ -208,13 +208,13 @@ class ClientOperationHandler(EventHandler):
dump(oid), dump(serial), dump(ttid),
dump(self.app.tm.getLockingTID(oid)))
# send an answer as the client side is waiting for it
conn.answer(Packets.AnswerCheckCurrentSerial(0, oid, serial))
conn.answer(Packets.AnswerCheckCurrentSerial(None))
else:
if SLOW_STORE is not None:
duration = time.time() - request_time
if duration > SLOW_STORE:
logging.info('CheckCurrentSerial delay: %.02fs', duration)
conn.answer(Packets.AnswerCheckCurrentSerial(0, oid, serial))
conn.answer(Packets.AnswerCheckCurrentSerial(None))
# like ClientOperationHandler but read-only & only for tid <= backup_tid
......
......@@ -1340,11 +1340,11 @@ class Test(NEOThreadedTest):
reports a conflict after that this conflict was fully resolved with
another node.
"""
def answerStoreObject(orig, conn, conflicting, *args):
if not conflicting:
def answerStoreObject(orig, conn, conflict, **kw):
if not conflict:
p.revert()
ll()
orig(conn, conflicting, *args)
orig(conn, conflict, **kw)
if 1:
s0, s1 = cluster.storage_list
t1, c1 = cluster.getTransaction()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment