From 3b599ac424b34c78fd8200d29b6b00dec5b28c11 Mon Sep 17 00:00:00 2001
From: Jeremy Hylton <jeremy@svn.zope.org>
Date: Fri, 15 Mar 2002 05:11:54 +0000
Subject: [PATCH] Merge changes from the zeo-1_0-branch onto the debug branch

---
 src/ZEO/ClientCache.py     |   2 +-
 src/ZEO/ClientStorage.py   |   3 +-
 src/ZEO/StorageServer.py   | 197 +++++++++++++++----------------------
 src/ZEO/asyncwrap.py       |   5 +-
 src/ZEO/smac.py            |   2 +-
 src/ZEO/start.py           |   7 +-
 src/ZEO/tests/Cache.py     |   1 -
 src/ZEO/tests/__init__.py  |  13 +++
 src/ZEO/tests/forker.py    |  21 ++--
 src/ZEO/tests/multi.py     |  23 ++---
 src/ZEO/tests/stress.py    |  47 +++++----
 src/ZEO/tests/testZEO.py   | 137 ++++++++++++++++++++++++--
 src/ZEO/tests/winserver.py |   1 -
 src/ZEO/zrpc.py            |   2 +-
 src/ZEO/zrpc/smac.py       |   2 +-
 15 files changed, 275 insertions(+), 188 deletions(-)

diff --git a/src/ZEO/ClientCache.py b/src/ZEO/ClientCache.py
index 77c4da2c..03065c5b 100644
--- a/src/ZEO/ClientCache.py
+++ b/src/ZEO/ClientCache.py
@@ -73,7 +73,7 @@ file 0 and file 1.
 
 """
 
-__version__ = "$Revision: 1.21 $"[11:-2]
+__version__ = "$Revision: 1.22 $"[11:-2]
 
 import os, tempfile
 from struct import pack, unpack
diff --git a/src/ZEO/ClientStorage.py b/src/ZEO/ClientStorage.py
index c5bd01c2..b55dee18 100644
--- a/src/ZEO/ClientStorage.py
+++ b/src/ZEO/ClientStorage.py
@@ -13,8 +13,7 @@
 ##############################################################################
 """Network ZODB storage client
 """
-
-__version__='$Revision: 1.38 $'[11:-2]
+__version__='$Revision: 1.39 $'[11:-2]
 
 import struct, time, os, socket, string, Sync, zrpc, ClientCache
 import tempfile, Invalidator, ExtensionClass, thread
diff --git a/src/ZEO/StorageServer.py b/src/ZEO/StorageServer.py
index c703acfd..933af535 100644
--- a/src/ZEO/StorageServer.py
+++ b/src/ZEO/StorageServer.py
@@ -1,89 +1,18 @@
-#############################################################################
-# 
-# Zope Public License (ZPL) Version 1.0
-# -------------------------------------
-# 
-# Copyright (c) Digital Creations.  All rights reserved.
-# 
-# This license has been certified as Open Source(tm).
-# 
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-# 
-# 1. Redistributions in source code must retain the above copyright
-#    notice, this list of conditions, and the following disclaimer.
-# 
-# 2. Redistributions in binary form must reproduce the above copyright
-#    notice, this list of conditions, and the following disclaimer in
-#    the documentation and/or other materials provided with the
-#    distribution.
-# 
-# 3. Digital Creations requests that attribution be given to Zope
-#    in any manner possible. Zope includes a "Powered by Zope"
-#    button that is installed by default. While it is not a license
-#    violation to remove this button, it is requested that the
-#    attribution remain. A significant investment has been put
-#    into Zope, and this effort will continue if the Zope community
-#    continues to grow. This is one way to assure that growth.
-# 
-# 4. All advertising materials and documentation mentioning
-#    features derived from or use of this software must display
-#    the following acknowledgement:
-# 
-#      "This product includes software developed by Digital Creations
-#      for use in the Z Object Publishing Environment
-#      (http://www.zope.org/)."
-# 
-#    In the event that the product being advertised includes an
-#    intact Zope distribution (with copyright and license included)
-#    then this clause is waived.
-# 
-# 5. Names associated with Zope or Digital Creations must not be used to
-#    endorse or promote products derived from this software without
-#    prior written permission from Digital Creations.
-# 
-# 6. Modified redistributions of any form whatsoever must retain
-#    the following acknowledgment:
-# 
-#      "This product includes software developed by Digital Creations
-#      for use in the Z Object Publishing Environment
-#      (http://www.zope.org/)."
-# 
-#    Intact (re-)distributions of any official Zope release do not
-#    require an external acknowledgement.
-# 
-# 7. Modifications are encouraged but must be packaged separately as
-#    patches to official Zope releases.  Distributions that do not
-#    clearly separate the patches from the original work must be clearly
-#    labeled as unofficial distributions.  Modifications which do not
-#    carry the name Zope may be packaged in any form, as long as they
-#    conform to all of the clauses above.
-# 
-# 
-# Disclaimer
-# 
-#   THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
-#   EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-#   IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-#   PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
-#   CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
-#   USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
-#   ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
-#   OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
-#   OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-#   SUCH DAMAGE.
-# 
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
 # 
-# This software consists of contributions made by Digital Creations and
-# many individuals on behalf of Digital Creations.  Specific
-# attributions are listed in the accompanying credits file.
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
 # 
 ##############################################################################
 
-__version__ = "$Revision: 1.34 $"[11:-2]
+__version__ = "$Revision: 1.35 $"[11:-2]
 
 import asyncore, socket, string, sys, os
 from smac import SizedMessageAsyncConnection
@@ -99,6 +28,7 @@ from thread import start_new_thread
 from cStringIO import StringIO
 from ZEO import trigger
 from ZEO import asyncwrap
+from ZEO.smac import Disconnected
 from types import StringType
 
 class StorageServerError(POSException.StorageError): pass
@@ -133,6 +63,8 @@ class StorageServer(asyncore.dispatcher):
         self.__storages=storages
         for n, s in storages.items():
             init_storage(s)
+            # Create a waiting list to support the distributed commit lock.
+            s._waiting = []
 
         self.__connections={}
         self.__get_connections=self.__connections.get
@@ -280,6 +212,7 @@ class ZEOConnection(SizedMessageAsyncConnection):
             # This is the first communication from the client
             self.__storage, self.__storage_id = (
                 self.__server.register_connection(self, message))
+
             # Send info back asynchronously, so client need not ask
             self.message_output('S'+dump(self.get_info(), 1))
             return
@@ -501,39 +434,76 @@ class ZEOConnection(SizedMessageAsyncConnection):
             return oids
         return ()
 
-    def tpc_abort(self, id):
-        t=self._transaction
-        if t is None or id != t.id: return
-        r=self.__storage.tpc_abort(t)
+    # distributed commit lock support methods
 
-        storage=self.__storage
-        try: waiting=storage.__waiting
-        except: waiting=storage.__waiting=[]
+    # Only one client at a time can commit a transaction on a
+    # storage.  If one client is committing a transaction, and a
+    # second client sends a tpc_begin(), then second client is queued.
+    # When the first transaction finishes, either by abort or commit,
+    # the request from the queued client must be handled.
+
+    # It is important that this code be robust.  If a queued
+    # transaction is not restarted, the server will stop processing
+    # new transactions.
+
+    # This lock is implemented by storing the queued requests in a
+    # list on the storage object.  The list contains:
+    #     a callable object to resume request
+    #     arguments to that object
+    #     a callable object to handle errors during resume
+
+    # XXX I am not sure that the commitlock_resume() method is
+    # sufficiently paranoid.
+
+    def commitlock_suspend(self, resume, args, onerror):
+        self.__storage._waiting.append((resume, args, onerror))
+
+    def commitlock_resume(self):
+        waiting = self.__storage._waiting
         while waiting:
-            f, args = waiting.pop(0)
-            if apply(f,args): break
+            resume, args, onerror = waiting.pop(0)
+            try:
+                if apply(resume, args):
+                    break
+            except Disconnected:
+                # A disconnected error isn't an unexpected error.
+                # There should be no need to log it, because the
+                # disconnect will have generated its own log event.
+                onerror()
+            except:
+                LOG('ZEO Server', ERROR,
+                    "Unexpected error handling queued tpc_begin()",
+                    error=sys.exc_info())
+                onerror()
 
-        self._transaction=None
-        self.__invalidated=[]
+    def tpc_abort(self, id):
+        t = self._transaction
+        if t is None or id != t.id:
+            return
+        r = self.__storage.tpc_abort(t)
+
+        self._transaction = None
+        self.__invalidated = []
+        self.commitlock_resume()
         
     def unlock(self):
-        if self.__closed: return
+        if self.__closed:
+            return
         self.message_output('UN.')
 
     def tpc_begin(self, id, user, description, ext):
-        t=self._transaction
+        t = self._transaction
         if t is not None:
-            if id == t.id: return
+            if id == t.id:
+                return
             else:
                 raise StorageServerError(
                     "Multiple simultaneous tpc_begin requests from the same "
                     "client."
                     )
-        storage=self.__storage
+        storage = self.__storage
         if storage._transaction is not None:
-            try: waiting=storage.__waiting
-            except: waiting=storage.__waiting=[]
-            waiting.append((self.unlock, ()))
+            self.commitlock_suspend(self.unlock, (), self.close)
             return 1 # Return a flag indicating a lock condition.
             
         self._transaction=t=Transaction()
@@ -552,9 +522,9 @@ class ZEOConnection(SizedMessageAsyncConnection):
         if storage._transaction is None:
             self.try_again_sync(id, user, description, ext)
         else:
-            try: waiting=storage.__waiting
-            except: waiting=storage.__waiting=[]
-            waiting.append((self.try_again_sync, (id, user, description, ext)))
+            self.commitlock_suspend(self.try_again_sync,
+                                    (id, user, description, ext),
+                                    self.close)
 
         return _noreturn
         
@@ -572,24 +542,21 @@ class ZEOConnection(SizedMessageAsyncConnection):
         return 1
 
     def tpc_finish(self, id, user, description, ext):
-        t=self._transaction
-        if id != t.id: return
+        t = self._transaction
+        if id != t.id:
+            return
 
-        storage=self.__storage
-        r=storage.tpc_finish(t)
-        
-        try: waiting=storage.__waiting
-        except: waiting=storage.__waiting=[]
-        while waiting:
-            f, args = waiting.pop(0)
-            if apply(f,args): break
+        storage = self.__storage
+        r = storage.tpc_finish(t)
 
-        self._transaction=None
+        self._transaction = None
         if self.__invalidated:
             self.__server.invalidate(self, self.__storage_id,
                                      self.__invalidated,
                                      self.get_size_info())
-            self.__invalidated=[]
+            self.__invalidated = []
+            
+        self.commitlock_resume()
 
 def init_storage(storage):
     if not hasattr(storage,'tpc_vote'): storage.tpc_vote=lambda *args: None
diff --git a/src/ZEO/asyncwrap.py b/src/ZEO/asyncwrap.py
index b8c065d2..e50c5359 100644
--- a/src/ZEO/asyncwrap.py
+++ b/src/ZEO/asyncwrap.py
@@ -11,7 +11,6 @@
 # FOR A PARTICULAR PURPOSE
 # 
 ##############################################################################
-
 """A wrapper for asyncore that provides robust exception handling.
 
 The poll() and loop() calls exported by asyncore can raise exceptions.
@@ -32,6 +31,10 @@ it would be useful to extend this module with wrappers for those
 errors.
 """
 
+# XXX The current implementation requires Python 2.0.  Not sure if
+# that's acceptable, depends on how many users want to combine ZEO 1.0
+# and Zope 2.3.
+
 import asyncore
 import errno
 import select
diff --git a/src/ZEO/smac.py b/src/ZEO/smac.py
index c4dcfbd9..e75e5145 100644
--- a/src/ZEO/smac.py
+++ b/src/ZEO/smac.py
@@ -14,7 +14,7 @@
 """Sized message async connections
 """
 
-__version__ = "$Revision: 1.14 $"[11:-2]
+__version__ = "$Revision: 1.15 $"[11:-2]
 
 import asyncore, string, struct, zLOG, sys, Acquisition
 import socket, errno
diff --git a/src/ZEO/start.py b/src/ZEO/start.py
index 692a8a2a..4b560174 100644
--- a/src/ZEO/start.py
+++ b/src/ZEO/start.py
@@ -15,7 +15,7 @@
 """Start the server storage.
 """
 
-__version__ = "$Revision: 1.29 $"[11:-2]
+__version__ = "$Revision: 1.30 $"[11:-2]
 
 import sys, os, getopt, string
 
@@ -264,10 +264,7 @@ def rotate_logs():
         zLOG.log_write.reinitialize()
     else:
         # Hm, lets at least try to take care of the stupid logger:
-        if hasattr(zLOG, '_set_stupid_dest'):
-            zLOG._set_stupid_dest(None)
-        else:
-            zLOG._stupid_dest = None
+        zLOG._stupid_dest=None
 
 def rotate_logs_handler(signum, frame):
     rotate_logs()
diff --git a/src/ZEO/tests/Cache.py b/src/ZEO/tests/Cache.py
index 873586a0..2c86d5b7 100644
--- a/src/ZEO/tests/Cache.py
+++ b/src/ZEO/tests/Cache.py
@@ -11,7 +11,6 @@
 # FOR A PARTICULAR PURPOSE
 # 
 ##############################################################################
-
 """Tests of the ZEO cache"""
 
 from ZODB.Transaction import Transaction
diff --git a/src/ZEO/tests/__init__.py b/src/ZEO/tests/__init__.py
index e69de29b..682d951d 100644
--- a/src/ZEO/tests/__init__.py
+++ b/src/ZEO/tests/__init__.py
@@ -0,0 +1,13 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+# 
+##############################################################################
diff --git a/src/ZEO/tests/forker.py b/src/ZEO/tests/forker.py
index 9716e625..e746b7a7 100644
--- a/src/ZEO/tests/forker.py
+++ b/src/ZEO/tests/forker.py
@@ -11,7 +11,6 @@
 # FOR A PARTICULAR PURPOSE
 # 
 ##############################################################################
-
 """Library for forking storage server and connecting client storage"""
 
 import asyncore
@@ -62,7 +61,7 @@ if os.name == "nt":
         args = (sys.executable, script, str(port), storage_name) + args
         d = os.environ.copy()
         d['PYTHONPATH'] = os.pathsep.join(sys.path)
-        pid = os.spawnve(os.P_NOWAIT, sys.executable, args, d)
+        pid = os.spawnve(os.P_NOWAIT, sys.executable, args, os.environ)
         return ('localhost', port), ('localhost', port + 1), pid
 
 else:
@@ -98,16 +97,14 @@ else:
         rd, wr = os.pipe()
         pid = os.fork()
         if pid == 0:
-            try:
-                if PROFILE:
-                    p = profile.Profile()
-                    p.runctx("run_server(storage, addr, rd, wr)", globals(),
-                             locals())
-                    p.dump_stats("stats.s.%d" % os.getpid())
-                else:
-                    run_server(storage, addr, rd, wr)
-            finally:
-                os._exit(0)
+            if PROFILE:
+                p = profile.Profile()
+                p.runctx("run_server(storage, addr, rd, wr)", globals(),
+                         locals())
+                p.dump_stats("stats.s.%d" % os.getpid())
+            else:
+                run_server(storage, addr, rd, wr)
+            os._exit(0)
         else:
             os.close(rd)
             return pid, ZEOClientExit(wr)
diff --git a/src/ZEO/tests/multi.py b/src/ZEO/tests/multi.py
index 6b323e55..b4dfc408 100644
--- a/src/ZEO/tests/multi.py
+++ b/src/ZEO/tests/multi.py
@@ -11,7 +11,6 @@
 # FOR A PARTICULAR PURPOSE
 # 
 ##############################################################################
-
 """A multi-client test of the ZEO storage server"""
 
 import ZODB, ZODB.DB, ZODB.FileStorage, ZODB.POSException
@@ -70,18 +69,16 @@ def start_server(addr):
 def start_client(addr, client_func=None):
     pid = os.fork()
     if pid == 0:
-        try:
-            import ZEO.ClientStorage
-            if VERBOSE:
-                print "Client process started:", os.getpid()
-            cli = ZEO.ClientStorage.ClientStorage(addr, client=CLIENT_CACHE)
-            if client_func is None:
-                run(cli)
-            else:
-                client_func(cli)
-            cli.close()
-        finally:
-            os._exit(0)
+        import ZEO.ClientStorage
+        if VERBOSE:
+            print "Client process started:", os.getpid()
+        cli = ZEO.ClientStorage.ClientStorage(addr, client=CLIENT_CACHE)
+        if client_func is None:
+            run(cli)
+        else:
+            client_func(cli)
+        cli.close()
+        os._exit(0)
     else:
         return pid
 
diff --git a/src/ZEO/tests/stress.py b/src/ZEO/tests/stress.py
index fb06a3e4..7ebfea97 100644
--- a/src/ZEO/tests/stress.py
+++ b/src/ZEO/tests/stress.py
@@ -11,7 +11,6 @@
 # FOR A PARTICULAR PURPOSE
 # 
 ##############################################################################
-
 """A ZEO client-server stress test to look for leaks.
 
 The stress test should run in an infinite loop and should involve
@@ -105,34 +104,32 @@ def start_child(zaddr):
     if pid != 0:
         return pid
     
-    try:
-        storage = ClientStorage(zaddr, debug=1, min_disconnect_poll=0.5)
-        db = ZODB.DB(storage, pool_size=NUM_CONNECTIONS)
-        setup(db.open())
-        conns = []
-        conn_count = 0
-
-        for i in range(NUM_CONNECTIONS):
+    storage = ClientStorage(zaddr, debug=1, min_disconnect_poll=0.5)
+    db = ZODB.DB(storage, pool_size=NUM_CONNECTIONS)
+    setup(db.open())
+    conns = []
+    conn_count = 0
+
+    for i in range(NUM_CONNECTIONS):
+        c = db.open()
+        c.__count = 0
+        conns.append(c)
+        conn_count += 1
+
+    while conn_count < 25:
+        c = random.choice(conns)
+        if c.__count > NUM_TRANSACTIONS_PER_CONN:
+            conns.remove(c)
+            c.close()
+            conn_count += 1
             c = db.open()
             c.__count = 0
             conns.append(c)
-            conn_count += 1
+        else:
+            c.__count += 1
+        work(c)
 
-        while conn_count < 25:
-            c = random.choice(conns)
-            if c.__count > NUM_TRANSACTIONS_PER_CONN:
-                conns.remove(c)
-                c.close()
-                conn_count += 1
-                c = db.open()
-                c.__count = 0
-                conns.append(c)
-            else:
-                c.__count += 1
-            work(c)
-
-    finally:
-        os._exit(0)
+    os._exit(0)
 
 if __name__ == "__main__":
     main()
diff --git a/src/ZEO/tests/testZEO.py b/src/ZEO/tests/testZEO.py
index d6cdc00d..0508781a 100644
--- a/src/ZEO/tests/testZEO.py
+++ b/src/ZEO/tests/testZEO.py
@@ -11,7 +11,6 @@
 # FOR A PARTICULAR PURPOSE
 # 
 ##############################################################################
-
 """Test suite for ZEO based on ZODB.tests"""
 
 import asyncore
@@ -27,6 +26,8 @@ import unittest
 import ZEO.ClientStorage, ZEO.StorageServer
 import ThreadedAsync, ZEO.trigger
 from ZODB.FileStorage import FileStorage
+from ZODB.TimeStamp import TimeStamp
+from ZODB.Transaction import Transaction
 import thread
 
 from ZEO.tests import forker, Cache
@@ -35,7 +36,7 @@ from ZEO.smac import Disconnected
 # Sorry Jim...
 from ZODB.tests import StorageTestBase, BasicStorage, VersionStorage, \
      TransactionalUndoStorage, TransactionalUndoVersionStorage, \
-     PackableStorage, Synchronization, ConflictResolution, RevisionStorage
+     PackableStorage, Synchronization, ConflictResolution
 from ZODB.tests.MinPO import MinPO
 from ZODB.tests.StorageTestBase import zodb_unpickle
 
@@ -56,9 +57,63 @@ class PackWaitWrapper:
         self.storage.pack(t, f, wait=1)
 
 class ZEOTestBase(StorageTestBase.StorageTestBase):
-    """Version of the storage test class that supports ZEO."""
-    pass
+    """Version of the storage test class that supports ZEO.
+    
+    For ZEO, we don't always get the serialno/exception for a
+    particular store as the return value from the store.   But we
+    will get no later than the return value from vote.
+    """
     
+    def _dostore(self, oid=None, revid=None, data=None, version=None,
+                 already_pickled=0):
+        """Do a complete storage transaction.
+
+        The defaults are:
+         - oid=None, ask the storage for a new oid
+         - revid=None, use a revid of ZERO
+         - data=None, pickle up some arbitrary data (the integer 7)
+         - version=None, use the empty string version
+        
+        Returns the object's new revision id.
+        """
+        if oid is None:
+            oid = self._storage.new_oid()
+        if revid is None:
+            revid = ZERO
+        if data is None:
+            data = MinPO(7)
+        if not already_pickled:
+            data = StorageTestBase.zodb_pickle(data)
+        if version is None:
+            version = ''
+        # Begin the transaction
+        t = Transaction()
+        self._storage.tpc_begin(t)
+        # Store an object
+        r1 = self._storage.store(oid, revid, data, version, t)
+        s1 = self._get_serial(r1)
+        # Finish the transaction
+        r2 = self._storage.tpc_vote(t)
+        s2 = self._get_serial(r2)
+        self._storage.tpc_finish(t)
+        # s1, s2 can be None or dict
+        assert not (s1 and s2)
+        return s1 and s1[oid] or s2 and s2[oid]
+
+    def _get_serial(self, r):
+        """Return oid -> serialno dict from sequence of ZEO replies."""
+        d = {}
+        if r is None:
+            return None
+        if type(r) == types.StringType:
+            raise RuntimeError, "unexpected ZEO response: no oid"
+        else:
+            for oid, serial in r:
+                if isinstance(serial, Exception):
+                    raise serial
+                d[oid] = serial
+        return d
+
 # Some of the ZEO tests depend on the version of FileStorage available
 # for the tests.  If we run these tests using Zope 2.3, FileStorage
 # doesn't support TransactionalUndo.
@@ -75,14 +130,13 @@ if hasattr(FileStorage, 'supportsTransactionalUndo'):
 else:
     class VersionDependentTests:
         pass
-
+        
 class GenericTests(ZEOTestBase,
                    VersionDependentTests,
                    Cache.StorageWithCache,
                    Cache.TransUndoStorageWithCache,
                    BasicStorage.BasicStorage,
                    VersionStorage.VersionStorage,
-                   RevisionStorage.RevisionStorage,
                    PackableStorage.PackableStorage,
                    Synchronization.SynchronizedStorage,
                    ):
@@ -94,12 +148,16 @@ class GenericTests(ZEOTestBase,
     returns a specific storage, e.g. FileStorage.
     """
 
+    __super_setUp = StorageTestBase.StorageTestBase.setUp
+    __super_tearDown = StorageTestBase.StorageTestBase.tearDown
+
     def setUp(self):
         """Start a ZEO server using a Unix domain socket
 
         The ZEO server uses the storage object returned by the
         getStorage() method.
         """
+        self.__super_setUp()
         self.running = 1
         client, exit, pid = forker.start_zeo(self.getStorage())
         self._pid = pid
@@ -114,13 +172,68 @@ class GenericTests(ZEOTestBase,
         self._server.close()
         os.waitpid(self._pid, 0)
         self.delStorage()
+        self.__super_tearDown()
+
+    def checkTwoArgBegin(self):
+        # XXX ZEO doesn't support 2-arg begin
+        pass
 
     def checkLargeUpdate(self):
         obj = MinPO("X" * (10 * 128 * 1024))
         self._dostore(data=obj)
 
-    def checkTwoArgBegin(self):
-        pass # ZEO 1 doesn't support two-arg begin
+    def checkCommitLockOnCommit(self):
+        self._checkCommitLock("tpc_finish")
+
+    def checkCommitLockOnAbort(self):
+        self._checkCommitLock("tpc_abort")
+
+    def _checkCommitLock(self, method_name):
+        # check the commit lock when a client attemps a transaction,
+        # but fails/exits before finishing the commit.
+
+        # Start on transaction normally.
+        t = Transaction()
+        self._storage.tpc_begin(t)
+
+        # Start a second transaction on a different connection without
+        # blocking the test thread.
+        self._storages = []
+        for i in range(3):
+            storage2 = self._duplicate_client()
+            t2 = Transaction()
+            tid = self._get_timestamp()
+            storage2._call.sendMessage('tpc_begin_sync',
+                                       tid, t2.user, t2.description,
+                                       t2._extension)
+            if i == 0:
+                storage2.close()
+            else:
+                self._storages.append((storage2, t2))
+
+        oid = self._storage.new_oid()
+        self._storage.store(oid, None, '', '', t)
+        self._storage.tpc_vote(t)
+        self._storage.tpc_finish(t)
+
+        for store, trans in self._storages:
+            store.tpc_abort(trans)
+            store.close()
+
+        # Make sure the server is still responsive
+        self._dostore()
+
+    def _duplicate_client(self):
+        "Open another ClientStorage to the same server."
+        addr = self._storage._connection
+        new = ZEO.ClientStorage.ClientStorage(addr)
+        new.registerDB(DummyDB(), None)
+        return new
+
+    def _get_timestamp(self):
+        t = time.time()
+        t = apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
+        return 't'
 
 class ZEOFileStorageTests(GenericTests):
     __super_setUp = GenericTests.setUp
@@ -148,8 +261,11 @@ class WindowsGenericTests(GenericTests):
     can't be created in the parent process and passed to the child.
     All the work has to be done in the server's process.
     """
+    __super_setUp = StorageTestBase.StorageTestBase.setUp
+    __super_tearDown = StorageTestBase.StorageTestBase.tearDown
 
     def setUp(self):
+        self.__super_setUp()
         args = self.getStorageInfo()
         name = args[0]
         args = args[1:]
@@ -169,6 +285,7 @@ class WindowsGenericTests(GenericTests):
 ##        os.waitpid(self.test_pid, 0)
         time.sleep(0.5)
         self.delStorage()
+        self.__super_tearDown()
 
 class WindowsZEOFileStorageTests(WindowsGenericTests):
 
@@ -192,6 +309,8 @@ class ConnectionTests(ZEOTestBase):
     start and stop a ZEO storage server.
     """
     
+    __super_tearDown = StorageTestBase.StorageTestBase.tearDown
+
     ports = []
     for i in range(200):
         ports.append(random.randrange(25000, 30000))
@@ -207,7 +326,6 @@ class ConnectionTests(ZEOTestBase):
 
     def tearDown(self):
         """Try to cause the tests to halt"""
-        self._storage.close()
         self.shutdownServer()
         # file storage appears to create four files
         for ext in '', '.index', '.lock', '.tmp':
@@ -218,6 +336,7 @@ class ConnectionTests(ZEOTestBase):
             path = "c1-test-%d.zec" % i
             if os.path.exists(path):
                 os.unlink(path)
+        self.__super_tearDown()
 
     def checkBasicPersistence(self):
         """Verify cached data persists across client storage instances.
diff --git a/src/ZEO/tests/winserver.py b/src/ZEO/tests/winserver.py
index b1531e28..3b3bda51 100644
--- a/src/ZEO/tests/winserver.py
+++ b/src/ZEO/tests/winserver.py
@@ -11,7 +11,6 @@
 # FOR A PARTICULAR PURPOSE
 # 
 ##############################################################################
-
 """Helper file used to launch ZEO server for Windows tests"""
 
 import asyncore
diff --git a/src/ZEO/zrpc.py b/src/ZEO/zrpc.py
index 8ff2d18b..0825fc4b 100644
--- a/src/ZEO/zrpc.py
+++ b/src/ZEO/zrpc.py
@@ -14,7 +14,7 @@
 """Simple rpc mechanisms
 """
 
-__version__ = "$Revision: 1.21 $"[11:-2]
+__version__ = "$Revision: 1.22 $"[11:-2]
 
 from cPickle import loads
 import cPickle
diff --git a/src/ZEO/zrpc/smac.py b/src/ZEO/zrpc/smac.py
index c4dcfbd9..e75e5145 100644
--- a/src/ZEO/zrpc/smac.py
+++ b/src/ZEO/zrpc/smac.py
@@ -14,7 +14,7 @@
 """Sized message async connections
 """
 
-__version__ = "$Revision: 1.14 $"[11:-2]
+__version__ = "$Revision: 1.15 $"[11:-2]
 
 import asyncore, string, struct, zLOG, sys, Acquisition
 import socket, errno
-- 
2.30.9