Commit 227953b9 authored by Jim Fulton's avatar Jim Fulton

Simplify MVCC by determining transaction start time using lastTransaction.

This implements: https://github.com/zopefoundation/ZODB/issues/50

Rather than watching invalidations, simply use 1 + the storages
lastTransaction, which is equivalent to but much simpler than waiting
for the first invalidation after a transaction starts.

More importantly, it means we can always use loadBefore and get away
from load.  We no longer have to worry about ordering of invalidations
and load() results.

Much thanks to NEO for pointing the way toward this simplification!

Implementing this initially caused a deadlock, because DB.open()
called Connection.open() while holding a database lock and
Connection.open() now calls IStotage.lastTransaction(), which acquires a
storage lock. (It's not clear that lastTransaction() really needs a
storage lock.)  Meanwhile, IStotage.tpc_finish() calls a DB function
that requires the DB lock while holding the storage lock.  Fixing this
required moving the call to Connection.open() outside the region where
the DB lock was held.

To debug the problem above, I greatly improved lock-debugging
support. Now all of the ZODB code imports Lock, RLock and Condition
from ZODB.utils. If the DEBUG_LOCKING is set to a non-empty value,
then these are wrapped in such a way that debugging information is
printed as they are used. This made spotting the nature of the
deadlock easier.

Of course, a change this basic broke lots of tests. Most of the
breakage arises from the fact that connections now call
lastTransaction on storages at transaction boundaries.  Lots of tests
didn't clean up databases and connections properly.  I fixed many
tests, but ultimately gave up and added some extra cleanup code that
violated transaction-manager underware (and the underware's privates)
to clear transaction synchonizers in test setup and tear-down.  I plan
to add a transaction manager API for this and to use it in a
subsequent PR.

This tests makes database and connection hygiene a bit more important,
especially for tests, because a connection will continue to interact
with storages if it isn't properly closed, which can lead to errors if
the storage is closed.  I chose not to swallow these errors in
Connection, choosing rather to clean up tests.

The thread debugging and test changes make this PR larger than I would
have liked. Apologies in advance to the reviewers.
parent 2ae41705
......@@ -12,12 +12,12 @@
#
##############################################################################
"""ZODB transfer activity monitoring
"""
$Id$"""
import threading
import time
from . import utils
class ActivityMonitor:
"""ZODB load/store activity monitor
......@@ -31,7 +31,7 @@ class ActivityMonitor:
def __init__(self, history_length=3600):
self.history_length = history_length # Number of seconds
self.log = [] # [(time, loads, stores)]
self.trim_lock = threading.Lock()
self.trim_lock = utils.Lock()
def closedConnection(self, conn):
log = self.log
......@@ -42,7 +42,7 @@ class ActivityMonitor:
def trim(self, now):
self.trim_lock.acquire()
log = self.log
cutoff = now - self.history_length
n = 0
......@@ -51,7 +51,7 @@ class ActivityMonitor:
n = n + 1
if n:
del log[:n]
self.trim_lock.release()
def setHistoryLength(self, history_length):
......
......@@ -18,7 +18,6 @@ its use is not recommended. It's still here for historical reasons.
"""
from __future__ import print_function
import threading
import time
import logging
import sys
......@@ -28,10 +27,10 @@ import zope.interface
from persistent.TimeStamp import TimeStamp
import ZODB.interfaces
from ZODB import POSException
from ZODB.utils import z64, oid_repr, byte_ord, byte_chr
from ZODB.UndoLogCompatible import UndoLogCompatible
from ZODB._compat import dumps, _protocol, py2_hasattr
from . import POSException, utils
from .utils import z64, oid_repr, byte_ord, byte_chr
from .UndoLogCompatible import UndoLogCompatible
from ._compat import dumps, _protocol, py2_hasattr
log = logging.getLogger("ZODB.BaseStorage")
......@@ -85,8 +84,8 @@ class BaseStorage(UndoLogCompatible):
log.debug("create storage %s", self.__name__)
# Allocate locks:
self._lock = threading.RLock()
self.__commit_lock = threading.Lock()
self._lock = utils.RLock()
self.__commit_lock = utils.Lock()
# Comment out the following 4 lines to debug locking:
self._lock_acquire = self._lock.acquire
......@@ -108,45 +107,6 @@ class BaseStorage(UndoLogCompatible):
else:
self._oid = oid
########################################################################
# The following methods are normally overridden on instances,
# except when debugging:
def _lock_acquire(self, *args):
f = sys._getframe(1)
sys.stdout.write("[la(%s:%s)\n" % (f.f_code.co_filename, f.f_lineno))
sys.stdout.flush()
self._lock.acquire(*args)
sys.stdout.write("la(%s:%s)]\n" % (f.f_code.co_filename, f.f_lineno))
sys.stdout.flush()
def _lock_release(self, *args):
f = sys._getframe(1)
sys.stdout.write("[lr(%s:%s)\n" % (f.f_code.co_filename, f.f_lineno))
sys.stdout.flush()
self._lock.release(*args)
sys.stdout.write("lr(%s:%s)]\n" % (f.f_code.co_filename, f.f_lineno))
sys.stdout.flush()
def _commit_lock_acquire(self, *args):
f = sys._getframe(1)
sys.stdout.write("[ca(%s:%s)\n" % (f.f_code.co_filename, f.f_lineno))
sys.stdout.flush()
self.__commit_lock.acquire(*args)
sys.stdout.write("ca(%s:%s)]\n" % (f.f_code.co_filename, f.f_lineno))
sys.stdout.flush()
def _commit_lock_release(self, *args):
f = sys._getframe(1)
sys.stdout.write("[cr(%s:%s)\n" % (f.f_code.co_filename, f.f_lineno))
sys.stdout.flush()
self.__commit_lock.release(*args)
sys.stdout.write("cr(%s:%s)]\n" % (f.f_code.co_filename, f.f_lineno))
sys.stdout.flush()
#
########################################################################
def sortKey(self):
"""Return a string that can be used to sort storage instances.
......
This diff is collapsed.
......@@ -13,13 +13,15 @@
##############################################################################
"""Database objects
"""
from __future__ import print_function
import sys
import threading
import logging
import datetime
import time
import warnings
from . import utils
from ZODB.broken import find_global
from ZODB.utils import z64
from ZODB.Connection import Connection
......@@ -179,6 +181,7 @@ class ConnectionPool(AbstractConnectionPool):
(available and available[0][0] < threshhold)
):
t, c = available.pop(0)
assert not c.opened
self.all.remove(c)
c._release_resources()
......@@ -213,6 +216,7 @@ class ConnectionPool(AbstractConnectionPool):
to_remove = ()
for (t, c) in self.available:
assert not c.opened
if t < threshhold:
to_remove += (c,)
self.all.remove(c)
......@@ -405,7 +409,7 @@ class DB(object):
storage = ZODB.MappingStorage.MappingStorage(**storage_args)
# Allocate lock.
x = threading.RLock()
x = utils.RLock()
self._a = x.acquire
self._r = x.release
......@@ -559,15 +563,17 @@ class DB(object):
# sys.getrefcount(ob) returns. But, in addition to that,
# the cache holds an extra reference on non-ghost objects,
# and we also want to pretend that doesn't exist.
# If we have no way to get a refcount, we return False to symbolize
# that. As opposed to None, this has the advantage of being usable
# as a number (0) in case clients depended on that.
# If we have no way to get a refcount, we return False
# to symbolize that. As opposed to None, this has the
# advantage of being usable as a number (0) in case
# clients depended on that.
detail.append({
'conn_no': cn,
'oid': oid,
'id': id,
'klass': "%s%s" % (module, ob.__class__.__name__),
'rc': rc(ob) - 3 - (ob._p_changed is not None) if rc else False,
'rc': (rc(ob) - 3 - (ob._p_changed is not None)
if rc else False),
'state': ob._p_changed,
#'references': con.references(oid),
})
......@@ -628,7 +634,11 @@ class DB(object):
@self._connectionMap
def _(c):
c.transaction_manager.abort()
if c.opened:
c.transaction_manager.abort()
# Note that this will modify out pool, but this is safe, because
# _connectionMap makes a list of the pool to iterate over
c.close()
c.afterCompletion = c.newTransaction = c.close = noop
c._release_resources()
......@@ -753,18 +763,18 @@ class DB(object):
assert result is not None
# open the connection.
result.open(transaction_manager)
# A good time to do some cache cleanup.
# (note we already have the lock)
self.pool.availableGC()
self.historical_pool.availableGC()
return result
finally:
self._r()
result.open(transaction_manager)
return result
def connectionDebugInfo(self):
result = []
t = time.time()
......@@ -986,7 +996,7 @@ class ContextManager:
self.tm.abort()
self.conn.close()
resource_counter_lock = threading.Lock()
resource_counter_lock = utils.Lock()
resource_counter = 0
class TransactionalUndo(object):
......
......@@ -19,11 +19,11 @@ to be layered over a base database.
The base storage must not change.
"""
from __future__ import print_function
import os
import random
import weakref
import tempfile
import threading
import ZODB.BaseStorage
import ZODB.blob
import ZODB.interfaces
......@@ -71,7 +71,7 @@ class DemoStorage(object):
self._issued_oids = set()
self._stored_oids = set()
self._commit_lock = threading.Lock()
self._commit_lock = ZODB.utils.Lock()
self._transaction = None
if name is None:
......@@ -319,6 +319,7 @@ class DemoStorage(object):
@ZODB.utils.locked
def tpc_abort(self, transaction):
if transaction is not self._transaction:
print('WTF', transaction, self._transaction)
return
self._stored_oids = set()
self._transaction = None
......
......@@ -20,7 +20,6 @@ import contextlib
import errno
import logging
import os
import threading
import time
from struct import pack
from struct import unpack
......@@ -32,6 +31,8 @@ from zc.lockfile import LockFile
from zope.interface import alsoProvides
from zope.interface import implementer
from .. import utils
from ZODB.blob import BlobStorageMixin
from ZODB.blob import link_or_copy
from ZODB.blob import remove_committed
......@@ -2046,7 +2047,7 @@ class FilePool:
self.name = file_name
self._files = []
self._out = []
self._cond = threading.Condition()
self._cond = utils.Condition()
@contextlib.contextmanager
def write_lock(self):
......
......@@ -19,7 +19,6 @@ storage without distracting storage details.
import BTrees
import time
import threading
import ZODB.BaseStorage
import ZODB.interfaces
import ZODB.POSException
......@@ -40,10 +39,10 @@ class MappingStorage(object):
self._transactions = BTrees.OOBTree.OOBTree() # {tid->TransactionRecord}
self._ltid = ZODB.utils.z64
self._last_pack = None
_lock = threading.RLock()
_lock = ZODB.utils.RLock()
self._lock_acquire = _lock.acquire
self._lock_release = _lock.release
self._commit_lock = threading.Lock()
self._commit_lock = ZODB.utils.Lock()
self._opened = True
self._transaction = None
self._oid = 0
......
......@@ -29,6 +29,8 @@ import transaction
import zope.interface
import zope.interface.verify
from .. import utils
ZERO = b'\0'*8
class BasicStorage:
......@@ -345,7 +347,7 @@ class BasicStorage:
results = {}
started.wait()
attempts = []
attempts_cond = threading.Condition()
attempts_cond = utils.Condition()
def update_attempts():
with attempts_cond:
......
......@@ -65,6 +65,7 @@ class ZODBClientThread(TestThread):
for i in range(self.commits):
self.commit(d, i)
self.test.assertEqual(sorted(d.keys()), list(range(self.commits)))
conn.close()
def commit(self, d, num):
d[num] = time.time()
......
......@@ -130,3 +130,10 @@ class MVCCMappingStorage(MappingStorage):
MappingStorage.pack(self, t, referencesf, gc)
finally:
self._commit_lock.release()
@ZODB.utils.locked(MappingStorage.opened)
def lastTransaction(self):
if self._transactions:
return self._transactions.maxKey()
else:
return ZODB.utils.z64
......@@ -25,7 +25,8 @@ from ZODB.serialize import referencesf
from ZODB.tests.MinPO import MinPO
from ZODB.tests.MTStorage import TestThread
from ZODB.tests.StorageTestBase import snooze
from ZODB._compat import loads, PersistentPickler, Pickler, Unpickler, BytesIO, _protocol
from ZODB._compat import (loads, PersistentPickler, Pickler, Unpickler,
BytesIO, _protocol)
import transaction
import ZODB.interfaces
import ZODB.tests.util
......@@ -270,6 +271,8 @@ class PackableStorage(PackableStorageBase):
self._sanity_check()
db.close()
def checkPackWhileWriting(self):
self._PackWhileWriting(pack_now=False)
......@@ -312,6 +315,8 @@ class PackableStorage(PackableStorageBase):
self._sanity_check()
db.close()
def checkPackWithMultiDatabaseReferences(self):
databases = {}
db = DB(self._storage, databases=databases, database_name='')
......@@ -327,6 +332,9 @@ class PackableStorage(PackableStorageBase):
db.pack(time.time()+1)
# some valid storages always return 0 for len()
self.assertTrue(len(self._storage) in (0, 1))
conn.close()
otherdb.close()
db.close()
def checkPackAllRevisions(self):
self._initroot()
......@@ -718,7 +726,7 @@ class ClientThread(TestThread):
def __init__(self, db, choices, loop_trip, timer, thread_id):
TestThread.__init__(self)
self.root = db.open().root()
self.db = db
self.choices = choices
self.loop_trip = loop_trip
self.millis = timer.elapsed_millis
......@@ -737,6 +745,8 @@ class ClientThread(TestThread):
def runtest(self):
from random import choice
conn = self.db.open()
root = conn.root()
for j in range(self.loop_trip):
assign_worked = False
......@@ -745,7 +755,7 @@ class ClientThread(TestThread):
try:
index = choice(self.choices)
alist.extend([self.millis(), index])
self.root[index].value = MinPO(j)
root[index].value = MinPO(j)
assign_worked = True
transaction.commit()
alist.append(self.millis())
......@@ -756,6 +766,8 @@ class ClientThread(TestThread):
transaction.abort()
alist.append(assign_worked)
conn.close()
class ElapsedTimer:
def __init__(self, start_time):
self.start_time = start_time
......@@ -776,5 +788,5 @@ def IExternalGC_suite(factory):
return doctest.DocFileSuite(
'IExternalGC.test',
setUp=setup, tearDown=zope.testing.setupstack.tearDown,
setUp=setup, tearDown=ZODB.tests.util.tearDown,
checker=ZODB.tests.util.checker)
......@@ -23,7 +23,7 @@ import sys
import time
import transaction
from ZODB.utils import u64
from ZODB.utils import u64, z64
from ZODB.tests.MinPO import MinPO
from ZODB._compat import PersistentPickler, Unpickler, BytesIO, _protocol
import ZODB.tests.util
......@@ -153,8 +153,8 @@ class StorageTestBase(ZODB.tests.util.TestCase):
self._storage.close()
def tearDown(self):
self._close()
ZODB.tests.util.TestCase.tearDown(self)
self._close()
def _dostore(self, oid=None, revid=None, data=None,
already_pickled=0, user=None, description=None):
......
......@@ -130,4 +130,4 @@ revision as well as the entire directory:
Clean up our blob directory and database:
>>> blob_storage.close()
>>> database.close()
......@@ -49,3 +49,5 @@ writing and expect the file to be in the blob temporary directory::
True
>>> w.close()
>>> database.close()
......@@ -160,3 +160,5 @@ knowledge that the underlying storage's pack method is also called:
>>> blob_storage._blobs_pack_is_in_progress
False
>>> base_storage.pack = base_pack
>>> database.close()
......@@ -61,7 +61,7 @@ While it's boring, it's important to verify that the same relationships
hold if the default pool size is overridden.
>>> handler.clear()
>>> st.close()
>>> db.close()
>>> st = Storage()
>>> PS = 2 # smaller pool size
>>> db = DB(st, pool_size=PS)
......@@ -117,7 +117,7 @@ We can change the pool size on the fly:
Enough of that.
>>> handler.clear()
>>> st.close()
>>> db.close()
More interesting is the stack-like nature of connection reuse. So long as
we keep opening new connections, and keep them alive, all connections
......@@ -256,7 +256,7 @@ Nothing in that last block should have logged any msgs:
If "too many" connections are open, then closing one may kick an older
closed one out of the available connection stack.
>>> st.close()
>>> db.close()
>>> st = Storage()
>>> db = DB(st, pool_size=3)
>>> conns = [db.open() for dummy in range(6)]
......@@ -324,7 +324,7 @@ gc to reclaim the Connection and its cache eventually works, but that can
take "a long time" and caches can hold on to many objects, and limited
resources (like RDB connections), for the duration.
>>> st.close()
>>> db.close()
>>> st = Storage()
>>> db = DB(st, pool_size=2)
>>> conn0 = db.open()
......
......@@ -25,10 +25,11 @@ Make a change locally:
>>> rt = cn.root()
>>> rt['a'] = 1
Sync should not have been called yet.
Sync is called when a connection is open, as that starts a new transaction:
>>> st.sync_called # False before 3.4
False
>>> st.sync_called
True
>>> st.sync_called = False
``sync()`` is called by the Connection's ``afterCompletion()`` hook after the
......@@ -77,7 +78,9 @@ path at some point when serving pages.
>>> rt = cn.root() # make a change
>>> rt['c'] = 3
>>> st.sync_called
False
True
>>> st.sync_called = False
Now ensure that ``cn.afterCompletion() -> st.sync()`` gets called by commit
despite that the `Connection` registered after the transaction began:
......@@ -96,7 +99,8 @@ And try the same thing with a non-threaded transaction manager:
>>> rt = cn.root() # make a change
>>> rt['d'] = 4
>>> st.sync_called
False
True
>>> st.sync_called = False
>>> tm.commit()
>>> st.sync_called
True
......
......@@ -22,7 +22,7 @@ import unittest
import transaction
import ZODB.tests.util
from ZODB.config import databaseFromString
from ZODB.utils import p64
from ZODB.utils import p64, u64, z64
from persistent import Persistent
from zope.interface.verify import verifyObject
from zope.testing import loggingsupport, renormalizing
......@@ -522,16 +522,24 @@ class InvalidationTests(unittest.TestCase):
create one from an int.
>>> cn.invalidate(p64(1), {p1._p_oid: 1})
>>> cn._txn_time
'\x00\x00\x00\x00\x00\x00\x00\x01'
Transaction start times are based on storage's last
transaction. (Previousely, they were based on the first
invalidation seen in a transaction.)
>>> cn._txn_time == p64(u64(db.storage.lastTransaction()) + 1)
True
>>> p1._p_oid in cn._invalidated
True
>>> p2._p_oid in cn._invalidated
False
>>> cn.invalidate(p64(10), {p2._p_oid: 1, p64(76): 1})
>>> cn._txn_time
'\x00\x00\x00\x00\x00\x00\x00\x01'
>>> cn._txn_time == p64(u64(db.storage.lastTransaction()) + 1)
True
>>> p1._p_oid in cn._invalidated
True
>>> p2._p_oid in cn._invalidated
......@@ -560,6 +568,7 @@ class InvalidationTests(unittest.TestCase):
>>> cn._invalidated
set([])
>>> db.close()
"""
def doctest_invalidateCache():
......@@ -1289,6 +1298,9 @@ class StubStorage:
raise TypeError('StubStorage does not support versions.')
return self._data[oid]
def loadBefore(self, oid, tid):
return self._data[oid] + (None, )
def store(self, oid, serial, p, version, transaction):
if version != '':
raise TypeError('StubStorage does not support versions.')
......@@ -1304,6 +1316,9 @@ class StubStorage:
# storage
return None
def lastTransaction(self):
return z64
class TestConnectionInterface(unittest.TestCase):
......
......@@ -125,7 +125,7 @@ def connectionDebugInfo():
... now += .1
... return now
>>> real_time = time.time
>>> if isinstance(time,type):
>>> if isinstance(time, type):
... time.time = staticmethod(faux_time) # Jython
... else:
... time.time = faux_time
......@@ -151,7 +151,7 @@ def connectionDebugInfo():
>>> before
[None, '\x03zY\xd8\xc0m9\xdd', None]
>>> opened
['2008-12-04T20:40:44Z (1.40s)', '2008-12-04T20:40:45Z (0.30s)', None]
['2008-12-04T20:40:44Z (1.30s)', '2008-12-04T20:40:46Z (0.10s)', None]
>>> infos
['test info (2)', ' (0)', ' (0)']
......
......@@ -75,12 +75,12 @@ class DemoStorageTests(
db = DB(self._storage) # creates object 0. :)
self.assertEqual(len(self._storage), 1)
self.assertTrue(self._storage)
conn = db.open()
for i in range(10):
conn.root()[i] = conn.root().__class__()
transaction.commit()
with db.transaction() as conn:
for i in range(10):
conn.root()[i] = conn.root().__class__()
self.assertEqual(len(self._storage), 11)
self.assertTrue(self._storage)
db.close()
def checkLoadBeforeUndo(self):
pass # we don't support undo yet
......
......@@ -35,6 +35,7 @@ from ZODB.tests import ReadOnlyStorage, RecoveryStorage
from ZODB.tests.StorageTestBase import MinPO, zodb_pickle
from ZODB._compat import dump, dumps, _protocol
from . import util
class FileStorageTests(
StorageTestBase.StorageTestBase,
......@@ -696,7 +697,7 @@ def test_suite():
suite.addTest(unittest.makeSuite(klass, "check"))
suite.addTest(doctest.DocTestSuite(
setUp=zope.testing.setupstack.setUpDirectory,
tearDown=zope.testing.setupstack.tearDown,
tearDown=util.tearDown,
checker=ZODB.tests.util.checker))
suite.addTest(ZODB.tests.testblob.storage_reusable_suite(
'BlobFileStorage',
......
......@@ -33,6 +33,7 @@ from ZODB.tests import (
)
class MVCCTests:
def checkClosingNestedDatabasesWorks(self):
# This tests for the error described in
# https://github.com/zopefoundation/ZODB/issues/45
......@@ -42,7 +43,6 @@ class MVCCTests:
db1.close()
db2.close()
def checkCrossConnectionInvalidation(self):
# Verify connections see updated state at txn boundaries.
# This will fail if the Connection doesn't poll for changes.
......
......@@ -38,6 +38,8 @@ __test__ = dict(
>>> list(conn2.root()[0].keys())
[]
>>> db2.close()
>>> db1.close()
""",
)
......
......@@ -64,7 +64,7 @@ Now we see two transactions and two changed objects.
Clean up.
>>> st.close()
>>> db.close()
"""
import re
......@@ -87,6 +87,6 @@ checker = renormalizing.RENormalizing([
def test_suite():
return doctest.DocTestSuite(
setUp=zope.testing.setupstack.setUpDirectory,
tearDown=zope.testing.setupstack.tearDown,
tearDown=ZODB.tests.util.tearDown,
optionflags=doctest.REPORT_NDIFF,
checker=ZODB.tests.util.checker + checker)
......@@ -99,10 +99,13 @@ class MinimalMemoryStorage(BaseStorage, object):
del self._txn
def _finish(self, tid, u, d, e):
with self._lock:
self._lock_acquire()
try:
self._index.update(self._txn.index)
self._cur.update(self._txn.cur())
self._ltid = self._tid
finally:
self._lock_release()
def loadBefore(self, the_oid, the_tid):
# It's okay if loadBefore() is really expensive, because this
......@@ -121,6 +124,9 @@ class MinimalMemoryStorage(BaseStorage, object):
end_tid = None
else:
end_tid = tids[j]
self.hook(the_oid, self._cur[the_oid], '')
return self._index[(the_oid, tid)], tid, end_tid
def loadSerial(self, oid, serial):
......
......@@ -54,6 +54,8 @@ except NameError:
import io
file_type = io.BufferedReader
from . import util
def new_time():
"""Create a _new_ time stamp.
......@@ -334,6 +336,7 @@ class RecoveryBlobStorage(BlobTestBase,
transaction.commit()
self._dst.copyTransactionsFrom(self._storage)
self.compare(self._storage, self._dst)
db.close()
def gc_blob_removes_uncommitted_data():
......@@ -446,7 +449,6 @@ def packing_with_uncommitted_data_non_undoing():
Clean up:
>>> database.close()
"""