Commit c0138cbd authored by Jim Fulton's avatar Jim Fulton

Switched to using threading.local for the thread-based transaction

manager. This fixed:

When threads were reused, transaction data could leak accross them,
  causing subtle application bugs.

https://bugs.launchpad.net/zodb/+bug/239086
parent 97282329
......@@ -28,12 +28,16 @@ New Features:
Bugs fixed:
- Fixed a bug that caused extra commit calls to be made on data
managers under certain special circumstances.
https://mail.zope.org/pipermail/zodb-dev/2010-May/013329.html
- When threads were reused, transaction data could leak accross them,
causing subtle application bugs.
https://bugs.launchpad.net/zodb/+bug/239086
1.0.1 (2010-05-07)
------------------
......
......@@ -21,15 +21,7 @@ from transaction.weakset import WeakSet
from transaction._transaction import Transaction
from transaction.interfaces import TransientError
import thread
# Used for deprecated arguments. ZODB.utils.DEPRECATED_ARGUMENT was
# too hard to use here, due to the convoluted import dance across
# __init__.py files.
_marker = object()
import threading
# We have to remember sets of synch objects, especially Connections.
# But we don't want mere registration with a transaction manager to
......@@ -38,7 +30,6 @@ _marker = object()
# a Connection alive keeps a potentially huge number of other objects
# alive (e.g., the cache, and everything reachable from it too).
# Therefore we use "weak sets" internally.
#
# Call the ISynchronizer newTransaction() method on every element of
# WeakSet synchs.
......@@ -58,7 +49,6 @@ def _new_transaction(txn, synchs):
# so that Transactions "see" synchronizers that get registered after the
# Transaction object is constructed.
class TransactionManager(object):
def __init__(self):
......@@ -129,62 +119,12 @@ class TransactionManager(object):
return True
class ThreadTransactionManager(TransactionManager):
class ThreadTransactionManager(TransactionManager, threading.local):
"""Thread-aware transaction manager.
Each thread is associated with a unique transaction.
"""
def __init__(self):
# _threads maps thread ids to transactions
self._txns = {}
# _synchs maps a thread id to a WeakSet of registered synchronizers.
# The WeakSet is passed to the Transaction constructor, because the
# latter needs to call the synchronizers when it commits.
self._synchs = {}
def begin(self):
tid = thread.get_ident()
txn = self._txns.get(tid)
if txn is not None:
txn.abort()
synchs = self._synchs.get(tid)
if synchs is None:
synchs = self._synchs[tid] = WeakSet()
txn = self._txns[tid] = Transaction(synchs, self)
_new_transaction(txn, synchs)
return txn
def get(self):
tid = thread.get_ident()
txn = self._txns.get(tid)
if txn is None:
synchs = self._synchs.get(tid)
if synchs is None:
synchs = self._synchs[tid] = WeakSet()
txn = self._txns[tid] = Transaction(synchs, self)
return txn
def free(self, txn):
tid = thread.get_ident()
assert txn is self._txns.get(tid)
del self._txns[tid]
def registerSynch(self, synch):
tid = thread.get_ident()
ws = self._synchs.get(tid)
if ws is None:
ws = self._synchs[tid] = WeakSet()
ws.add(synch)
def unregisterSynch(self, synch):
tid = thread.get_ident()
ws = self._synchs[tid]
ws.remove(synch)
class Attempt(object):
def __init__(self, manager):
......
......@@ -673,8 +673,8 @@ def test_addAfterCommitHook():
>>> do = DataObject(mgr)
>>> t = transaction.begin()
>>> len(t._manager._txns)
1
>>> t._manager._txn is not None
True
>>> t.addAfterCommitHook(hook, ('-', 1))
>>> transaction.commit()
......@@ -682,12 +682,67 @@ def test_addAfterCommitHook():
>>> log
["True arg '-' kw1 1 kw2 'no_kw2'"]
>>> len(t._manager._txns)
0
>>> t._manager._txn is not None
False
>>> reset_log()
"""
def bug239086():
"""
The original implementation of thread transaction manager made
invalid assumptions about thread ids.
>>> import transaction.tests.savepointsample
>>> dm = transaction.tests.savepointsample.SampleSavepointDataManager()
>>> dm.keys()
[]
>>> class Sync:
... def __init__(self, label):
... self.label = label
... def beforeCompletion(self, t):
... print self.label, 'before'
... def afterCompletion(self, t):
... print self.label, 'after'
... def newTransaction(self, t):
... print self.label, 'new'
>>> sync = Sync(1)
>>> import threading
>>> def run_in_thread(f):
... t = threading.Thread(target=f)
... t.start()
... t.join()
>>> @run_in_thread
... def first():
... transaction.manager.registerSynch(sync)
... transaction.manager.begin()
... dm['a'] = 1
1 new
>>> @run_in_thread
... def second():
... transaction.abort() # should do nothing.
>>> dm.keys()
['a']
>>> dm = transaction.tests.savepointsample.SampleSavepointDataManager()
>>> dm.keys()
[]
>>> @run_in_thread
... def first():
... dm['a'] = 1
>>> transaction.abort() # should do nothing
>>> dm.keys()
['a']
"""
def test_suite():
suite = unittest.TestSuite((
DocFileSuite('doom.txt'),
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment