Commit 5c8e487a authored by Jim Fulton's avatar Jim Fulton

Fixed: ZEO needed changes to work with recent transaction changes.

parent c3183420
Changelog
=========
- Fixed: ZEO needed changes to work with recent transaction changes.
5.0.2 (2016-11-02)
------------------
......
......@@ -26,9 +26,9 @@ if (3, 0) < sys.version_info < (3, 4):
sys.exit(1)
install_requires = [
'ZODB >= 5.0.0a5',
'ZODB >= 5.1.0',
'six',
'transaction >= 1.6.0',
'transaction >= 2.0.3',
'persistent >= 4.1.0',
'zc.lockfile',
'ZConfig',
......
......@@ -474,11 +474,8 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
raise POSException.ReadOnlyError()
try:
buf = trans.data(self)
except KeyError:
buf = None
if buf is None:
buf = trans.__buf
except AttributeError:
raise POSException.StorageTransactionError(
"Transaction not committing", meth, trans)
......@@ -799,40 +796,35 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
raise POSException.ReadOnlyError()
try:
tbuf = txn.data(self)
tbuf = txn.__buf
except AttributeError:
# Gaaaa. This is a recovery transaction. Work around this
# until we can think of something better. XXX
tb = {}
txn.data = tb.__getitem__
txn.set_data = tb.__setitem__
except KeyError:
pass
txn.__buf = tbuf = TransactionBuffer(self._connection_generation)
else:
if tbuf is not None:
raise POSException.StorageTransactionError(
"Duplicate tpc_begin calls for same transaction")
raise POSException.StorageTransactionError(
"Duplicate tpc_begin calls for same transaction")
txn.set_data(self, TransactionBuffer(self._connection_generation))
# XXX we'd like to allow multiple transactions at a time at some point,
# but for now, due to server limitations, TCBOO.
self._commit_lock.acquire()
self._tbuf = txn.data(self)
self._tbuf = tbuf
try:
self._async(
'tpc_begin', id(txn),
txn.user, txn.description, txn._extension, tid, status)
txn.user, txn.description, txn.extension, tid, status)
except ClientDisconnected:
self.tpc_end(txn)
raise
def tpc_end(self, txn):
tbuf = txn.data(self)
if tbuf is not None:
try:
tbuf = txn.__buf
except AttributeError:
pass
else:
del txn.__buf
tbuf.close()
txn.set_data(self, None)
self._commit_lock.release()
def lastTransaction(self):
......@@ -845,8 +837,8 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
they normally would.)
"""
try:
tbuf = txn.data(self)
except KeyError:
tbuf = txn.__buf
except AttributeError:
return
try:
......
......@@ -28,7 +28,6 @@ import sys
import tempfile
import threading
import time
import transaction
import warnings
import ZEO.asyncio.server
import ZODB.blob
......@@ -42,6 +41,7 @@ from ZEO._compat import Pickler, Unpickler, PY3, BytesIO
from ZEO.Exceptions import AuthError
from ZEO.monitor import StorageStats
from ZEO.asyncio.server import Delay, MTDelay, Result
from ZODB.Connection import TransactionMetaData
from ZODB.loglevels import BLATHER
from ZODB.POSException import StorageError, StorageTransactionError
from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
......@@ -308,11 +308,8 @@ class ZEOStorage:
raise StorageTransactionError("Multiple simultaneous tpc_begin"
" requests from one client.")
t = transaction.Transaction()
t = TransactionMetaData(user, description, ext)
t.id = id
t.user = user
t.description = description
t._extension = ext
self.serials = []
self.conflicts = {}
......
......@@ -18,7 +18,7 @@ from __future__ import print_function
import sys
import time
from ZODB.Transaction import Transaction
from ZODB.Connection import TransactionMetaData
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_pickle
from ZEO.ClientStorage import ClientStorage
......@@ -57,7 +57,7 @@ def main():
revid = ZERO
data = MinPO("timeout.py")
pickled_data = zodb_pickle(data)
t = Transaction()
t = TransactionMetaData()
t.user = "timeout.py"
storage.tpc_begin(t)
storage.store(oid, revid, pickled_data, '', t)
......
......@@ -48,7 +48,7 @@ from ZODB.FileStorage import FileStorage
#from BDBStorage.BDBFullStorage import BDBFullStorage
#from Standby.primary import PrimaryStorage
#from Standby.config import RS_PORT
from ZODB.Transaction import Transaction
from ZODB.Connection import TransactionMetaData
from ZODB.utils import p64
from functools import reduce
......@@ -149,7 +149,7 @@ class ReplayTxn(TxnStat):
def replay(self):
ZERO = '\0'*8
t0 = now()
t = Transaction()
t = TransactionMetaData()
self._storage.tpc_begin(t)
for obj in self._objects:
oid = obj.oid
......
......@@ -13,11 +13,10 @@
##############################################################################
"""Tests of the ZEO cache"""
from ZODB.Connection import TransactionMetaData
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle
from transaction import Transaction
class TransUndoStorageWithCache:
def checkUndoInvalidation(self):
......@@ -35,7 +34,7 @@ class TransUndoStorageWithCache:
tid = info[0]['id']
# Now start an undo transaction
t = Transaction()
t = TransactionMetaData()
t.note('undo1')
oids = self._begin_undos_vote(t, tid)
......
......@@ -17,7 +17,7 @@ import threading
import time
from persistent.TimeStamp import TimeStamp
import transaction
from ZODB.Connection import TransactionMetaData
from ZODB.tests.StorageTestBase import zodb_pickle, MinPO
import ZEO.ClientStorage
......@@ -89,7 +89,7 @@ class CommitLockTests:
self._storages = []
def _start_txn(self):
txn = transaction.Transaction()
txn = TransactionMetaData()
self._storage.tpc_begin(txn)
oid = self._storage.new_oid()
self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', txn)
......@@ -104,7 +104,7 @@ class CommitLockTests:
for i in range(self.NUM_CLIENTS):
storage = self._duplicate_client()
txn = transaction.Transaction()
txn = TransactionMetaData()
tid = self._get_timestamp()
t = WorkerThread(self, storage, txn)
......
......@@ -24,6 +24,7 @@ from ZEO.Exceptions import ClientDisconnected
from ZEO.asyncio.marshal import encode
from ZEO.tests import forker
from ZODB.Connection import TransactionMetaData
from ZODB.DB import DB
from ZODB.POSException import ReadOnlyError, ConflictError
from ZODB.tests.StorageTestBase import StorageTestBase
......@@ -32,7 +33,6 @@ from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle
import ZODB.tests.util
import transaction
from transaction import Transaction
from . import testssl
......@@ -863,7 +863,7 @@ class ReconnectionTests(CommonSetupTearDown):
self._storage = self.openClientStorage()
self._dostore()
oids = [self._storage.new_oid() for i in range(5)]
txn = Transaction()
txn = TransactionMetaData()
self._storage.tpc_begin(txn)
for oid in oids:
data = zodb_pickle(MinPO(oid))
......@@ -957,7 +957,7 @@ class TimeoutTests(CommonSetupTearDown):
def checkTimeout(self):
self._storage = storage = self.openClientStorage()
txn = Transaction()
txn = TransactionMetaData()
storage.tpc_begin(txn)
storage.tpc_vote(txn)
time.sleep(2)
......@@ -977,7 +977,7 @@ class TimeoutTests(CommonSetupTearDown):
def checkTimeoutOnAbort(self):
storage = self.openClientStorage()
txn = Transaction()
txn = TransactionMetaData()
storage.tpc_begin(txn)
storage.tpc_vote(txn)
storage.tpc_abort(txn)
......@@ -985,7 +985,7 @@ class TimeoutTests(CommonSetupTearDown):
def checkTimeoutOnAbortNoLock(self):
storage = self.openClientStorage()
txn = Transaction()
txn = TransactionMetaData()
storage.tpc_begin(txn)
storage.tpc_abort(txn)
storage.close()
......@@ -998,7 +998,7 @@ class TimeoutTests(CommonSetupTearDown):
oid = storage.new_oid()
obj = MinPO(7)
# Now do a store, sleeping before the finish so as to cause a timeout
t = Transaction()
t = TransactionMetaData()
old_connection_count = storage.connection_count_for_tests
storage.tpc_begin(t)
revid1 = storage.store(oid, ZERO, zodb_pickle(obj), '', t)
......@@ -1051,7 +1051,7 @@ class MSTThread(threading.Thread):
c.__serials = {}
# Begin a transaction
t = Transaction()
t = TransactionMetaData()
for c in clients:
#print("%s.%s.%s begin" % (tname, c.__name, i))
c.tpc_begin(t)
......
......@@ -17,6 +17,8 @@ import transaction
import six
import gc
from ZODB.Connection import TransactionMetaData
from ..asyncio.testing import AsyncRPC
class IterationTests:
......@@ -28,14 +30,16 @@ class IterationTests:
# First, confirm that it ran
self.assertTrue(self._storage._iterators._last_gc > 0)
gc_enabled = gc.isenabled()
gc.disable() # make sure there's no race conditions cleaning out the weak refs
# make sure there's no race conditions cleaning out the weak refs
gc.disable()
try:
self.assertEquals(0, len(self._storage._iterator_ids))
except AssertionError:
# Ok, we have ids. That should also mean that the
# weak dictionary has the same length.
self.assertEqual(len(self._storage._iterators), len(self._storage._iterator_ids))
self.assertEqual(len(self._storage._iterators),
len(self._storage._iterator_ids))
# Now if we do a collection and re-ask for iterator_gc
# everything goes away as expected.
gc.enable()
......@@ -44,7 +48,8 @@ class IterationTests:
self._storage._iterator_gc()
self.assertEqual(len(self._storage._iterators), len(self._storage._iterator_ids))
self.assertEqual(len(self._storage._iterators),
len(self._storage._iterator_ids))
self.assertEquals(0, len(self._storage._iterator_ids))
finally:
if gc_enabled:
......@@ -126,7 +131,7 @@ class IterationTests:
iid = list(self._storage._iterator_ids)[0]
t = transaction.Transaction()
t = TransactionMetaData()
self._storage._iterators._last_gc = -1
self._storage.tpc_begin(t)
self._storage.tpc_abort(t)
......@@ -143,7 +148,7 @@ class IterationTests:
six.advance_iterator(self._storage.iterator())
iid = list(self._storage._iterator_ids)[0]
t = transaction.Transaction()
t = TransactionMetaData()
self._storage.tpc_begin(t)
# Show that after disconnecting, the client side GCs the iterators
# as well. I'm calling this directly to avoid accidentally
......
......@@ -15,7 +15,7 @@
import threading
import transaction
from ZODB.Connection import TransactionMetaData
from ZODB.tests.StorageTestBase import zodb_pickle, MinPO
import ZEO.Exceptions
......@@ -24,7 +24,7 @@ ZERO = '\0'*8
class BasicThread(threading.Thread):
def __init__(self, storage, doNextEvent, threadStartedEvent):
self.storage = storage
self.trans = transaction.Transaction()
self.trans = TransactionMetaData()
self.doNextEvent = doNextEvent
self.threadStartedEvent = threadStartedEvent
self.gotValueError = 0
......
......@@ -28,7 +28,6 @@ import sys
import tempfile
import threading
import time
import transaction
import warnings
from .zrpc.error import DisconnectedError
import ZODB.blob
......@@ -43,6 +42,7 @@ from ZEO.Exceptions import AuthError
from .monitor import StorageStats, StatsServer
from .zrpc.connection import ManagedServerConnection, Delay, MTDelay, Result
from .zrpc.server import Dispatcher
from ZODB.Connection import TransactionMetaData
from ZODB.loglevels import BLATHER
from ZODB.POSException import StorageError, StorageTransactionError
from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
......@@ -375,11 +375,8 @@ class ZEOStorage:
raise StorageTransactionError("Multiple simultaneous tpc_begin"
" requests from one client.")
t = transaction.Transaction()
t = TransactionMetaData(user, description, ext)
t.id = id
t.user = user
t.description = description
t._extension = ext
self.serials = []
self.invalidated = []
......
......@@ -20,6 +20,8 @@ from ZEO.ClientStorage import ClientStorage
from ZEO.tests import forker, Cache, CommitLockTests, ThreadTests
from ZEO.tests import IterationTests
from ZEO._compat import PY3
from ZODB.Connection import TransactionMetaData
from ZODB.tests import StorageTestBase, BasicStorage, \
TransactionalUndoStorage, \
PackableStorage, Synchronization, ConflictResolution, RevisionStorage, \
......@@ -551,7 +553,7 @@ class CommonBlobTests:
data = zodb_pickle(blob)
self.assert_(os.path.exists(tfname))
t = transaction.Transaction()
t = TransactionMetaData()
try:
self._storage.tpc_begin(t)
self._storage.storeBlob(oid, ZERO, data, tfname, '', t)
......@@ -590,7 +592,7 @@ class CommonBlobTests:
oid = self._storage.new_oid()
data = zodb_pickle(blob)
t = transaction.Transaction()
t = TransactionMetaData()
try:
self._storage.tpc_begin(t)
self._storage.storeBlob(oid, ZERO, data, tfname, '', t)
......@@ -614,7 +616,7 @@ class CommonBlobTests:
oid = self._storage.new_oid()
with open('blob_file', 'wb') as f:
f.write(b'I am a happy blob.')
t = transaction.Transaction()
t = TransactionMetaData()
self._storage.tpc_begin(t)
self._storage.storeBlob(
oid, ZODB.utils.z64, 'foo', 'blob_file', '', t)
......@@ -655,7 +657,7 @@ class BlobAdaptedFileStorageTests(FullGenericTests, CommonBlobTests):
data = zodb_pickle(blob)
self.assert_(os.path.exists(tfname))
t = transaction.Transaction()
t = TransactionMetaData()
try:
self._storage.tpc_begin(t)
self._storage.storeBlob(oid, ZERO, data, tfname, '', t)
......@@ -918,24 +920,25 @@ def tpc_finish_error():
>>> conn = db.open()
>>> conn.root.x = 1
>>> t = conn.transaction_manager.get()
>>> conn._storage.tpc_begin(t)
>>> conn.tpc_begin(t)
>>> conn.commit(t)
>>> _ = client.tpc_vote(t)
>>> transaction_meta_data = t.data(conn)
>>> _ = client.tpc_vote(transaction_meta_data)
Cause some breakage by messing with the clients transaction
buffer, sadly, using implementation details:
>>> tbuf = t.data(client)
>>> tbuf = client._check_trans(transaction_meta_data, 'test')
>>> tbuf.client_resolved = None
tpc_finish will fail:
>>> client.tpc_finish(t) # doctest: +ELLIPSIS
>>> client.tpc_finish(transaction_meta_data) # doctest: +ELLIPSIS
Traceback (most recent call last):
...
AttributeError: ...
>>> client.tpc_abort(t)
>>> client.tpc_abort(transaction_meta_data)
>>> t.abort()
But we can still load the saved data:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment