Commit d4066247 authored by Chris McDonough's avatar Chris McDonough

First cut; one test still fails.

parent 554e9589
1.0
Initial release.
Remove (deprecated) support for beforeCommitHook alias to
addBeforeCommitHook.
Add weakset tests.
Add TimeStamp tests.
Copyright (c) 2007 Zope Corporation and Contributors.
All Rights Reserved.
Zope Public License (ZPL) Version 2.1
-------------------------------------
A copyright notice accompanies this license document that
identifies the copyright holders.
This license has been certified as open source. It has also
been designated as GPL compatible by the Free Software
Foundation (FSF).
Redistribution and use in source and binary forms, with or
without modification, are permitted provided that the
following conditions are met:
1. Redistributions in source code must retain the
accompanying copyright notice, this list of conditions,
and the following disclaimer.
2. Redistributions in binary form must reproduce the accompanying
copyright notice, this list of conditions, and the
following disclaimer in the documentation and/or other
materials provided with the distribution.
3. Names of the copyright holders must not be used to
endorse or promote products derived from this software
without prior written permission from the copyright
holders.
4. The right to distribute this software or to use it for
any purpose does not give you the right to use
Servicemarks (sm) or Trademarks (tm) of the copyright
holders. Use of them is covered by separate agreement
with the copyright holders.
5. If any files are modified, you must cause the modified
files to carry prominent notices stating that you changed
the files and the date of any change.
Disclaimer
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS''
AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN
NO EVENT SHALL THE COPYRIGHT HOLDERS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
DAMAGE.
============
Transactions
============
This package contains a generic transaction implementation for Python. It is
mainly used by the ZODB, though.
Note that the data manager API, ``transaction.interfaces.IDataManager``,
is syntactically simple, but semantically complex. The semantics
were not easy to express in the interface. This could probably use
more work. The semantics are presented in detail through examples of
a sample data manager in ``transaction.tests.test_SampleDataManager``.
#!python
"""Bootstrap setuptools installation
If you want to use setuptools in your package's setup.py, just include this
file in the same directory with it, and add this to the top of your setup.py::
from ez_setup import use_setuptools
use_setuptools()
If you want to require a specific version of setuptools, set a download
mirror, or use an alternate download directory, you can do so by supplying
the appropriate options to ``use_setuptools()``.
This file can also be run as a script to install or upgrade setuptools.
"""
import sys
DEFAULT_VERSION = "0.6c7"
DEFAULT_URL = "http://pypi.python.org/packages/%s/s/setuptools/" % sys.version[:3]
md5_data = {
'setuptools-0.6b1-py2.3.egg': '8822caf901250d848b996b7f25c6e6ca',
'setuptools-0.6b1-py2.4.egg': 'b79a8a403e4502fbb85ee3f1941735cb',
'setuptools-0.6b2-py2.3.egg': '5657759d8a6d8fc44070a9d07272d99b',
'setuptools-0.6b2-py2.4.egg': '4996a8d169d2be661fa32a6e52e4f82a',
'setuptools-0.6b3-py2.3.egg': 'bb31c0fc7399a63579975cad9f5a0618',
'setuptools-0.6b3-py2.4.egg': '38a8c6b3d6ecd22247f179f7da669fac',
'setuptools-0.6b4-py2.3.egg': '62045a24ed4e1ebc77fe039aa4e6f7e5',
'setuptools-0.6b4-py2.4.egg': '4cb2a185d228dacffb2d17f103b3b1c4',
'setuptools-0.6c1-py2.3.egg': 'b3f2b5539d65cb7f74ad79127f1a908c',
'setuptools-0.6c1-py2.4.egg': 'b45adeda0667d2d2ffe14009364f2a4b',
'setuptools-0.6c2-py2.3.egg': 'f0064bf6aa2b7d0f3ba0b43f20817c27',
'setuptools-0.6c2-py2.4.egg': '616192eec35f47e8ea16cd6a122b7277',
'setuptools-0.6c3-py2.3.egg': 'f181fa125dfe85a259c9cd6f1d7b78fa',
'setuptools-0.6c3-py2.4.egg': 'e0ed74682c998bfb73bf803a50e7b71e',
'setuptools-0.6c3-py2.5.egg': 'abef16fdd61955514841c7c6bd98965e',
'setuptools-0.6c4-py2.3.egg': 'b0b9131acab32022bfac7f44c5d7971f',
'setuptools-0.6c4-py2.4.egg': '2a1f9656d4fbf3c97bf946c0a124e6e2',
'setuptools-0.6c4-py2.5.egg': '8f5a052e32cdb9c72bcf4b5526f28afc',
'setuptools-0.6c5-py2.3.egg': 'ee9fd80965da04f2f3e6b3576e9d8167',
'setuptools-0.6c5-py2.4.egg': 'afe2adf1c01701ee841761f5bcd8aa64',
'setuptools-0.6c5-py2.5.egg': 'a8d3f61494ccaa8714dfed37bccd3d5d',
'setuptools-0.6c6-py2.3.egg': '35686b78116a668847237b69d549ec20',
'setuptools-0.6c6-py2.4.egg': '3c56af57be3225019260a644430065ab',
'setuptools-0.6c6-py2.5.egg': 'b2f8a7520709a5b34f80946de5f02f53',
'setuptools-0.6c7-py2.3.egg': '209fdf9adc3a615e5115b725658e13e2',
'setuptools-0.6c7-py2.4.egg': '5a8f954807d46a0fb67cf1f26c55a82e',
'setuptools-0.6c7-py2.5.egg': '45d2ad28f9750e7434111fde831e8372',
}
import sys, os
def _validate_md5(egg_name, data):
if egg_name in md5_data:
from md5 import md5
digest = md5(data).hexdigest()
if digest != md5_data[egg_name]:
print >>sys.stderr, (
"md5 validation of %s failed! (Possible download problem?)"
% egg_name
)
sys.exit(2)
return data
def use_setuptools(
version=DEFAULT_VERSION, download_base=DEFAULT_URL, to_dir=os.curdir,
download_delay=15
):
"""Automatically find/download setuptools and make it available on sys.path
`version` should be a valid setuptools version number that is available
as an egg for download under the `download_base` URL (which should end with
a '/'). `to_dir` is the directory where setuptools will be downloaded, if
it is not already available. If `download_delay` is specified, it should
be the number of seconds that will be paused before initiating a download,
should one be required. If an older version of setuptools is installed,
this routine will print a message to ``sys.stderr`` and raise SystemExit in
an attempt to abort the calling script.
"""
try:
import setuptools
if setuptools.__version__ == '0.0.1':
print >>sys.stderr, (
"You have an obsolete version of setuptools installed. Please\n"
"remove it from your system entirely before rerunning this script."
)
sys.exit(2)
except ImportError:
egg = download_setuptools(version, download_base, to_dir, download_delay)
sys.path.insert(0, egg)
import setuptools; setuptools.bootstrap_install_from = egg
import pkg_resources
try:
pkg_resources.require("setuptools>="+version)
except pkg_resources.VersionConflict, e:
# XXX could we install in a subprocess here?
print >>sys.stderr, (
"The required version of setuptools (>=%s) is not available, and\n"
"can't be installed while this script is running. Please install\n"
" a more recent version first.\n\n(Currently using %r)"
) % (version, e.args[0])
sys.exit(2)
def download_setuptools(
version=DEFAULT_VERSION, download_base=DEFAULT_URL, to_dir=os.curdir,
delay = 15
):
"""Download setuptools from a specified location and return its filename
`version` should be a valid setuptools version number that is available
as an egg for download under the `download_base` URL (which should end
with a '/'). `to_dir` is the directory where the egg will be downloaded.
`delay` is the number of seconds to pause before an actual download attempt.
"""
import urllib2, shutil
egg_name = "setuptools-%s-py%s.egg" % (version,sys.version[:3])
url = download_base + egg_name
saveto = os.path.join(to_dir, egg_name)
src = dst = None
if not os.path.exists(saveto): # Avoid repeated downloads
try:
from distutils import log
if delay:
log.warn("""
---------------------------------------------------------------------------
This script requires setuptools version %s to run (even to display
help). I will attempt to download it for you (from
%s), but
you may need to enable firewall access for this script first.
I will start the download in %d seconds.
(Note: if this machine does not have network access, please obtain the file
%s
and place it in this directory before rerunning this script.)
---------------------------------------------------------------------------""",
version, download_base, delay, url
); from time import sleep; sleep(delay)
log.warn("Downloading %s", url)
src = urllib2.urlopen(url)
# Read/write all in one block, so we don't create a corrupt file
# if the download is interrupted.
data = _validate_md5(egg_name, src.read())
dst = open(saveto,"wb"); dst.write(data)
finally:
if src: src.close()
if dst: dst.close()
return os.path.realpath(saveto)
def main(argv, version=DEFAULT_VERSION):
"""Install or upgrade setuptools and EasyInstall"""
try:
import setuptools
except ImportError:
egg = None
try:
egg = download_setuptools(version, delay=0)
sys.path.insert(0,egg)
from setuptools.command.easy_install import main
return main(list(argv)+[egg]) # we're done here
finally:
if egg and os.path.exists(egg):
os.unlink(egg)
else:
if setuptools.__version__ == '0.0.1':
# tell the user to uninstall obsolete version
use_setuptools(version)
req = "setuptools>="+version
import pkg_resources
try:
pkg_resources.require(req)
except pkg_resources.VersionConflict:
try:
from setuptools.command.easy_install import main
except ImportError:
from easy_install import main
main(list(argv)+[download_setuptools(delay=0)])
sys.exit(0) # try to force an exit
else:
if argv:
from setuptools.command.easy_install import main
main(argv)
else:
print "Setuptools version",version,"or greater has been installed."
print '(Run "ez_setup.py -U setuptools" to reinstall or upgrade.)'
def update_md5(filenames):
"""Update our built-in md5 registry"""
import re
from md5 import md5
for name in filenames:
base = os.path.basename(name)
f = open(name,'rb')
md5_data[base] = md5(f.read()).hexdigest()
f.close()
data = [" %r: %r,\n" % it for it in md5_data.items()]
data.sort()
repl = "".join(data)
import inspect
srcfile = inspect.getsourcefile(sys.modules[__name__])
f = open(srcfile, 'rb'); src = f.read(); f.close()
match = re.search("\nmd5_data = {\n([^}]+)}", src)
if not match:
print >>sys.stderr, "Internal error!"
sys.exit(2)
src = src[:match.start(1)] + repl + src[match.end(1):]
f = open(srcfile,'w')
f.write(src)
f.close()
if __name__=='__main__':
if len(sys.argv)>2 and sys.argv[1]=='--md5update':
update_md5(sys.argv[2:])
else:
main(sys.argv[1:])
##############################################################################
#
# Copyright (c) 2007 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
__version__ = '1.0dev'
import os
from ez_setup import use_setuptools
use_setuptools()
from setuptools import setup, find_packages, Extension
here = os.path.abspath(os.path.dirname(__file__))
README = open(os.path.join(here, 'README.txt')).read()
setup(name='zope.transaction',
version=__version__,
description='Transaction management for Python',
long_description=README,
classifiers=[
"Development Status :: 6 - Mature",
"License :: OSI Approved :: Zope Public License"
"Programming Language :: Python"
"Topic :: Database"
"Topic :: Software Development :: Libraries :: Python Modules"
"Operating System :: Microsoft :: Windows"
"Operating System :: Unix"
],
author="Zope Corporation",
author_email="zodb-dev@zope.org",
url="http://www.zope.org/Products/ZODB",
license="ZPL 2.1",
platforms=["any"],
packages=find_packages(),
ext_modules = [
Extension('zope.transaction.TimeStamp',
['zope/transaction/TimeStamp.c']),
],
include_package_data=True,
zip_safe=False,
test_suite="zope.transaction.tests",
tests_require = ['zope.interface',
'zope.testing',
'ZODB3'],
install_requires=[
'zope.interface',
],
entry_points = """\
"""
)
# namespace package
__import__('pkg_resources').declare_namespace(__name__)
This diff is collapsed.
......@@ -16,8 +16,9 @@
$Id$
"""
from transaction._transaction import Transaction
from transaction._manager import TransactionManager, ThreadTransactionManager
from zope.transaction._transaction import Transaction
from zope.transaction._manager import TransactionManager
from zope.transaction._manager import ThreadTransactionManager
manager = ThreadTransactionManager()
get = manager.get
......
......@@ -19,9 +19,9 @@ are associated with the right transaction.
import thread
from ZODB.utils import WeakSet, deprecated37
from zope.transaction.weakset import WeakSet
from transaction._transaction import Transaction
from zope.transaction._transaction import Transaction
# Used for deprecated arguments. ZODB.utils.DEPRECATED_ARGUMENT was
# too hard to use here, due to the convoluted import dance across
......
......@@ -106,13 +106,10 @@ import traceback
from cStringIO import StringIO
from zope import interface
from ZODB.utils import WeakSet
from ZODB.utils import deprecated37, deprecated38
from ZODB.POSException import TransactionFailedError
from ZODB.utils import oid_repr
from transaction import interfaces
from zope.transaction.weakset import WeakSet
from zope.transaction.interfaces import TransactionFailedError
from zope.transaction import interfaces
_marker = object()
......@@ -360,11 +357,6 @@ class Transaction(object):
kws = {}
self._before_commit.append((hook, tuple(args), kws))
def beforeCommitHook(self, hook, *args, **kws):
deprecated38("Use addBeforeCommitHook instead of beforeCommitHook.")
self.addBeforeCommitHook(hook, args, kws)
def _callBeforeCommitHooks(self):
# Call all hooks registered, allowing further registrations
# during processing. Note that calls to addBeforeCommitHook() may
......@@ -574,6 +566,19 @@ def object_hint(o):
oid = oid_repr(oid)
return "%s oid=%s" % (klass, oid)
def oid_repr(oid):
if isinstance(oid, str) and len(oid) == 8:
# Convert to hex and strip leading zeroes.
as_hex = hexlify(oid).lstrip('0')
# Ensure two characters per input byte.
if len(as_hex) & 1:
as_hex = '0' + as_hex
elif as_hex == '':
as_hex = '00'
return '0x' + as_hex
else:
return repr(oid)
# TODO: deprecate for 3.6.
class DataManagerAdapter(object):
"""Adapt zodb 4-style data managers to zodb3 style
......
......@@ -472,5 +472,17 @@ class ISynchronizer(zope.interface.Interface):
begin() method is called explictly.
"""
class DoomedTransaction(Exception):
class TransactionError(Exception):
"""An error occurred due to normal transaction processing."""
class TransactionFailedError(TransactionError):
"""Cannot perform an operation on a transaction that previously failed.
An attempt was made to commit a transaction, or to join a transaction,
but this transaction previously raised an exception during an attempt
to commit it. The transaction must be explicitly aborted, either by
invoking abort() on the transaction, or begin() on its transaction
manager.
"""
class DoomedTransaction(TransactionError):
"""A commit was attempted on a transaction that was doomed."""
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test cases for objects implementing IDataManager.
This is a combo test between Connection and DB, since the two are
rather incestuous and the DB Interface is not defined that I was
able to find.
To do a full test suite one would probably want to write a dummy
storage that will raise errors as needed for testing.
I started this test suite to reproduce a very simple error (tpc_abort
had an error and wouldn't even run if called). So it is *very*
incomplete, and even the tests that exist do not make sure that
the data actually gets written/not written to the storge.
Obviously this test suite should be expanded.
$Id$
"""
from unittest import TestCase
class IDataManagerTests(TestCase, object):
def setUp(self):
self.datamgr = None # subclass should override
self.obj = None # subclass should define Persistent object
self.txn_factory = None
def get_transaction(self):
return self.txn_factory()
################################
# IDataManager interface tests #
################################
def testCommitObj(self):
tran = self.get_transaction()
self.datamgr.prepare(tran)
self.datamgr.commit(tran)
def testAbortTran(self):
tran = self.get_transaction()
self.datamgr.prepare(tran)
self.datamgr.abort(tran)
......@@ -25,7 +25,7 @@ use savepoints and doom() safely.
To see how it works we first need to create a stub data manager:
>>> from transaction.interfaces import IDataManager
>>> from zope.transaction.interfaces import IDataManager
>>> from zope.interface import implements
>>> class DataManager:
... implements(IDataManager)
......@@ -45,7 +45,7 @@ To see how it works we first need to create a stub data manager:
Start a new transaction:
>>> import transaction
>>> from zope import transaction
>>> txn = transaction.begin()
>>> dm = DataManager()
>>> txn.join(dm)
......
This diff is collapsed.
......@@ -24,8 +24,9 @@ demonstrating the correct operation of savepoint support within the
transaction system. This data manager is very simple. It provides flat
storage of named immutable values, like strings and numbers.
>>> import transaction.tests.savepointsample
>>> dm = transaction.tests.savepointsample.SampleSavepointDataManager()
>>> from zope import transaction
>>> from zope.transaction.tests import savepointsample
>>> dm = savepointsample.SampleSavepointDataManager()
>>> dm['name'] = 'bob'
As with other data managers, we can commit changes:
......@@ -202,7 +203,7 @@ Databases without savepoint support
Normally it's an error to use savepoints with databases that don't support
savepoints:
>>> dm_no_sp = transaction.tests.savepointsample.SampleDataManager()
>>> dm_no_sp = savepointsample.SampleDataManager()
>>> dm_no_sp['name'] = 'bob'
>>> transaction.commit()
>>> dm_no_sp['name'] = 'sally'
......
......@@ -23,7 +23,7 @@ $Id$
import UserDict
from zope import interface
import transaction.interfaces
import zope.transaction.interfaces
class SampleDataManager(UserDict.DictMixin):
"""Sample implementation of data manager that doesn't support savepoints
......@@ -31,11 +31,12 @@ class SampleDataManager(UserDict.DictMixin):
This data manager stores named simple values, like strings and numbers.
"""
interface.implements(transaction.interfaces.IDataManager)
interface.implements(zope.transaction.interfaces.IDataManager)
def __init__(self, transaction_manager=None):
if transaction_manager is None:
# Use the thread-local transaction manager if none is provided:
from zope import transaction
transaction_manager = transaction.manager
self.transaction_manager = transaction_manager
......@@ -157,7 +158,7 @@ class SampleSavepointDataManager(SampleDataManager):
This extends the basic data manager with savepoint support.
"""
interface.implements(transaction.interfaces.ISavepointDataManager)
interface.implements(zope.transaction.interfaces.ISavepointDataManager)
def savepoint(self):
# When we create the savepoint, we save the existing database state.
......@@ -174,7 +175,7 @@ class SampleSavepointDataManager(SampleDataManager):
class SampleSavepoint:
interface.implements(transaction.interfaces.IDataManagerSavepoint)
interface.implements(zope.transaction.interfaces.IDataManagerSavepoint)
def __init__(self, data_manager, data):
self.data_manager = data_manager
......
......@@ -408,5 +408,8 @@ def test_suite():
from zope.testing.doctest import DocTestSuite
return DocTestSuite()
# additional_tests is for setuptools "setup.py test" support
additional_tests = test_suite
if __name__ == '__main__':
unittest.main()
......@@ -431,5 +431,8 @@ def test_suite():
from doctest import DocTestSuite
return DocTestSuite()
# additional_tests is for setuptools "setup.py test" support
additional_tests = test_suite
if __name__ == '__main__':
unittest.main()
......@@ -91,7 +91,7 @@ Clean up:
>>> transaction.abort()
"""
import transaction
from zope import transaction
class Object(object):
......@@ -152,3 +152,6 @@ from zope.testing import doctest
def test_suite():
return doctest.DocTestSuite()
# additional_tests is for setuptools "setup.py test" support
additional_tests = test_suite
......@@ -26,14 +26,15 @@ A savepoint needs to not just rollback it's savepoints, but needs to
rollback savepoints for data managers that joined savepoints after the
savepoint:
>>> import transaction.tests.savepointsample
>>> dm = transaction.tests.savepointsample.SampleSavepointDataManager()
>>> from zope import transaction
>>> from zope.transaction.tests import savepointsample
>>> dm = savepointsample.SampleSavepointDataManager()
>>> dm['name'] = 'bob'
>>> sp1 = transaction.savepoint()
>>> dm['job'] = 'geek'
>>> sp2 = transaction.savepoint()
>>> dm['salary'] = 'fun'
>>> dm2 = transaction.tests.savepointsample.SampleSavepointDataManager()
>>> dm2 = savepointsample.SampleSavepointDataManager()
>>> dm2['name'] = 'sally'
>>> 'name' in dm
......@@ -60,10 +61,13 @@ savepoint:
def test_suite():
return unittest.TestSuite((
doctest.DocFileSuite('../savepoint.txt'),
doctest.DocFileSuite('savepoint.txt'),
doctest.DocTestSuite(),
))
# additional_tests is for setuptools "setup.py test" support
additional_tests = test_suite
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
#############################################################################
#
# Copyright (c) 2006 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import unittest
import time
from zope.transaction.TimeStamp import TimeStamp
EPSILON = 0.000001
class TimeStampTests(unittest.TestCase):
def testStringInput(self):
ts = TimeStamp('00000000')
self.assertEqual(repr(ts), '00000000')
ts = TimeStamp('11111111')
self.assertEqual(repr(ts), '11111111')
def testTupleInput(self):
t = int(time.time())
args = time.gmtime(t)[:6]
ts = TimeStamp(*args)
self.assertEqual(ts.year(), args[0])
self.assertEqual(ts.month(), args[1])
self.assertEqual(ts.day(), args[2])
self.assertEqual(ts.hour(), args[3])
self.assertEqual(ts.minute(), args[4])
self.assertEqual(int(round(ts.second())), args[5])
def testRaw(self):
ts = TimeStamp('00000000')
self.assertEqual(repr(ts), ts.raw())
ts = TimeStamp('11111111')
self.assertEqual(repr(ts), ts.raw())
def testStr(self):
t1 = 1141445984
args1 = time.gmtime(t1)[:6]
ts1 = TimeStamp(*args1)
self.assertEqual(str(ts1), '2006-03-04 04:19:44.000000')
def testTimeTime(self):
t = int(time.time())
args = time.gmtime(t)[:6]
ts = TimeStamp(*args)
tt = ts.timeTime()
self.assertEqual(tt, t)
def testYMDTimeStamp(self):
self._check_ymd(2001, 6, 3)
def _check_ymd(self, yr, mo, dy):
ts = TimeStamp(yr, mo, dy)
self.assertEqual(ts.year(), yr)
self.assertEqual(ts.month(), mo)
self.assertEqual(ts.day(), dy)
self.assertEquals(ts.hour(), 0)
self.assertEquals(ts.minute(), 0)
self.assertEquals(ts.second(), 0)
t = time.gmtime(ts.timeTime())
self.assertEquals(yr, t[0])
self.assertEquals(mo, t[1])
self.assertEquals(dy, t[2])
def testFullTimeStamp(self):
native_ts = int(time.time()) # fractional seconds get in the way
t = time.gmtime(native_ts) # the corresponding GMT struct tm
ts = TimeStamp(*t[:6])
# Seconds are stored internally via (conceptually) multiplying by
# 2**32 then dividing by 60, ending up with a 32-bit integer.
# While this gives a lot of room for cramming many distinct
# TimeStamps into a second, it's not good at roundtrip accuracy.
# For example, 1 second is stored as int(2**32/60) == 71582788.
# Converting back gives 71582788*60.0/2**32 == 0.9999999962747097.
# In general, we can lose up to 0.999... to truncation during
# storing, creating an absolute error up to about 1*60.0/2**32 ==
# 0.000000014 on the seconds value we get back. This is so even
# when we have an exact integral second value going in (as we
# do in this test), so we can't expect equality in any comparison
# involving seconds. Minutes (etc) are stored exactly, so we
# can expect equality for those.
self.assert_(abs(ts.timeTime() - native_ts) < EPSILON)
self.assertEqual(ts.year(), t[0])
self.assertEqual(ts.month(), t[1])
self.assertEqual(ts.day(), t[2])
self.assertEquals(ts.hour(), t[3])
self.assertEquals(ts.minute(), t[4])
self.assert_(abs(ts.second() - t[5]) < EPSILON)
def testRawTimestamp(self):
t = time.gmtime()
ts1 = TimeStamp(*t[:6])
ts2 = TimeStamp(`ts1`)
self.assertEquals(ts1, ts2)
self.assertEquals(ts1.timeTime(), ts2.timeTime())
self.assertEqual(ts1.year(), ts2.year())
self.assertEqual(ts1.month(), ts2.month())
self.assertEqual(ts1.day(), ts2.day())
self.assertEquals(ts1.hour(), ts2.hour())
self.assertEquals(ts1.minute(), ts2.minute())
self.assert_(abs(ts1.second() - ts2.second()) < EPSILON)
def testDictKey(self):
t = time.gmtime()
ts1 = TimeStamp(*t[:6])
ts2 = TimeStamp(2000, *t[1:6])
d = {}
d[ts1] = 1
d[ts2] = 2
self.assertEquals(len(d), 2)
def testCompare(self):
ts1 = TimeStamp(1972, 6, 27)
ts2 = TimeStamp(1971, 12, 12)
self.assert_(ts1 > ts2)
self.assert_(ts2 <= ts1)
def testLaterThan(self):
t = time.gmtime()
ts = TimeStamp(*t[:6])
ts2 = ts.laterThan(ts)
self.assert_(ts2 > ts)
# TODO: should test for bogus inputs to TimeStamp constructor
def testTimeStamp(self):
# Alternate test suite
t = TimeStamp(2002, 1, 23, 10, 48, 5) # GMT
self.assertEquals(str(t), '2002-01-23 10:48:05.000000')
self.assertEquals(repr(t), '\x03B9H\x15UUU')
self.assertEquals(TimeStamp('\x03B9H\x15UUU'), t)
self.assertEquals(t.year(), 2002)
self.assertEquals(t.month(), 1)
self.assertEquals(t.day(), 23)
self.assertEquals(t.hour(), 10)
self.assertEquals(t.minute(), 48)
self.assertEquals(round(t.second()), 5)
self.assertEquals(t.timeTime(), 1011782885)
t1 = TimeStamp(2002, 1, 23, 10, 48, 10)
self.assertEquals(str(t1), '2002-01-23 10:48:10.000000')
self.assert_(t == t)
self.assert_(t != t1)
self.assert_(t < t1)
self.assert_(t <= t1)
self.assert_(t1 >= t)
self.assert_(t1 > t)
self.failIf(t == t1)
self.failIf(t != t)
self.failIf(t > t1)
self.failIf(t >= t1)
self.failIf(t1 < t)
self.failIf(t1 <= t)
self.assertEquals(cmp(t, t), 0)
self.assertEquals(cmp(t, t1), -1)
self.assertEquals(cmp(t1, t), 1)
self.assertEquals(t1.laterThan(t), t1)
self.assert_(t.laterThan(t1) > t1)
self.assertEquals(TimeStamp(2002,1,23), TimeStamp(2002,1,23,0,0,0))
def test_suite():
return unittest.makeSuite(TimeStampTests)
if __name__ == '__main__':
unittest.main()
......@@ -39,12 +39,22 @@ TODO
$Id$
"""
import struct
import unittest
import warnings
import transaction
from ZODB.utils import positive_id
from ZODB.tests.warnhook import WarningsHook
from zope import transaction
from zope.transaction.tests.warnhook import WarningsHook
_ADDRESS_MASK = 256 ** struct.calcsize('P')
def positive_id(obj):
"""Return id(obj) as a non-negative integer."""
result = id(obj)
if result < 0:
result += _ADDRESS_MASK
assert result > 0
return result
class TransactionTests(unittest.TestCase):
......@@ -343,8 +353,8 @@ def test_join():
The argument to join must be a zodb4 data manager,
transaction.interfaces.IDataManager.
>>> from ZODB.tests.sampledm import DataManager
>>> from transaction._transaction import DataManagerAdapter
>>> from zope.transaction.tests.sampledm import DataManager
>>> from zope.transaction._transaction import DataManagerAdapter
>>> t = transaction.Transaction()
>>> dm = DataManager()
>>> t.join(dm)
......@@ -362,178 +372,6 @@ def test_join():
def hook():
pass
# deprecated38; remove this then
def test_beforeCommitHook():
"""Test beforeCommitHook.
Let's define a hook to call, and a way to see that it was called.
>>> log = []
>>> def reset_log():
... del log[:]
>>> def hook(arg='no_arg', kw1='no_kw1', kw2='no_kw2'):
... log.append("arg %r kw1 %r kw2 %r" % (arg, kw1, kw2))
beforeCommitHook is deprecated, so we need cruft to suppress the
warnings.
>>> whook = WarningsHook()
>>> whook.install()
Fool the warnings module into delivering the warnings despite that
they've been seen before; this is needed in case this test is run
more than once.
>>> import warnings
>>> warnings.filterwarnings("always", category=DeprecationWarning)
Now register the hook with a transaction.
>>> import transaction
>>> t = transaction.begin()
>>> t.beforeCommitHook(hook, '1')
Make sure it triggered a deprecation warning:
>>> len(whook.warnings)
1
>>> message, category, filename, lineno = whook.warnings[0]
>>> print message
This will be removed in ZODB 3.8:
Use addBeforeCommitHook instead of beforeCommitHook.
>>> category.__name__
'DeprecationWarning'
>>> whook.clear()
We can see that the hook is indeed registered.
>>> [(hook.func_name, args, kws)
... for hook, args, kws in t.getBeforeCommitHooks()]
[('hook', ('1',), {})]
When transaction commit starts, the hook is called, with its
arguments.
>>> log
[]
>>> t.commit()
>>> log
["arg '1' kw1 'no_kw1' kw2 'no_kw2'"]
>>> reset_log()
A hook's registration is consumed whenever the hook is called. Since
the hook above was called, it's no longer registered:
>>> len(list(t.getBeforeCommitHooks()))
0
>>> transaction.commit()
>>> log
[]
The hook is only called for a full commit, not for a savepoint.
>>> t = transaction.begin()
>>> t.beforeCommitHook(hook, 'A', kw1='B')
>>> dummy = t.savepoint()
>>> log
[]
>>> t.commit()
>>> log
["arg 'A' kw1 'B' kw2 'no_kw2'"]
>>> reset_log()
If a transaction is aborted, no hook is called.
>>> t = transaction.begin()
>>> t.beforeCommitHook(hook, "OOPS!")
>>> transaction.abort()
>>> log
[]
>>> transaction.commit()
>>> log
[]
The hook is called before the commit does anything, so even if the
commit fails the hook will have been called. To provoke failures in
commit, we'll add failing resource manager to the transaction.
>>> class CommitFailure(Exception):
... pass
>>> class FailingDataManager:
... def tpc_begin(self, txn, sub=False):
... raise CommitFailure
... def abort(self, txn):
... pass
>>> t = transaction.begin()
>>> t.join(FailingDataManager())
>>> t.beforeCommitHook(hook, '2')
>>> t.commit()
Traceback (most recent call last):
...
CommitFailure
>>> log
["arg '2' kw1 'no_kw1' kw2 'no_kw2'"]
>>> reset_log()
Let's register several hooks.
>>> t = transaction.begin()
>>> t.beforeCommitHook(hook, '4', kw1='4.1')
>>> t.beforeCommitHook(hook, '5', kw2='5.2')
They are returned in the same order by getBeforeCommitHooks.
>>> [(hook.func_name, args, kws) #doctest: +NORMALIZE_WHITESPACE
... for hook, args, kws in t.getBeforeCommitHooks()]
[('hook', ('4',), {'kw1': '4.1'}),
('hook', ('5',), {'kw2': '5.2'})]
And commit also calls them in this order.
>>> t.commit()
>>> len(log)
2
>>> log #doctest: +NORMALIZE_WHITESPACE
["arg '4' kw1 '4.1' kw2 'no_kw2'",
"arg '5' kw1 'no_kw1' kw2 '5.2'"]
>>> reset_log()
While executing, a hook can itself add more hooks, and they will all
be called before the real commit starts.
>>> def recurse(txn, arg):
... log.append('rec' + str(arg))
... if arg:
... txn.beforeCommitHook(hook, '-')
... txn.beforeCommitHook(recurse, txn, arg-1)
>>> t = transaction.begin()
>>> t.beforeCommitHook(recurse, t, 3)
>>> transaction.commit()
>>> log #doctest: +NORMALIZE_WHITESPACE
['rec3',
"arg '-' kw1 'no_kw1' kw2 'no_kw2'",
'rec2',
"arg '-' kw1 'no_kw1' kw2 'no_kw2'",
'rec1',
"arg '-' kw1 'no_kw1' kw2 'no_kw2'",
'rec0']
>>> reset_log()
We have to uninstall the warnings hook so that other warnings don't get
lost.
>>> whook.uninstall()
Obscure: There is no API call for removing the filter we added, but
filters appears to be a public variable.
>>> del warnings.filters[0]
"""
def test_addBeforeCommitHook():
"""Test addBeforeCommitHook.
......@@ -548,7 +386,7 @@ def test_addBeforeCommitHook():
Now register the hook with a transaction.
>>> import transaction
>>> from zope import transaction
>>> t = transaction.begin()
>>> t.addBeforeCommitHook(hook, '1')
......@@ -722,7 +560,7 @@ def test_addAfterCommitHook():
Now register the hook with a transaction.
>>> import transaction
>>> from zope import transaction
>>> t = transaction.begin()
>>> t.addAfterCommitHook(hook, '1')
......@@ -936,5 +774,8 @@ def test_suite():
unittest.makeSuite(TransactionTests),
))
# additional_tests is for setuptools "setup.py test" support
additional_tests = test_suite
if __name__ == '__main__':
unittest.TextTestRunner().run(test_suite())
##############################################################################
#
# Copyright (c) 2007 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import unittest
from zope.transaction.weakset import WeakSet
class Dummy:
pass
class WeakSetTests(unittest.TestCase):
def test_contains(self):
w = WeakSet()
dummy = Dummy()
w.add(dummy)
self.assertEqual(dummy in w, True)
dummy2 = Dummy()
self.assertEqual(dummy2 in w, False)
def test_len(self):
w = WeakSet()
d1 = Dummy()
d2 = Dummy()
w.add(d1)
w.add(d2)
self.assertEqual(len(w), 2)
del d1
self.assertEqual(len(w), 1)
def test_remove(self):
w = WeakSet()
dummy = Dummy()
w.add(dummy)
self.assertEqual(dummy in w, True)
w.remove(dummy)
self.assertEqual(dummy in w, False)
def test_as_weakref_list(self):
w = WeakSet()
dummy = Dummy()
dummy2 = Dummy()
dummy3 = Dummy()
w.add(dummy)
w.add(dummy2)
w.add(dummy3)
del dummy3
L = [x() for x in w.as_weakref_list()]
self.assertEqual(L, [dummy, dummy2])
def test_map(self):
w = WeakSet()
dummy = Dummy()
dummy2 = Dummy()
dummy3 = Dummy()
w.add(dummy)
w.add(dummy2)
w.add(dummy3)
def poker(x):
x.poked = 1
w.map(poker)
for thing in dummy, dummy2, dummy3:
self.assertEqual(thing.poked, 1)
def test_suite():
return unittest.makeSuite(WeakSetTests)
if __name__ == '__main__':
unittest.main()
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import warnings
class WarningsHook:
"""Hook to capture warnings generated by Python.
The function warnings.showwarning() is designed to be hooked by
application code, allowing the application to customize the way it
handles warnings.
This hook captures the unformatted warning information and stores
it in a list. A test can inspect this list after the test is over.
Issues:
The warnings module has lots of delicate internal state. If
a warning has been reported once, it won't be reported again. It
may be necessary to extend this class with a mechanism for
modifying the internal state so that we can be guaranteed a
warning will be reported.
If Python is run with a warnings filter, e.g. python -Werror,
then a test that is trying to inspect a particular warning will
fail. Perhaps this class can be extended to install more-specific
filters the test to work anyway.
"""
def __init__(self):
self.original = None
self.warnings = []
def install(self):
self.original = warnings.showwarning
warnings.showwarning = self.showwarning
def uninstall(self):
assert self.original is not None
warnings.showwarning = self.original
self.original = None
def showwarning(self, message, category, filename, lineno):
self.warnings.append((str(message), category, filename, lineno))
def clear(self):
self.warnings = []
############################################################################
#
# Copyright (c) 2007 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
############################################################################
import weakref
# A simple implementation of weak sets, supplying just enough of Python's
# sets.Set interface for our needs.
class WeakSet(object):
"""A set of objects that doesn't keep its elements alive.
The objects in the set must be weakly referencable.
The objects need not be hashable, and need not support comparison.
Two objects are considered to be the same iff their id()s are equal.
When the only references to an object are weak references (including
those from WeakSets), the object can be garbage-collected, and
will vanish from any WeakSets it may be a member of at that time.
"""
def __init__(self):
# Map id(obj) to obj. By using ids as keys, we avoid requiring
# that the elements be hashable or comparable.
self.data = weakref.WeakValueDictionary()
def __len__(self):
return len(self.data)
def __contains__(self, obj):
return id(obj) in self.data
# Same as a Set, add obj to the collection.
def add(self, obj):
self.data[id(obj)] = obj
# Same as a Set, remove obj from the collection, and raise
# KeyError if obj not in the collection.
def remove(self, obj):
del self.data[id(obj)]
# f is a one-argument function. Execute f(elt) for each elt in the
# set. f's return value is ignored.
def map(self, f):
for wr in self.as_weakref_list():
elt = wr()
if elt is not None:
f(elt)
# Return a list of weakrefs to all the objects in the collection.
# Because a weak dict is used internally, iteration is dicey (the
# underlying dict may change size during iteration, due to gc or
# activity from other threads). as_weakef_list() is safe.
#
# Something like this should really be a method of Python's weak dicts.
# If we invoke self.data.values() instead, we get back a list of live
# objects instead of weakrefs. If gc occurs while this list is alive,
# all the objects move to an older generation (because they're strongly
# referenced by the list!). They can't get collected then, until a
# less frequent collection of the older generation. Before then, if we
# invoke self.data.values() again, they're still alive, and if gc occurs
# while that list is alive they're all moved to yet an older generation.
# And so on. Stress tests showed that it was easy to get into a state
# where a WeakSet grows without bounds, despite that almost all its
# elements are actually trash. By returning a list of weakrefs instead,
# we avoid that, although the decision to use weakrefs is now# very
# visible to our clients.
def as_weakref_list(self):
# We're cheating by breaking into the internals of Python's
# WeakValueDictionary here (accessing its .data attribute).
return self.data.data.values()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment