Commit 3b599ac4 authored by Jeremy Hylton's avatar Jeremy Hylton

Merge changes from the zeo-1_0-branch onto the debug branch

parent 7f238e85
......@@ -73,7 +73,7 @@ file 0 and file 1.
"""
__version__ = "$Revision: 1.21 $"[11:-2]
__version__ = "$Revision: 1.22 $"[11:-2]
import os, tempfile
from struct import pack, unpack
......
......@@ -13,8 +13,7 @@
##############################################################################
"""Network ZODB storage client
"""
__version__='$Revision: 1.38 $'[11:-2]
__version__='$Revision: 1.39 $'[11:-2]
import struct, time, os, socket, string, Sync, zrpc, ClientCache
import tempfile, Invalidator, ExtensionClass, thread
......
#############################################################################
#
# Zope Public License (ZPL) Version 1.0
# -------------------------------------
#
# Copyright (c) Digital Creations. All rights reserved.
#
# This license has been certified as Open Source(tm).
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions in source code must retain the above copyright
# notice, this list of conditions, and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions, and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# 3. Digital Creations requests that attribution be given to Zope
# in any manner possible. Zope includes a "Powered by Zope"
# button that is installed by default. While it is not a license
# violation to remove this button, it is requested that the
# attribution remain. A significant investment has been put
# into Zope, and this effort will continue if the Zope community
# continues to grow. This is one way to assure that growth.
#
# 4. All advertising materials and documentation mentioning
# features derived from or use of this software must display
# the following acknowledgement:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# In the event that the product being advertised includes an
# intact Zope distribution (with copyright and license included)
# then this clause is waived.
#
# 5. Names associated with Zope or Digital Creations must not be used to
# endorse or promote products derived from this software without
# prior written permission from Digital Creations.
#
# 6. Modified redistributions of any form whatsoever must retain
# the following acknowledgment:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# Intact (re-)distributions of any official Zope release do not
# require an external acknowledgement.
#
# 7. Modifications are encouraged but must be packaged separately as
# patches to official Zope releases. Distributions that do not
# clearly separate the patches from the original work must be clearly
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
#
#
# Disclaimer
#
# THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
# EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations. Specific
# attributions are listed in the accompanying credits file.
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
__version__ = "$Revision: 1.34 $"[11:-2]
__version__ = "$Revision: 1.35 $"[11:-2]
import asyncore, socket, string, sys, os
from smac import SizedMessageAsyncConnection
......@@ -99,6 +28,7 @@ from thread import start_new_thread
from cStringIO import StringIO
from ZEO import trigger
from ZEO import asyncwrap
from ZEO.smac import Disconnected
from types import StringType
class StorageServerError(POSException.StorageError): pass
......@@ -133,6 +63,8 @@ class StorageServer(asyncore.dispatcher):
self.__storages=storages
for n, s in storages.items():
init_storage(s)
# Create a waiting list to support the distributed commit lock.
s._waiting = []
self.__connections={}
self.__get_connections=self.__connections.get
......@@ -280,6 +212,7 @@ class ZEOConnection(SizedMessageAsyncConnection):
# This is the first communication from the client
self.__storage, self.__storage_id = (
self.__server.register_connection(self, message))
# Send info back asynchronously, so client need not ask
self.message_output('S'+dump(self.get_info(), 1))
return
......@@ -501,39 +434,76 @@ class ZEOConnection(SizedMessageAsyncConnection):
return oids
return ()
def tpc_abort(self, id):
t=self._transaction
if t is None or id != t.id: return
r=self.__storage.tpc_abort(t)
# distributed commit lock support methods
storage=self.__storage
try: waiting=storage.__waiting
except: waiting=storage.__waiting=[]
# Only one client at a time can commit a transaction on a
# storage. If one client is committing a transaction, and a
# second client sends a tpc_begin(), then second client is queued.
# When the first transaction finishes, either by abort or commit,
# the request from the queued client must be handled.
# It is important that this code be robust. If a queued
# transaction is not restarted, the server will stop processing
# new transactions.
# This lock is implemented by storing the queued requests in a
# list on the storage object. The list contains:
# a callable object to resume request
# arguments to that object
# a callable object to handle errors during resume
# XXX I am not sure that the commitlock_resume() method is
# sufficiently paranoid.
def commitlock_suspend(self, resume, args, onerror):
self.__storage._waiting.append((resume, args, onerror))
def commitlock_resume(self):
waiting = self.__storage._waiting
while waiting:
f, args = waiting.pop(0)
if apply(f,args): break
resume, args, onerror = waiting.pop(0)
try:
if apply(resume, args):
break
except Disconnected:
# A disconnected error isn't an unexpected error.
# There should be no need to log it, because the
# disconnect will have generated its own log event.
onerror()
except:
LOG('ZEO Server', ERROR,
"Unexpected error handling queued tpc_begin()",
error=sys.exc_info())
onerror()
self._transaction=None
self.__invalidated=[]
def tpc_abort(self, id):
t = self._transaction
if t is None or id != t.id:
return
r = self.__storage.tpc_abort(t)
self._transaction = None
self.__invalidated = []
self.commitlock_resume()
def unlock(self):
if self.__closed: return
if self.__closed:
return
self.message_output('UN.')
def tpc_begin(self, id, user, description, ext):
t=self._transaction
t = self._transaction
if t is not None:
if id == t.id: return
if id == t.id:
return
else:
raise StorageServerError(
"Multiple simultaneous tpc_begin requests from the same "
"client."
)
storage=self.__storage
storage = self.__storage
if storage._transaction is not None:
try: waiting=storage.__waiting
except: waiting=storage.__waiting=[]
waiting.append((self.unlock, ()))
self.commitlock_suspend(self.unlock, (), self.close)
return 1 # Return a flag indicating a lock condition.
self._transaction=t=Transaction()
......@@ -552,9 +522,9 @@ class ZEOConnection(SizedMessageAsyncConnection):
if storage._transaction is None:
self.try_again_sync(id, user, description, ext)
else:
try: waiting=storage.__waiting
except: waiting=storage.__waiting=[]
waiting.append((self.try_again_sync, (id, user, description, ext)))
self.commitlock_suspend(self.try_again_sync,
(id, user, description, ext),
self.close)
return _noreturn
......@@ -572,24 +542,21 @@ class ZEOConnection(SizedMessageAsyncConnection):
return 1
def tpc_finish(self, id, user, description, ext):
t=self._transaction
if id != t.id: return
t = self._transaction
if id != t.id:
return
storage=self.__storage
r=storage.tpc_finish(t)
try: waiting=storage.__waiting
except: waiting=storage.__waiting=[]
while waiting:
f, args = waiting.pop(0)
if apply(f,args): break
storage = self.__storage
r = storage.tpc_finish(t)
self._transaction=None
self._transaction = None
if self.__invalidated:
self.__server.invalidate(self, self.__storage_id,
self.__invalidated,
self.get_size_info())
self.__invalidated=[]
self.__invalidated = []
self.commitlock_resume()
def init_storage(storage):
if not hasattr(storage,'tpc_vote'): storage.tpc_vote=lambda *args: None
......
......@@ -11,7 +11,6 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""A wrapper for asyncore that provides robust exception handling.
The poll() and loop() calls exported by asyncore can raise exceptions.
......@@ -32,6 +31,10 @@ it would be useful to extend this module with wrappers for those
errors.
"""
# XXX The current implementation requires Python 2.0. Not sure if
# that's acceptable, depends on how many users want to combine ZEO 1.0
# and Zope 2.3.
import asyncore
import errno
import select
......
......@@ -14,7 +14,7 @@
"""Sized message async connections
"""
__version__ = "$Revision: 1.14 $"[11:-2]
__version__ = "$Revision: 1.15 $"[11:-2]
import asyncore, string, struct, zLOG, sys, Acquisition
import socket, errno
......
......@@ -15,7 +15,7 @@
"""Start the server storage.
"""
__version__ = "$Revision: 1.29 $"[11:-2]
__version__ = "$Revision: 1.30 $"[11:-2]
import sys, os, getopt, string
......@@ -264,10 +264,7 @@ def rotate_logs():
zLOG.log_write.reinitialize()
else:
# Hm, lets at least try to take care of the stupid logger:
if hasattr(zLOG, '_set_stupid_dest'):
zLOG._set_stupid_dest(None)
else:
zLOG._stupid_dest = None
zLOG._stupid_dest=None
def rotate_logs_handler(signum, frame):
rotate_logs()
......
......@@ -11,7 +11,6 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Tests of the ZEO cache"""
from ZODB.Transaction import Transaction
......
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
......@@ -11,7 +11,6 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Library for forking storage server and connecting client storage"""
import asyncore
......@@ -62,7 +61,7 @@ if os.name == "nt":
args = (sys.executable, script, str(port), storage_name) + args
d = os.environ.copy()
d['PYTHONPATH'] = os.pathsep.join(sys.path)
pid = os.spawnve(os.P_NOWAIT, sys.executable, args, d)
pid = os.spawnve(os.P_NOWAIT, sys.executable, args, os.environ)
return ('localhost', port), ('localhost', port + 1), pid
else:
......@@ -98,16 +97,14 @@ else:
rd, wr = os.pipe()
pid = os.fork()
if pid == 0:
try:
if PROFILE:
p = profile.Profile()
p.runctx("run_server(storage, addr, rd, wr)", globals(),
locals())
p.dump_stats("stats.s.%d" % os.getpid())
else:
run_server(storage, addr, rd, wr)
finally:
os._exit(0)
if PROFILE:
p = profile.Profile()
p.runctx("run_server(storage, addr, rd, wr)", globals(),
locals())
p.dump_stats("stats.s.%d" % os.getpid())
else:
run_server(storage, addr, rd, wr)
os._exit(0)
else:
os.close(rd)
return pid, ZEOClientExit(wr)
......
......@@ -11,7 +11,6 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""A multi-client test of the ZEO storage server"""
import ZODB, ZODB.DB, ZODB.FileStorage, ZODB.POSException
......@@ -70,18 +69,16 @@ def start_server(addr):
def start_client(addr, client_func=None):
pid = os.fork()
if pid == 0:
try:
import ZEO.ClientStorage
if VERBOSE:
print "Client process started:", os.getpid()
cli = ZEO.ClientStorage.ClientStorage(addr, client=CLIENT_CACHE)
if client_func is None:
run(cli)
else:
client_func(cli)
cli.close()
finally:
os._exit(0)
import ZEO.ClientStorage
if VERBOSE:
print "Client process started:", os.getpid()
cli = ZEO.ClientStorage.ClientStorage(addr, client=CLIENT_CACHE)
if client_func is None:
run(cli)
else:
client_func(cli)
cli.close()
os._exit(0)
else:
return pid
......
......@@ -11,7 +11,6 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""A ZEO client-server stress test to look for leaks.
The stress test should run in an infinite loop and should involve
......@@ -105,34 +104,32 @@ def start_child(zaddr):
if pid != 0:
return pid
try:
storage = ClientStorage(zaddr, debug=1, min_disconnect_poll=0.5)
db = ZODB.DB(storage, pool_size=NUM_CONNECTIONS)
setup(db.open())
conns = []
conn_count = 0
for i in range(NUM_CONNECTIONS):
storage = ClientStorage(zaddr, debug=1, min_disconnect_poll=0.5)
db = ZODB.DB(storage, pool_size=NUM_CONNECTIONS)
setup(db.open())
conns = []
conn_count = 0
for i in range(NUM_CONNECTIONS):
c = db.open()
c.__count = 0
conns.append(c)
conn_count += 1
while conn_count < 25:
c = random.choice(conns)
if c.__count > NUM_TRANSACTIONS_PER_CONN:
conns.remove(c)
c.close()
conn_count += 1
c = db.open()
c.__count = 0
conns.append(c)
conn_count += 1
else:
c.__count += 1
work(c)
while conn_count < 25:
c = random.choice(conns)
if c.__count > NUM_TRANSACTIONS_PER_CONN:
conns.remove(c)
c.close()
conn_count += 1
c = db.open()
c.__count = 0
conns.append(c)
else:
c.__count += 1
work(c)
finally:
os._exit(0)
os._exit(0)
if __name__ == "__main__":
main()
......@@ -11,7 +11,6 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Test suite for ZEO based on ZODB.tests"""
import asyncore
......@@ -27,6 +26,8 @@ import unittest
import ZEO.ClientStorage, ZEO.StorageServer
import ThreadedAsync, ZEO.trigger
from ZODB.FileStorage import FileStorage
from ZODB.TimeStamp import TimeStamp
from ZODB.Transaction import Transaction
import thread
from ZEO.tests import forker, Cache
......@@ -35,7 +36,7 @@ from ZEO.smac import Disconnected
# Sorry Jim...
from ZODB.tests import StorageTestBase, BasicStorage, VersionStorage, \
TransactionalUndoStorage, TransactionalUndoVersionStorage, \
PackableStorage, Synchronization, ConflictResolution, RevisionStorage
PackableStorage, Synchronization, ConflictResolution
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle
......@@ -56,9 +57,63 @@ class PackWaitWrapper:
self.storage.pack(t, f, wait=1)
class ZEOTestBase(StorageTestBase.StorageTestBase):
"""Version of the storage test class that supports ZEO."""
pass
"""Version of the storage test class that supports ZEO.
For ZEO, we don't always get the serialno/exception for a
particular store as the return value from the store. But we
will get no later than the return value from vote.
"""
def _dostore(self, oid=None, revid=None, data=None, version=None,
already_pickled=0):
"""Do a complete storage transaction.
The defaults are:
- oid=None, ask the storage for a new oid
- revid=None, use a revid of ZERO
- data=None, pickle up some arbitrary data (the integer 7)
- version=None, use the empty string version
Returns the object's new revision id.
"""
if oid is None:
oid = self._storage.new_oid()
if revid is None:
revid = ZERO
if data is None:
data = MinPO(7)
if not already_pickled:
data = StorageTestBase.zodb_pickle(data)
if version is None:
version = ''
# Begin the transaction
t = Transaction()
self._storage.tpc_begin(t)
# Store an object
r1 = self._storage.store(oid, revid, data, version, t)
s1 = self._get_serial(r1)
# Finish the transaction
r2 = self._storage.tpc_vote(t)
s2 = self._get_serial(r2)
self._storage.tpc_finish(t)
# s1, s2 can be None or dict
assert not (s1 and s2)
return s1 and s1[oid] or s2 and s2[oid]
def _get_serial(self, r):
"""Return oid -> serialno dict from sequence of ZEO replies."""
d = {}
if r is None:
return None
if type(r) == types.StringType:
raise RuntimeError, "unexpected ZEO response: no oid"
else:
for oid, serial in r:
if isinstance(serial, Exception):
raise serial
d[oid] = serial
return d
# Some of the ZEO tests depend on the version of FileStorage available
# for the tests. If we run these tests using Zope 2.3, FileStorage
# doesn't support TransactionalUndo.
......@@ -75,14 +130,13 @@ if hasattr(FileStorage, 'supportsTransactionalUndo'):
else:
class VersionDependentTests:
pass
class GenericTests(ZEOTestBase,
VersionDependentTests,
Cache.StorageWithCache,
Cache.TransUndoStorageWithCache,
BasicStorage.BasicStorage,
VersionStorage.VersionStorage,
RevisionStorage.RevisionStorage,
PackableStorage.PackableStorage,
Synchronization.SynchronizedStorage,
):
......@@ -94,12 +148,16 @@ class GenericTests(ZEOTestBase,
returns a specific storage, e.g. FileStorage.
"""
__super_setUp = StorageTestBase.StorageTestBase.setUp
__super_tearDown = StorageTestBase.StorageTestBase.tearDown
def setUp(self):
"""Start a ZEO server using a Unix domain socket
The ZEO server uses the storage object returned by the
getStorage() method.
"""
self.__super_setUp()
self.running = 1
client, exit, pid = forker.start_zeo(self.getStorage())
self._pid = pid
......@@ -114,13 +172,68 @@ class GenericTests(ZEOTestBase,
self._server.close()
os.waitpid(self._pid, 0)
self.delStorage()
self.__super_tearDown()
def checkTwoArgBegin(self):
# XXX ZEO doesn't support 2-arg begin
pass
def checkLargeUpdate(self):
obj = MinPO("X" * (10 * 128 * 1024))
self._dostore(data=obj)
def checkTwoArgBegin(self):
pass # ZEO 1 doesn't support two-arg begin
def checkCommitLockOnCommit(self):
self._checkCommitLock("tpc_finish")
def checkCommitLockOnAbort(self):
self._checkCommitLock("tpc_abort")
def _checkCommitLock(self, method_name):
# check the commit lock when a client attemps a transaction,
# but fails/exits before finishing the commit.
# Start on transaction normally.
t = Transaction()
self._storage.tpc_begin(t)
# Start a second transaction on a different connection without
# blocking the test thread.
self._storages = []
for i in range(3):
storage2 = self._duplicate_client()
t2 = Transaction()
tid = self._get_timestamp()
storage2._call.sendMessage('tpc_begin_sync',
tid, t2.user, t2.description,
t2._extension)
if i == 0:
storage2.close()
else:
self._storages.append((storage2, t2))
oid = self._storage.new_oid()
self._storage.store(oid, None, '', '', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
for store, trans in self._storages:
store.tpc_abort(trans)
store.close()
# Make sure the server is still responsive
self._dostore()
def _duplicate_client(self):
"Open another ClientStorage to the same server."
addr = self._storage._connection
new = ZEO.ClientStorage.ClientStorage(addr)
new.registerDB(DummyDB(), None)
return new
def _get_timestamp(self):
t = time.time()
t = apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
return 't'
class ZEOFileStorageTests(GenericTests):
__super_setUp = GenericTests.setUp
......@@ -148,8 +261,11 @@ class WindowsGenericTests(GenericTests):
can't be created in the parent process and passed to the child.
All the work has to be done in the server's process.
"""
__super_setUp = StorageTestBase.StorageTestBase.setUp
__super_tearDown = StorageTestBase.StorageTestBase.tearDown
def setUp(self):
self.__super_setUp()
args = self.getStorageInfo()
name = args[0]
args = args[1:]
......@@ -169,6 +285,7 @@ class WindowsGenericTests(GenericTests):
## os.waitpid(self.test_pid, 0)
time.sleep(0.5)
self.delStorage()
self.__super_tearDown()
class WindowsZEOFileStorageTests(WindowsGenericTests):
......@@ -192,6 +309,8 @@ class ConnectionTests(ZEOTestBase):
start and stop a ZEO storage server.
"""
__super_tearDown = StorageTestBase.StorageTestBase.tearDown
ports = []
for i in range(200):
ports.append(random.randrange(25000, 30000))
......@@ -207,7 +326,6 @@ class ConnectionTests(ZEOTestBase):
def tearDown(self):
"""Try to cause the tests to halt"""
self._storage.close()
self.shutdownServer()
# file storage appears to create four files
for ext in '', '.index', '.lock', '.tmp':
......@@ -218,6 +336,7 @@ class ConnectionTests(ZEOTestBase):
path = "c1-test-%d.zec" % i
if os.path.exists(path):
os.unlink(path)
self.__super_tearDown()
def checkBasicPersistence(self):
"""Verify cached data persists across client storage instances.
......
......@@ -11,7 +11,6 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Helper file used to launch ZEO server for Windows tests"""
import asyncore
......
......@@ -14,7 +14,7 @@
"""Simple rpc mechanisms
"""
__version__ = "$Revision: 1.21 $"[11:-2]
__version__ = "$Revision: 1.22 $"[11:-2]
from cPickle import loads
import cPickle
......
......@@ -14,7 +14,7 @@
"""Sized message async connections
"""
__version__ = "$Revision: 1.14 $"[11:-2]
__version__ = "$Revision: 1.15 $"[11:-2]
import asyncore, string, struct, zLOG, sys, Acquisition
import socket, errno
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment