Commit 7fccafeb authored by Vincent Pelletier's avatar Vincent Pelletier

Resolve deadlocks detected by storage nodes.

Also reverts r2533: even if it's true that TIDs have no meaning at that level,
they are a handy way to prioritise a transaction over another, to break lock
cycles (aka deadlocks). This is the "detection" part of the change.
When a storage reports a deadlock, client must store all already-stored
objects again with "unlock" flag set.
Upon receiving those store requests, storage must release locks held by
transaction on those objects, and requeue the store request. If client didn't
hold any lock (initial store was still in queue), drop the second store
request.
This doesn't solve possible deadlocks if ZODB-level sends us objects in a
different order (ex: client 1 sending [OID1, OID2] & client 2 sending
[OID2, OID1]).
There is one important change to r2533's revert, which queues older
transactions and notifies deadlock for younger ones. The original code did it
the other way around, and it looks unfair to old transactions (they will keep
being delayed by younger ones, and will just get older and older).

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2596 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 68b6c5e9
......@@ -699,7 +699,7 @@ class Application(object):
self._store(oid, serial, data)
return None
def _store(self, oid, serial, data, data_serial=None):
def _store(self, oid, serial, data, data_serial=None, unlock=False):
if data is None:
# This is some undo: either a no-data object (undoing object
# creation) or a back-pointer to an earlier revision (going back to
......@@ -731,7 +731,7 @@ class Application(object):
queue = self.local_var.queue
add_involved_nodes = self.local_var.involved_nodes.add
packet = Packets.AskStoreObject(oid, serial, compression,
checksum, compressed_data, data_serial, self.local_var.tid)
checksum, compressed_data, data_serial, self.local_var.tid, unlock)
for node, conn in self.cp.iterateForObject(oid, writable=True):
try:
conn.ask(packet, on_timeout=on_timeout, queue=queue)
......@@ -776,7 +776,37 @@ class Application(object):
data = data_dict[oid]
tid = local_var.tid
resolved = False
if data is not None:
if conflict_serial == ZERO_TID:
# Storage refused us from taking object lock, to avoid a
# possible deadlock. TID is actually used for some kind of
# "locking priority": when a higher value has the lock,
# this means we stored objects "too late", and we would
# otherwise cause a deadlock.
# To recover, we must ask storages to release locks we
# hold (to let possibly-competing transactions acquire
# them), and requeue our already-sent store requests.
# XXX: currently, brute-force is implemented: we send
# object data again.
neo.logging.info('Deadlock avoidance triggered on %r:%r',
dump(oid), dump(serial))
for store_oid, store_data in \
local_var.data_dict.iteritems():
store_serial = object_serial_dict[store_oid]
if store_data is None:
self.checkCurrentSerialInTransaction(store_oid,
store_serial)
else:
if store_data is '':
# Some undo
neo.logging.warning('Deadlock avoidance cannot'
' reliably work with undo, this must be '
'implemented.')
break
self._store(store_oid, store_serial, store_data,
unlock=True)
else:
resolved = True
elif data is not None:
new_data = tryToResolveConflict(oid, conflict_serial,
serial, data)
if new_data is not None:
......
......@@ -153,3 +153,6 @@ class StorageAnswersHandler(AnswerBaseHandler):
# XXX: Not sure what to do in this case yet
raise NotImplementedError
def alreadyPendingError(self, conn, message):
pass
......@@ -230,7 +230,7 @@ class EventHandler(object):
raise UnexpectedPacketError
def askStoreObject(self, conn, oid, serial,
compression, checksum, data, data_serial, tid):
compression, checksum, data, data_serial, tid, unlock):
raise UnexpectedPacketError
def answerStoreObject(self, conn, conflicting, oid, serial):
......@@ -411,6 +411,9 @@ class EventHandler(object):
def brokenNodeDisallowedError(self, conn, message):
raise RuntimeError, 'broken node disallowed error: %s' % (message,)
def alreadyPendingError(self, conn, message):
neo.logging.error('already pending error: %s' % (message, ))
def ack(self, conn, message):
neo.logging.debug("no error message : %s" % (message))
......@@ -516,6 +519,7 @@ class EventHandler(object):
d[ErrorCodes.TID_NOT_FOUND] = self.tidNotFound
d[ErrorCodes.PROTOCOL_ERROR] = self.protocolError
d[ErrorCodes.BROKEN_NODE] = self.brokenNodeDisallowedError
d[ErrorCodes.ALREADY_PENDING] = self.alreadyPendingError
return d
......@@ -45,6 +45,7 @@ class ErrorCodes(Enum):
TID_NOT_FOUND = Enum.Item(3)
PROTOCOL_ERROR = Enum.Item(4)
BROKEN_NODE = Enum.Item(5)
ALREADY_PENDING = Enum.Item(7)
ErrorCodes = ErrorCodes()
class ClusterStates(Enum):
......@@ -912,26 +913,28 @@ class AskStoreObject(Packet):
Ask to store an object. Send an OID, an original serial, a current
transaction ID, and data. C -> S.
"""
_header_format = '!8s8s8sBL8s'
_header_format = '!8s8s8sBL8sB'
@profiler_decorator
def _encode(self, oid, serial, compression, checksum, data, data_serial,
tid):
tid, unlock):
if serial is None:
serial = INVALID_TID
if data_serial is None:
data_serial = INVALID_TID
unlock = unlock and 1 or 0
return pack(self._header_format, oid, serial, tid, compression,
checksum, data_serial) + _encodeString(data)
checksum, data_serial, unlock) + _encodeString(data)
def _decode(self, body):
header_len = self._header_len
r = unpack(self._header_format, body[:header_len])
oid, serial, tid, compression, checksum, data_serial = r
oid, serial, tid, compression, checksum, data_serial, unlock = r
serial = _decodeTID(serial)
data_serial = _decodeTID(data_serial)
(data, _) = _decodeString(body, 'data', offset=header_len)
return (oid, serial, compression, checksum, data, data_serial, tid)
return (oid, serial, compression, checksum, data, data_serial, tid,
bool(unlock))
class AnswerStoreObject(Packet):
"""
......@@ -2065,6 +2068,7 @@ class ErrorRegistry(dict):
OidDoesNotExist = register_error(ErrorCodes.OID_DOES_NOT_EXIST)
NotReady = register_error(ErrorCodes.NOT_READY)
Broken = register_error(ErrorCodes.BROKEN_NODE)
AlreadyPending = register_error(ErrorCodes.ALREADY_PENDING)
Errors = ErrorRegistry()
......@@ -29,6 +29,7 @@ from neo.storage.handlers import master, hidden
from neo.storage.replicator import Replicator
from neo.storage.database import buildDatabaseManager
from neo.storage.transactions import TransactionManager
from neo.storage.exception import AlreadyPendingError
from neo.connector import getConnectorHandler
from neo.pt import PartitionTable
from neo.util import dump
......@@ -71,6 +72,7 @@ class Application(object):
# operation related data
self.event_queue = None
self.event_queue_keys = None
self.operational = False
# ready is True when operational and got all informations
......@@ -193,6 +195,7 @@ class Application(object):
conn.close()
# create/clear event queue
self.event_queue = deque()
self.event_queue_keys = set()
try:
self.verifyData()
self.initialize()
......@@ -318,15 +321,25 @@ class Application(object):
if not node.isHidden():
break
def queueEvent(self, some_callable, conn, *args):
def queueEvent(self, some_callable, conn, args, key=None,
raise_on_duplicate=True):
msg_id = conn.getPeerId()
self.event_queue.append((some_callable, msg_id, conn, args))
keys = self.event_queue_keys
if raise_on_duplicate and key in keys:
raise AlreadyPendingError()
else:
self.event_queue.append((key, some_callable, msg_id, conn, args))
if key is not None:
keys.add(key)
def executeQueuedEvents(self):
l = len(self.event_queue)
p = self.event_queue.popleft
remove = self.event_queue_keys.remove
for _ in xrange(l):
some_callable, msg_id, conn, args = p()
key, some_callable, msg_id, conn, args = p()
if key is not None:
remove(key)
if conn.isAborted() or conn.isClosed():
continue
orig_msg_id = conn.getPeerId()
......
#
# Copyright (C) 2010 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
class AlreadyPendingError(Exception):
pass
......@@ -82,7 +82,7 @@ class BaseClientAndStorageOperationHandler(EventHandler):
app = self.app
if self.app.tm.loadLocked(oid):
# Delay the response.
app.queueEvent(self.askObject, conn, oid, serial, tid)
app.queueEvent(self.askObject, conn, (oid, serial, tid))
return
o = self._askObject(oid, serial, tid)
if o is None:
......
......@@ -21,6 +21,7 @@ from neo.util import dump
from neo.protocol import Packets, LockState, Errors
from neo.storage.handlers import BaseClientAndStorageOperationHandler
from neo.storage.transactions import ConflictError, DelayedError
from neo.storage.exception import AlreadyPendingError
import time
# Log stores taking (incl. lock delays) more than this many seconds.
......@@ -47,7 +48,7 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
conn.answer(Packets.AnswerStoreTransaction(tid))
def _askStoreObject(self, conn, oid, serial, compression, checksum, data,
data_serial, tid, request_time):
data_serial, tid, unlock, request_time):
if tid not in self.app.tm:
# transaction was aborted, cancel this event
neo.logging.info('Forget store of %s:%s by %s delayed by %s',
......@@ -58,15 +59,23 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
return
try:
self.app.tm.storeObject(tid, serial, oid, compression,
checksum, data, data_serial)
checksum, data, data_serial, unlock)
except ConflictError, err:
# resolvable or not
tid_or_serial = err.getTID()
conn.answer(Packets.AnswerStoreObject(1, oid, tid_or_serial))
except DelayedError:
# locked by a previous transaction, retry later
self.app.queueEvent(self._askStoreObject, conn, oid, serial,
compression, checksum, data, data_serial, tid, request_time)
# If we are unlocking, we want queueEvent to raise
# AlreadyPendingError, to avoid making lcient wait for an unneeded
# response.
try:
self.app.queueEvent(self._askStoreObject, conn, (oid, serial,
compression, checksum, data, data_serial, tid,
unlock, request_time), key=(oid, tid),
raise_on_duplicate=unlock)
except AlreadyPendingError:
conn.answer(Errors.AlreadyPending(dump(oid)))
else:
if SLOW_STORE is not None:
duration = time.time() - request_time
......@@ -75,7 +84,7 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
conn.answer(Packets.AnswerStoreObject(0, oid, serial))
def askStoreObject(self, conn, oid, serial,
compression, checksum, data, data_serial, tid):
compression, checksum, data, data_serial, tid, unlock):
# register the transaction
self.app.tm.register(conn.getUUID(), tid)
if data_serial is not None:
......@@ -84,7 +93,7 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
# delayed.
data = None
self._askStoreObject(conn, oid, serial, compression, checksum, data,
data_serial, tid, time.time())
data_serial, tid, unlock, time.time())
def askTIDsFrom(self, conn, min_tid, max_tid, length, partition_list):
app = self.app
......@@ -172,8 +181,11 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
err.getTID()))
except DelayedError:
# locked by a previous transaction, retry later
self.app.queueEvent(self._askCheckCurrentSerial, conn, tid, serial,
oid, request_time)
try:
self.app.queueEvent(self._askCheckCurrentSerial, conn, (tid,
serial, oid, request_time), key=(oid, tid))
except AlreadyPendingError:
conn.answer(Errors.AlreadyPending(dump(oid)))
else:
if SLOW_STORE is not None:
duration = time.time() - request_time
......
......@@ -18,7 +18,7 @@
from time import time
import neo
from neo.util import dump
from neo.protocol import ZERO_TID
class ConflictError(Exception):
"""
......@@ -214,7 +214,7 @@ class TransactionManager(object):
def getLockingTID(self, oid):
return self._store_lock_dict.get(oid)
def lockObject(self, tid, serial, oid):
def lockObject(self, tid, serial, oid, unlock=False):
"""
Take a write lock on given object, checking that "serial" is
current.
......@@ -224,6 +224,16 @@ class TransactionManager(object):
"""
# check if the object if locked
locking_tid = self._store_lock_dict.get(oid)
if locking_tid == tid and unlock:
neo.logging.info('Deadlock resolution on %r:%r', dump(oid),
dump(tid))
# A duplicate store means client is resolving a deadlock, so
# drop the lock it held on this object.
del self._store_lock_dict[oid]
# Give a chance to pending events to take that lock now.
self._app.executeQueuedEvents()
# Attemp to acquire lock again.
locking_tid = self._store_lock_dict.get(oid)
if locking_tid == tid:
neo.logging.info('Transaction %s storing %s more than once',
dump(tid), dump(oid))
......@@ -236,24 +246,34 @@ class TransactionManager(object):
raise ConflictError(history_list[0][0])
neo.logging.info('Transaction %s storing %s', dump(tid), dump(oid))
self._store_lock_dict[oid] = tid
else:
# a previous transaction lock this object, retry later
elif locking_tid > tid:
# We have a smaller TID than locking transaction, so we are older:
# enter waiting queue so we are handled when lock gets released.
neo.logging.info('Store delayed for %r:%r by %r', dump(oid),
dump(tid), dump(locking_tid))
raise DelayedError
else:
# We have a bigger TID than locking transaction, so we are
# younger: this is a possible deadlock case, as we might already
# hold locks that older transaction is waiting upon. Make client
# release locks & reacquire them by notifying it of the possible
# deadlock.
neo.logging.info('Possible deadlock on %r:%r with %r',
dump(oid), dump(tid), dump(locking_tid))
raise ConflictError(ZERO_TID)
def checkCurrentSerial(self, tid, serial, oid):
self.lockObject(tid, serial, oid)
self.lockObject(tid, serial, oid, unlock=True)
assert tid in self, "Transaction not registered"
transaction = self._transaction_dict[tid]
transaction.addCheckedObject(oid)
def storeObject(self, tid, serial, oid, compression, checksum, data,
value_serial):
value_serial, unlock=False):
"""
Store an object received from client node
"""
self.lockObject(tid, serial, oid)
self.lockObject(tid, serial, oid, unlock=unlock)
# store object
assert tid in self, "Transaction not registered"
transaction = self._transaction_dict[tid]
......
......@@ -616,11 +616,11 @@ class ClientApplicationTests(NeoUnitTestBase):
""" check that abort is sent to all nodes involved in the transaction """
app = self.getApp()
# three partitions/storages: one per object/transaction
app.num_partitions = 3
app.num_partitions = num_partitions = 3
app.num_replicas = 0
tid = self.makeTID(0) # on partition 0
oid1 = self.makeOID(1) # on partition 1, conflicting
oid2 = self.makeOID(2) # on partition 2
tid = self.makeTID(num_partitions) # on partition 0
oid1 = self.makeOID(num_partitions + 1) # on partition 1, conflicting
oid2 = self.makeOID(num_partitions + 2) # on partition 2
# storage nodes
uuid1, uuid2, uuid3 = [self.getNewUUID() for _ in range(3)]
address1 = ('127.0.0.1', 10000)
......
......@@ -254,8 +254,8 @@ class ClientTests(NEOFunctionalTest):
oid = st1.new_oid()
rev = '\0' * 8
data = zodb_pickle(PObject())
st1.tpc_begin(t1)
st2.tpc_begin(t2)
st1.tpc_begin(t1)
st1.store(oid, rev, data, '', t1)
# this store will be delayed
st2.store(oid, rev, data, '', t2)
......
......@@ -45,6 +45,7 @@ class StorageClientHandlerTests(NeoUnitTestBase):
self.app.store_lock_dict = {}
self.app.load_lock_dict = {}
self.app.event_queue = deque()
self.app.event_queue_keys = set()
self.app.tm = Mock({'__contains__': True})
# handler
self.operation = ClientOperationHandler(self.app)
......@@ -215,9 +216,9 @@ class StorageClientHandlerTests(NeoUnitTestBase):
tid = self.getNextTID()
oid, serial, comp, checksum, data = self._getObject()
self.operation.askStoreObject(conn, oid, serial, comp, checksum,
data, None, tid)
data, None, tid, False)
self._checkStoreObjectCalled(tid, serial, oid, comp,
checksum, data, None)
checksum, data, None, False)
pconflicting, poid, pserial = self.checkAnswerStoreObject(conn,
decode=True)
self.assertEqual(pconflicting, 0)
......@@ -232,9 +233,9 @@ class StorageClientHandlerTests(NeoUnitTestBase):
oid, serial, comp, checksum, data = self._getObject()
data_tid = self.getNextTID()
self.operation.askStoreObject(conn, oid, serial, comp, checksum,
'', data_tid, tid)
'', data_tid, tid, False)
self._checkStoreObjectCalled(tid, serial, oid, comp,
checksum, None, data_tid)
checksum, None, data_tid, False)
pconflicting, poid, pserial = self.checkAnswerStoreObject(conn,
decode=True)
self.assertEqual(pconflicting, 0)
......@@ -252,7 +253,7 @@ class StorageClientHandlerTests(NeoUnitTestBase):
self.app.tm.storeObject = fakeStoreObject
oid, serial, comp, checksum, data = self._getObject()
self.operation.askStoreObject(conn, oid, serial, comp, checksum,
data, None, tid)
data, None, tid, False)
pconflicting, poid, pserial = self.checkAnswerStoreObject(conn,
decode=True)
self.assertEqual(pconflicting, 1)
......
......@@ -23,6 +23,7 @@ from neo.protocol import CellStates
from collections import deque
from neo.pt import PartitionTable
from neo.util import dump
from neo.storage.exception import AlreadyPendingError
class StorageAppTests(NeoUnitTestBase):
......@@ -33,6 +34,7 @@ class StorageAppTests(NeoUnitTestBase):
config = self.getStorageConfiguration(master_number=1)
self.app = Application(config)
self.app.event_queue = deque()
self.app.event_queue_keys = set()
def test_01_loadPartitionTable(self):
self.app.dm = Mock({
......@@ -121,12 +123,20 @@ class StorageAppTests(NeoUnitTestBase):
msg_id = 1325136
event = Mock({'__repr__': 'event'})
conn = Mock({'__repr__': 'conn', 'getPeerId': msg_id})
self.app.queueEvent(event, conn, "test")
key = 'foo'
self.app.queueEvent(event, conn, ("test", ), key=key)
self.assertEqual(len(self.app.event_queue), 1)
_event, _msg_id, _conn, args = self.app.event_queue[0]
_key, _event, _msg_id, _conn, args = self.app.event_queue[0]
self.assertEqual(key, _key)
self.assertEqual(msg_id, _msg_id)
self.assertEqual(len(args), 1)
self.assertEqual(args[0], "test")
self.assertRaises(AlreadyPendingError, self.app.queueEvent, event,
conn, ("test2", ), key=key)
self.assertEqual(len(self.app.event_queue), 1)
self.app.queueEvent(event, conn, ("test3", ), key=key,
raise_on_duplicate=False)
self.assertEqual(len(self.app.event_queue), 2)
def test_03_executeQueuedEvents(self):
self.assertEqual(len(self.app.event_queue), 0)
......@@ -134,7 +144,7 @@ class StorageAppTests(NeoUnitTestBase):
msg_id_2 = 1325137
event = Mock({'__repr__': 'event'})
conn = Mock({'__repr__': 'conn', 'getPeerId': ReturnValues(msg_id, msg_id_2)})
self.app.queueEvent(event, conn, "test")
self.app.queueEvent(event, conn, ("test", ))
self.app.executeQueuedEvents()
self.assertEquals(len(event.mockGetNamedCalls("__call__")), 1)
call = event.mockGetNamedCalls("__call__")[0]
......
......@@ -43,6 +43,7 @@ class StorageStorageHandlerTests(NeoUnitTestBase):
self.app.store_lock_dict = {}
self.app.load_lock_dict = {}
self.app.event_queue = deque()
self.app.event_queue_keys = set()
# handler
self.operation = StorageOperationHandler(self.app)
# set pmn
......
......@@ -144,8 +144,8 @@ class TransactionManagerTests(NeoUnitTestBase):
def testDelayed(self):
""" Two transactions, the first cause the second to be delayed """
uuid = self.getNewUUID()
ttid1 = self.getNextTID()
ttid2 = self.getNextTID()
ttid1 = self.getNextTID()
tid1, txn1 = self._getTransaction()
tid2, txn2 = self._getTransaction()
serial, obj = self._getObject(1)
......@@ -162,6 +162,27 @@ class TransactionManagerTests(NeoUnitTestBase):
self.assertRaises(DelayedError, self.manager.storeObject,
ttid2, serial, *obj)
def testUnresolvableConflict(self):
""" A newer transaction has already modified an object """
uuid = self.getNewUUID()
ttid2 = self.getNextTID()
ttid1 = self.getNextTID()
tid1, txn1 = self._getTransaction()
tid2, txn2 = self._getTransaction()
serial, obj = self._getObject(1)
# the (later) transaction lock (change) the object
self.manager.register(uuid, ttid2)
self.manager.storeTransaction(ttid2, *txn2)
self.assertTrue(ttid2 in self.manager)
self._storeTransactionObjects(ttid2, txn2)
self.manager.lock(ttid2, tid2, txn2[0])
# the previous it's not using the latest version
self.manager.register(uuid, ttid1)
self.manager.storeTransaction(ttid1, *txn1)
self.assertTrue(ttid1 in self.manager)
self.assertRaises(ConflictError, self.manager.storeObject,
ttid1, serial, *obj)
def testResolvableConflict(self):
""" Try to store an object with the lastest revision """
uuid = self.getNewUUID()
......@@ -180,8 +201,8 @@ class TransactionManagerTests(NeoUnitTestBase):
uuid1 = self.getNewUUID()
uuid2 = self.getNewUUID()
self.assertNotEqual(uuid1, uuid2)
ttid1 = self.getNextTID()
ttid2 = self.getNextTID()
ttid1 = self.getNextTID()
tid1, txn1 = self._getTransaction()
tid2, txn2 = self._getTransaction()
serial1, obj1 = self._getObject(1)
......@@ -207,8 +228,8 @@ class TransactionManagerTests(NeoUnitTestBase):
uuid1 = self.getNewUUID()
uuid2 = self.getNewUUID()
self.assertNotEqual(uuid1, uuid2)
ttid1 = self.getNextTID()
ttid2 = self.getNextTID()
ttid1 = self.getNextTID()
tid1, txn1 = self._getTransaction()
tid2, txn2 = self._getTransaction()
serial1, obj1 = self._getObject(1)
......@@ -220,13 +241,13 @@ class TransactionManagerTests(NeoUnitTestBase):
self.manager.storeObject(ttid2, serial2, *obj2)
self.assertTrue(ttid2 in self.manager)
self.manager.lock(ttid2, tid2, txn1[0])
# the first get a delay, as nothing is committed yet
# the first get a conflict
self.manager.register(uuid1, ttid1)
self.manager.storeTransaction(ttid1, *txn1)
self.assertTrue(ttid1 in self.manager)
self.assertRaises(DelayedError, self.manager.storeObject,
self.assertRaises(ConflictError, self.manager.storeObject,
ttid1, serial1, *obj1)
self.assertRaises(DelayedError, self.manager.storeObject,
self.assertRaises(ConflictError, self.manager.storeObject,
ttid1, serial2, *obj2)
def testAbortUnlocked(self):
......
......@@ -352,8 +352,10 @@ class ProtocolTests(NeoUnitTestBase):
serial = self.getNextTID()
tid = self.getNextTID()
tid2 = self.getNextTID()
p = Packets.AskStoreObject(oid, serial, 1, 55, "to", tid2, tid)
poid, pserial, compression, checksum, data, ptid2, ptid = p.decode()
unlock = False
p = Packets.AskStoreObject(oid, serial, 1, 55, "to", tid2, tid, unlock)
poid, pserial, compression, checksum, data, ptid2, ptid, punlock = \
p.decode()
self.assertEqual(oid, poid)
self.assertEqual(serial, pserial)
self.assertEqual(tid, ptid)
......@@ -361,6 +363,7 @@ class ProtocolTests(NeoUnitTestBase):
self.assertEqual(compression, 1)
self.assertEqual(checksum, 55)
self.assertEqual(data, "to")
self.assertEqual(unlock, punlock)
def test_46_answerStoreObject(self):
oid = self.getNextTID()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment