Commit c277ed20 authored by Julien Muchembled's avatar Julien Muchembled

client: really process all invalidations in poll thread

This changes completely how to get data from storages than is not too
recent and NEO now behaves as expected by ZODB, instead of trying to
snapshot at Storage level.
However, ZODB should probably be changed to avoid double loading when
an invalidation is received during a load.
parent f7b67cb9
......@@ -167,6 +167,7 @@ RC - Review output of pylint (CODE)
must be propagated to client.
Client
- Merge Application into Storage (SPEED)
- Implement C version of mq.py (LOAD LATENCY)
- Use generic bootstrap module (CODE)
- Find a way to make ask() from the thread poll to allow send initial packet
......
......@@ -20,7 +20,6 @@ import ZODB.interfaces
from functools import wraps
from neo.lib import logging
from neo.lib.locking import Lock
from neo.lib.util import add64
from neo.lib.protocol import ZERO_TID
from .app import Application
......@@ -37,17 +36,6 @@ class Storage(BaseStorage.BaseStorage,
ConflictResolution.ConflictResolvingStorage):
"""Wrapper class for neoclient."""
# Stores the highest TID visible for current transaction.
# First call sets this snapshot by asking master node most recent
# committed TID.
# As a (positive) side-effect, this forces us to handle all pending
# invalidations, so we get a very recent view of the database (which is
# good when multiple databases are used in the same program with some
# amount of referential integrity).
# Should remain None when not bound to a connection,
# so that it always read the last revision.
_snapshot_tid = None
implements(*filter(None, (
ZODB.interfaces.IStorage,
# "restore" missing for the moment, but "store" implements this
......@@ -88,17 +76,6 @@ class Storage(BaseStorage.BaseStorage,
'dynamic_master_list': dynamic_master_list,
'_app': _app,
}
snapshot_lock = Lock()
acquire = snapshot_lock.acquire
release = snapshot_lock.release
def _setSnapshotTid(tid):
acquire()
try:
if self._snapshot_tid <= tid:
self._snapshot_tid = add64(tid, 1)
finally:
release()
self._setSnapshotTid = _setSnapshotTid
@property
def _cache(self):
......@@ -117,7 +94,7 @@ class Storage(BaseStorage.BaseStorage,
# it optional.
assert version == '', 'Versions are not supported'
try:
return self.app.load(oid, None, self._snapshot_tid)[:2]
return self.app.load(oid)[:2]
except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid)
......@@ -143,14 +120,8 @@ class Storage(BaseStorage.BaseStorage,
return self.app.tpc_abort(transaction=transaction)
def tpc_finish(self, transaction, f=None):
tid = self.app.tpc_finish(transaction=transaction,
return self.app.tpc_finish(transaction=transaction,
tryToResolveConflict=self.tryToResolveConflict, f=f)
# XXX: Note that when undoing changes, the following is useless because
# a temporary Storage object is used to commit.
# See also testZODB.NEOZODBTests.checkMultipleUndoInOneTransaction
if self._snapshot_tid:
self._setSnapshotTid(tid)
return tid
@check_read_only
def store(self, oid, serial, data, version, transaction):
......@@ -189,9 +160,7 @@ class Storage(BaseStorage.BaseStorage,
# undo
@check_read_only
def undo(self, transaction_id, txn):
return self.app.undo(self._snapshot_tid, undone_tid=transaction_id,
txn=txn, tryToResolveConflict=self.tryToResolveConflict)
return self.app.undo(transaction_id, txn, self.tryToResolveConflict)
@check_read_only
def undoLog(self, first=0, last=-20, filter=None):
......@@ -213,7 +182,7 @@ class Storage(BaseStorage.BaseStorage,
def loadEx(self, oid, version):
try:
data, serial, _ = self.app.load(oid, None, self._snapshot_tid)
data, serial, _ = self.app.load(oid)
except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid)
return data, serial, ''
......@@ -231,9 +200,10 @@ class Storage(BaseStorage.BaseStorage,
raise POSException.POSKeyError(oid)
def sync(self, force=True):
# Increment by one, as we will use this as an excluded upper
# bound (loadBefore).
self._setSnapshotTid(self.lastTransaction())
# XXX: sync() is part of IMVCCStorage and we don't want to be called
# from afterCompletion() so it may not be a good place to ping the
# master here. See also monkey-patch in __init__.py
self.app.lastTransaction()
def copyTransactionsFrom(self, source, verbose=False):
""" Zope compliant API """
......
......@@ -45,7 +45,7 @@ from .poll import ThreadedPoll, psThreadedPoll
from .iterator import Iterator
from .cache import ClientCache
from .pool import ConnectionPool
from neo.lib.util import u64, parseMasterList
from neo.lib.util import p64, u64, parseMasterList
from neo.lib.profiling import profiler_decorator, PROFILING_ENABLED
from neo.lib.debug import register as registerLiveDebugger
from .container import ThreadContainer, TransactionContainer
......@@ -65,7 +65,7 @@ else:
compress = real_compress
makeChecksum = real_makeChecksum
CHECKED_SERIAL = object()
CHECKED_SERIAL = master.CHECKED_SERIAL
class Application(object):
......@@ -100,6 +100,7 @@ class Application(object):
self._loading_oid = None
self.new_oid_list = []
self.last_oid = '\0' * 8
self.last_tid = None
self.storage_event_handler = storage.StorageEventHandler(self)
self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
self.storage_handler = storage.StorageAnswersHandler(self)
......@@ -342,6 +343,7 @@ class Application(object):
handler = self.primary_bootstrap_handler
ask(conn, Packets.AskNodeInformation(), handler=handler)
ask(conn, Packets.AskPartitionTable(), handler=handler)
self.lastTransaction()
return self.pt.operational()
def registerDB(self, db, limit):
......@@ -425,6 +427,13 @@ class Application(object):
self._loading_oid = oid
finally:
release()
# When not bound to a ZODB Connection, load() may be the
# first method called and last_tid may still be None.
# This happens, for example, when opening the DB.
if not (tid or before_tid) and self.last_tid:
# Do not get something more recent than the last invalidation
# we got from master.
before_tid = p64(u64(self.last_tid) + 1)
result = self._loadFromStorage(oid, tid, before_tid)
acquire()
try:
......@@ -777,34 +786,14 @@ class Application(object):
# Call finish on master
cache_dict = txn_context['cache_dict']
tid = self._askPrimary(Packets.AskFinishTransaction(
txn_context['ttid'], cache_dict), callback=f)
# Update cache
self._cache_lock_acquire()
try:
cache = self._cache
for oid, data in cache_dict.iteritems():
if data is CHECKED_SERIAL:
# this is just a remain of
# checkCurrentSerialInTransaction call, ignore (no data
# was modified).
continue
# Update ex-latest value in cache
try:
cache.invalidate(oid, tid)
except KeyError:
pass
if data is not None:
# Store in cache with no next_tid
cache.store(oid, data, tid, None)
finally:
self._cache_lock_release()
txn_context['ttid'], cache_dict),
cache_dict=cache_dict, callback=f)
txn_container.delete(transaction)
return tid
finally:
self._load_lock_release()
def undo(self, snapshot_tid, undone_tid, txn, tryToResolveConflict):
def undo(self, undone_tid, txn, tryToResolveConflict):
txn_context = self._txn_container.get(txn)
if txn_context is None:
raise StorageTransactionError(self, undone_tid)
......@@ -831,6 +820,7 @@ class Application(object):
queue = self._getThreadQueue()
ttid = txn_context['ttid']
undo_object_tid_dict = {}
snapshot_tid = p64(u64(self.last_tid) + 1)
for partition, oid_list in partition_oid_dict.iteritems():
cell_list = getCellList(partition, readable=True)
# We do want to shuffle before getting one with the smallest
......@@ -1027,7 +1017,8 @@ class Application(object):
return Iterator(self, start, stop)
def lastTransaction(self):
return self._askPrimary(Packets.AskLastTransaction())
self._askPrimary(Packets.AskLastTransaction())
return self.last_tid
def abortVersion(self, src, transaction):
if self._txn_container.get(transaction) is None:
......
......@@ -21,6 +21,8 @@ from neo.lib.util import dump
from . import BaseHandler, AnswerBaseHandler
from ..exception import NEOStorageError
CHECKED_SERIAL = object()
class PrimaryBootstrapHandler(AnswerBaseHandler):
""" Bootstrap handler used when looking for the primary master """
......@@ -91,10 +93,34 @@ class PrimaryNotificationsHandler(BaseHandler):
""" Handler that process the notifications from the primary master """
def packetReceived(self, conn, packet, kw={}):
if type(packet) is Packets.AnswerTransactionFinished:
if type(packet) is Packets.AnswerLastTransaction:
self.app.last_tid = packet.decode()[0]
elif type(packet) is Packets.AnswerTransactionFinished:
app = self.app
app.last_tid = tid = packet.decode()[1]
callback = kw.pop('callback')
# Update cache
cache = app._cache
app._cache_lock_acquire()
try:
for oid, data in kw.pop('cache_dict').iteritems():
if data is CHECKED_SERIAL:
# this is just a remain of
# checkCurrentSerialInTransaction call, ignore (no data
# was modified).
continue
# Update ex-latest value in cache
try:
cache.invalidate(oid, tid)
except KeyError:
pass
if data is not None:
# Store in cache with no next_tid
cache.store(oid, data, tid, None)
if callback is not None:
callback(packet.decode()[1])
callback(tid)
finally:
app._cache_lock_release()
BaseHandler.packetReceived(self, conn, packet, kw)
def connectionClosed(self, conn):
......@@ -110,6 +136,7 @@ class PrimaryNotificationsHandler(BaseHandler):
def invalidateObjects(self, conn, tid, oid_list):
app = self.app
app.last_tid = tid
app._cache_lock_acquire()
try:
invalidate = app._cache.invalidate
......@@ -163,5 +190,4 @@ class PrimaryAnswersHandler(AnswerBaseHandler):
raise NEOStorageError('Already packing')
def answerLastTransaction(self, conn, ltid):
self.app.setHandlerData(ltid)
pass
......@@ -510,13 +510,12 @@ class ClientApplicationTests(NeoUnitTestBase):
# invalid transaction
app = self.getApp()
tid = self.makeTID()
snapshot_tid = self.getNextTID()
txn = self.makeTransactionObject()
def tryToResolveConflict(oid, conflict_serial, serial, data):
pass
app.master_conn = Mock()
conn = Mock()
self.assertRaises(StorageTransactionError, app.undo, snapshot_tid, tid,
self.assertRaises(StorageTransactionError, app.undo, tid,
txn, tryToResolveConflict)
# no packet sent
self.checkNoPacketSent(conn)
......@@ -552,6 +551,7 @@ class ClientApplicationTests(NeoUnitTestBase):
unlock=False):
store_marker.append((oid, serial, data, data_serial))
app._store = _store
app.last_tid = self.getNextTID()
return app, conn, store_marker
def test_undoWithResolutionSuccess(self):
......@@ -567,7 +567,6 @@ class ClientApplicationTests(NeoUnitTestBase):
tid1 = self.getNextTID()
tid2 = self.getNextTID()
tid3 = self.getNextTID()
snapshot_tid = self.getNextTID()
app, conn, store_marker = self._getAppForUndoTests(oid0, tid0, tid1,
tid2)
undo_serial = Packets.AnswerObjectUndoSerial({
......@@ -583,7 +582,7 @@ class ClientApplicationTests(NeoUnitTestBase):
return 'solved'
# The undo
txn = self.beginTransaction(app, tid=tid3)
app.undo(snapshot_tid, tid1, txn, tryToResolveConflict)
app.undo(tid1, txn, tryToResolveConflict)
# Checking what happened
moid, mconflict_serial, mserial, mdata, mcommittedData = marker[0]
self.assertEqual(moid, oid0)
......@@ -610,7 +609,6 @@ class ClientApplicationTests(NeoUnitTestBase):
tid1 = self.getNextTID()
tid2 = self.getNextTID()
tid3 = self.getNextTID()
snapshot_tid = self.getNextTID()
undo_serial = Packets.AnswerObjectUndoSerial({
oid0: (tid2, tid0, False)})
undo_serial.setId(2)
......@@ -626,8 +624,7 @@ class ClientApplicationTests(NeoUnitTestBase):
return None
# The undo
txn = self.beginTransaction(app, tid=tid3)
self.assertRaises(UndoError, app.undo, snapshot_tid, tid1, txn,
tryToResolveConflict)
self.assertRaises(UndoError, app.undo, tid1, txn, tryToResolveConflict)
# Checking what happened
moid, mconflict_serial, mserial, mdata, mcommittedData = marker[0]
self.assertEqual(moid, oid0)
......@@ -644,8 +641,7 @@ class ClientApplicationTests(NeoUnitTestBase):
marker.append((oid, conflict_serial, serial, data, committedData))
raise ConflictError
# The undo
self.assertRaises(UndoError, app.undo, snapshot_tid, tid1, txn,
tryToResolveConflict)
self.assertRaises(UndoError, app.undo, tid1, txn, tryToResolveConflict)
# Checking what happened
moid, mconflict_serial, mserial, mdata, mcommittedData = marker[0]
self.assertEqual(moid, oid0)
......@@ -667,7 +663,6 @@ class ClientApplicationTests(NeoUnitTestBase):
tid1 = self.getNextTID()
tid2 = self.getNextTID()
tid3 = self.getNextTID()
snapshot_tid = self.getNextTID()
transaction_info = Packets.AnswerTransactionInformation(tid1, '', '',
'', False, (oid0, ))
transaction_info.setId(1)
......@@ -685,7 +680,7 @@ class ClientApplicationTests(NeoUnitTestBase):
'is no conflict in this test !'
# The undo
txn = self.beginTransaction(app, tid=tid3)
app.undo(snapshot_tid, tid1, txn, tryToResolveConflict)
app.undo(tid1, txn, tryToResolveConflict)
# Checking what happened
moid, mserial, mdata, mdata_serial = store_marker[0]
self.assertEqual(moid, oid0)
......@@ -765,10 +760,13 @@ class ClientApplicationTests(NeoUnitTestBase):
# will raise IndexError at the third iteration
app = self.getApp('127.0.0.1:10010 127.0.0.1:10011')
# TODO: test more connection failure cases
# Seventh packet : askNodeInformation succeeded
all_passed = []
def _ask8(_):
# askLastTransaction
def _ask9(_):
all_passed.append(1)
# Seventh packet : askNodeInformation succeeded
def _ask8(_):
pass
# Sixth packet : askPartitionTable succeeded
def _ask7(_):
app.pt = Mock({'operational': True})
......@@ -799,7 +797,7 @@ class ClientApplicationTests(NeoUnitTestBase):
def _ask1(_):
pass
ask_func_list = [_ask1, _ask2, _ask3, _ask4, _ask6, _ask7,
_ask8]
_ask8, _ask9]
def _ask_base(conn, _, handler=None):
ask_func_list.pop(0)(conn)
app.nm.getByAddress(conn.getAddress())._connection = None
......
......@@ -607,7 +607,6 @@ class Test(NEOThreadedTest):
client.tpc_begin(txn)
client.store(x2._p_oid, tid, x, '', txn)
tid = client.tpc_finish(txn, None)
client.close()
client.setPoll(0)
cluster.client.setPoll(1)
t1.begin() # make sure invalidation is processed
......@@ -619,6 +618,32 @@ class Test(NEOThreadedTest):
t.join()
self.assertEqual(x2.value, 1)
self.assertEqual(x1.value, 0)
def _flush_invalidations(orig):
l1.release()
l2.acquire()
orig()
x1._p_deactivate()
t1.abort()
p = Patch(c1, _flush_invalidations=_flush_invalidations)
try:
t = self.newThread(t1.begin)
l1.acquire()
cluster.client.setPoll(0)
client.setPoll(1)
txn = transaction.Transaction()
client.tpc_begin(txn)
client.store(x2._p_oid, tid, y, '', txn)
tid = client.tpc_finish(txn, None)
client.close()
client.setPoll(0)
cluster.client.setPoll(1)
finally:
del p
l2.release()
t.join()
self.assertEqual(x1.value, 1)
finally:
cluster.stop()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment