Commit b2f51e77 by Julien Muchembled

storage: fix possible ConflictError when client is much faster than master

1 parent a020c593
Change History
==============
0.9.2 (unreleased)
------------------
- storage: fix possible ConflictError when client is much faster than master
0.9.1 (2011-09-24)
------------------
......
......@@ -199,7 +199,7 @@ RC - Review output of pylint (CODE)
- tpc_finish failures (FUNCTIONALITY)
New failure cases during tpc_finish must be handled.
- Fix and reenable deadlock avoidance (SPEED). This is required for
neo.tests.zodb.testBasic.BasicTests.check_checkCurrentSerialInTransaction
neo.threaded.test.Test.testDeadlockAvoidance
Admin
- Make admin node able to monitor multiple clusters simultaneously
......
......@@ -272,18 +272,20 @@ class TransactionManager(object):
neo.lib.logging.debug('Transaction %s storing %s',
dump(ttid), dump(oid))
self._store_lock_dict[oid] = ttid
elif locking_tid > ttid:
# We have a smaller TID than locking transaction, so we are older:
elif locking_tid < ttid:
# We have a bigger TTID than locking transaction, so we are younger:
# enter waiting queue so we are handled when lock gets released.
# We also want to delay (instead of conflict) if the client is
# so faster that it is committing another transaction before we
# processed UnlockInformation from the master.
neo.lib.logging.info('Store delayed for %r:%r by %r', dump(oid),
dump(ttid), dump(locking_tid))
raise DelayedError
else:
# We have a bigger TTID than locking transaction, so we are
# younger: this is a possible deadlock case, as we might already
# hold locks that older transaction is waiting upon. Make client
# release locks & reacquire them by notifying it of the possible
# deadlock.
# We have a smaller TTID than locking transaction, so we are older:
# this is a possible deadlock case, as we might already hold locks
# the younger transaction is waiting upon. Make client release
# locks & reacquire them by notifying it of the possible deadlock.
neo.lib.logging.info('Possible deadlock on %r:%r with %r',
dump(oid), dump(ttid), dump(locking_tid))
raise ConflictError(ZERO_TID)
......
......@@ -144,8 +144,8 @@ class TransactionManagerTests(NeoUnitTestBase):
def testDelayed(self):
""" Two transactions, the first cause the second to be delayed """
uuid = self.getNewUUID()
ttid2 = self.getNextTID()
ttid1 = self.getNextTID()
ttid2 = self.getNextTID()
tid1, txn1 = self._getTransaction()
tid2, txn2 = self._getTransaction()
serial, obj = self._getObject(1)
......@@ -165,8 +165,8 @@ class TransactionManagerTests(NeoUnitTestBase):
def testUnresolvableConflict(self):
""" A newer transaction has already modified an object """
uuid = self.getNewUUID()
ttid2 = self.getNextTID()
ttid1 = self.getNextTID()
ttid2 = self.getNextTID()
tid1, txn1 = self._getTransaction()
tid2, txn2 = self._getTransaction()
serial, obj = self._getObject(1)
......@@ -201,8 +201,8 @@ class TransactionManagerTests(NeoUnitTestBase):
uuid1 = self.getNewUUID()
uuid2 = self.getNewUUID()
self.assertNotEqual(uuid1, uuid2)
ttid2 = self.getNextTID()
ttid1 = self.getNextTID()
ttid2 = self.getNextTID()
tid1, txn1 = self._getTransaction()
tid2, txn2 = self._getTransaction()
serial1, obj1 = self._getObject(1)
......@@ -228,8 +228,8 @@ class TransactionManagerTests(NeoUnitTestBase):
uuid1 = self.getNewUUID()
uuid2 = self.getNewUUID()
self.assertNotEqual(uuid1, uuid2)
ttid2 = self.getNextTID()
ttid1 = self.getNextTID()
ttid2 = self.getNextTID()
tid1, txn1 = self._getTransaction()
tid2, txn2 = self._getTransaction()
serial1, obj1 = self._getObject(1)
......
......@@ -25,7 +25,7 @@ import neo.admin.app, neo.master.app, neo.storage.app
import neo.client.app, neo.neoctl.app
from neo.client import Storage
from neo.lib import bootstrap, setupLog
from neo.lib.connection import BaseConnection
from neo.lib.connection import BaseConnection, Connection
from neo.lib.connector import SocketConnector, \
ConnectorConnectionRefusedException
from neo.lib.event import EventManager
......@@ -161,19 +161,31 @@ class SerializedEventManager(EventManager):
EventManager._poll(self, timeout)
class ServerNode(object):
class Node(object):
def filterConnection(self, *peers):
addr = lambda c: c and (c.accepted_from or c.getAddress())
addr_set = set(addr(c.connector) for peer in peers
for c in peer.em.connection_dict.itervalues()
if isinstance(c, Connection))
addr_set.discard(None)
conn_list = (c for c in self.em.connection_dict.itervalues()
if isinstance(c, Connection) and addr(c.connector) in addr_set)
return ConnectionFilter(*conn_list)
class ServerNode(Node):
class __metaclass__(type):
def __init__(cls, name, bases, d):
type.__init__(cls, name, bases, d)
if object not in bases and threading.Thread not in cls.__mro__:
if Node not in bases and threading.Thread not in cls.__mro__:
cls.__bases__ = bases + (threading.Thread,)
@SerializedEventManager.decorate
def __init__(self, cluster, address, **kw):
self._init_args = (cluster, address), dict(kw)
threading.Thread.__init__(self)
self.daemon = True
self.setDaemon(True)
h, p = address
self.node_type = getattr(NodeTypes,
SERVER_TYPE[VIRTUAL_IP.index(h)].upper())
......@@ -258,7 +270,7 @@ class StorageApplication(ServerNode, neo.storage.app.Application):
else:
assert False
class ClientApplication(neo.client.app.Application):
class ClientApplication(Node, neo.client.app.Application):
@SerializedEventManager.decorate
def __init__(self, cluster):
......@@ -282,6 +294,17 @@ class ClientApplication(neo.client.app.Application):
Serialized.background()
close = __del__
def filterConnection(self, *peers):
conn_list = []
for peer in peers:
if isinstance(peer, MasterApplication):
conn = self._getMasterConnection()
else:
assert isinstance(peer, StorageApplication)
conn = self.cp.getConnForNode(self.nm.getByUUID(peer.uuid))
conn_list.append(conn)
return ConnectionFilter(*conn_list)
class NeoCTL(neo.neoctl.app.NeoCTL):
@SerializedEventManager.decorate
......@@ -308,6 +331,100 @@ class LoggerThreadName(object):
except AttributeError:
return self.__default
class Patch(object):
def __init__(self, patched, **patch):
(name, patch), = patch.iteritems()
wrapped = getattr(patched, name)
wrapper = lambda *args, **kw: patch(wrapped, *args, **kw)
orig = patched.__dict__.get(name)
setattr(patched, name, wraps(wrapped)(wrapper))
if orig is None:
self._revert = lambda: delattr(patched, name)
else:
self._revert = lambda: setattr(patched, name, orig)
def __del__(self):
self._revert()
class ConnectionFilter(object):
def __init__(self, *conns):
self.filter_dict = {}
self.lock = threading.Lock()
self.conn_list = [(conn, self._patch(conn)) for conn in conns]
def _patch(self, conn):
assert '_addPacket' not in conn.__dict__
lock = self.lock
filter_dict = self.filter_dict
orig = conn.__class__._addPacket
queue = deque()
def _addPacket(packet):
lock.acquire()
try:
if not queue:
for filter in filter_dict:
if filter(conn, packet):
break
else:
return orig(conn, packet)
queue.append(packet)
finally:
lock.release()
conn._addPacket = _addPacket
return queue
def __call__(self, revert=1):
self.lock.acquire()
try:
self.filter_dict.clear()
self._retry()
if revert:
for conn, queue in self.conn_list:
assert not queue
del conn._addPacket
del self.conn_list[:]
finally:
self.lock.release()
def _retry(self):
for conn, queue in self.conn_list:
while queue:
packet = queue.popleft()
for filter in self.filter_dict:
if filter(conn, packet):
queue.appendleft(packet)
break
else:
conn.__class__._addPacket(conn, packet)
continue
break
def clear(self):
self(0)
def add(self, filter, *patches):
self.lock.acquire()
try:
self.filter_dict[filter] = patches
finally:
self.lock.release()
def remove(self, *filters):
self.lock.acquire()
try:
for filter in filters:
del self.filter_dict[filter]
self._retry()
finally:
self.lock.release()
def __contains__(self, filter):
return filter in self.filter_dict
class NEOCluster(object):
BaseConnection_checkTimeout = staticmethod(BaseConnection.checkTimeout)
......@@ -512,3 +629,25 @@ class NEOThreadedTest(NeoTestBase):
def setupLog(self):
log_file = os.path.join(getTempDirectory(), self.id() + '.log')
setupLog(LoggerThreadName(), log_file, True)
class newThread(threading.Thread):
def __init__(self, func, *args, **kw):
threading.Thread.__init__(self)
self.__target = func, args, kw
self.setDaemon(True)
self.start()
def run(self):
try:
apply(*self.__target)
self.__exc_info = None
except:
self.__exc_info = sys.exc_info()
def join(self, timeout=None):
threading.Thread.join(self, timeout)
if not self.isAlive() and self.__exc_info:
etype, value, tb = self.__exc_info
del self.__exc_info
raise etype, value, tb
......@@ -16,9 +16,15 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import threading
from thread import get_ident
from persistent import Persistent
from neo.lib.protocol import NodeStates, ZERO_TID
from neo.tests.threaded import NEOCluster, NEOThreadedTest
from neo.storage.transactions import TransactionManager, \
DelayedError, ConflictError
from neo.lib.connection import MTClientConnection
from neo.lib.protocol import NodeStates, Packets, ZERO_TID
from neo.tests.threaded import NEOCluster, NEOThreadedTest, \
Patch, ConnectionFilter
from neo.client.pool import CELL_CONNECTED, CELL_GOOD
class PCounter(Persistent):
......@@ -31,6 +37,121 @@ class PCounterWithResolution(PCounter):
class Test(NEOThreadedTest):
def testDelayedUnlockInformation(self):
except_list = []
def delayUnlockInformation(conn, packet):
return isinstance(packet, Packets.NotifyUnlockInformation)
def onStoreObject(orig, tm, ttid, serial, oid, *args):
if oid == resume_oid and delayUnlockInformation in master_storage:
master_storage.remove(delayUnlockInformation)
try:
return orig(tm, ttid, serial, oid, *args)
except Exception, e:
except_list.append(e.__class__)
raise
cluster = NEOCluster(storage_count=1)
try:
cluster.start()
t, c = cluster.getTransaction()
c.root()[0] = ob = PCounter()
master_storage = cluster.master.filterConnection(cluster.storage)
try:
resume_oid = None
master_storage.add(delayUnlockInformation,
Patch(TransactionManager, storeObject=onStoreObject))
t.commit()
resume_oid = ob._p_oid
ob._p_changed = 1
t.commit()
self.assertFalse(delayUnlockInformation in master_storage)
finally:
master_storage()
finally:
cluster.stop()
self.assertEqual(except_list, [DelayedError])
def _testDeadlockAvoidance(self, scenario):
except_list = []
delay = threading.Event(), threading.Event()
ident = get_ident()
def onStoreObject(orig, tm, ttid, serial, oid, *args):
if oid == counter_oid:
scenario[1] -= 1
if not scenario[1]:
delay[0].set()
try:
return orig(tm, ttid, serial, oid, *args)
except Exception, e:
except_list.append(e.__class__)
raise
def onAsk(orig, conn, packet, *args, **kw):
c2 = get_ident() == ident
switch = isinstance(packet, Packets.AskBeginTransaction)
if switch:
if c2:
delay[1].wait()
elif isinstance(packet, (Packets.AskStoreObject,
Packets.AskFinishTransaction)):
delay[c2].wait()
scenario[0] -= 1
switch = not scenario[0]
try:
return orig(conn, packet, *args, **kw)
finally:
if switch:
delay[c2].clear()
delay[1-c2].set()
cluster = NEOCluster(storage_count=2, replicas=1)
try:
cluster.start()
t, c = cluster.getTransaction()
c.root()[0] = ob = PCounterWithResolution()
t.commit()
counter_oid = ob._p_oid
del ob, t, c
t1, c1 = cluster.getTransaction()
t2, c2 = cluster.getTransaction()
o1 = c1.root()[0]
o2 = c2.root()[0]
o1.value += 1
o2.value += 2
p = (Patch(TransactionManager, storeObject=onStoreObject),
Patch(MTClientConnection, ask=onAsk))
try:
t = self.newThread(t1.commit)
t2.commit()
t.join()
finally:
del p
t1.begin()
t2.begin()
self.assertEqual(o1.value, 3)
self.assertEqual(o2.value, 3)
finally:
cluster.stop()
return except_list
def testDelayedStore(self):
# 0: C1 -> S1, S2
# 1: C2 -> S1, S2 (delayed)
# 2: C1 commits
# 3: C2 resolves conflict
self.assertEqual(self._testDeadlockAvoidance([2, 4]),
[DelayedError, DelayedError, ConflictError, ConflictError])
def testDeadlockAvoidance(self):
# This test fail because deadlock avoidance is not fully implemented.
# 0: C1 -> S1
# 1: C2 -> S1, S2 (delayed)
# 2: C1 -> S2 (deadlock)
# 3: C2 commits
# 4: C1 resolves conflict
self.assertEqual(self._testDeadlockAvoidance([1, 3]),
[DelayedError, ConflictError, "???" ])
def testConflictResolutionTriggered2(self):
""" Check that conflict resolution works """
cluster = NEOCluster()
......
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!