diff --git a/NEWS.txt b/NEWS.txt
index 8a40ea821fad61cc9b7e88a491a1d7838e1a6c89..eabc992ca65e37df335b2ce7cb3b3ae760065b54 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -52,9 +52,14 @@ Test suite
 
 Collector #1397: testTimeStamp fails on FreeBSD
 
-The *BSD distributions are unique in that their mktime() implementation
-usually ignores the input tm_isdst value.  Test checkFullTimeStamp()
-was sensitive to this platform quirk.
+    The *BSD distributions are unique in that their mktime()
+    implementation usually ignores the input tm_isdst value.  Test
+    checkFullTimeStamp() was sensitive to this platform quirk.
+
+Reworked the way some of the ZEO tests use threads, so that unittest is
+more likely to notice the real cause of a failure (which usually occurs in
+a thread), and less likely to latch on to spurious problems resulting from
+the real failure.
 
 
 What's new in ZODB3 3.3 beta 1
diff --git a/src/ZEO/tests/CommitLockTests.py b/src/ZEO/tests/CommitLockTests.py
index fb4a1fc3ef156238b0f9b804779696e469261685..512c75ecfbe1770ec771a55223850b77fbf564cb 100644
--- a/src/ZEO/tests/CommitLockTests.py
+++ b/src/ZEO/tests/CommitLockTests.py
@@ -35,12 +35,12 @@ class WorkerThread(TestThread):
     # run the entire test in a thread so that the blocking call for
     # tpc_vote() doesn't hang the test suite.
 
-    def __init__(self, testcase, storage, trans, method="tpc_finish"):
+    def __init__(self, storage, trans, method="tpc_finish"):
         self.storage = storage
         self.trans = trans
         self.method = method
         self.ready = threading.Event()
-        TestThread.__init__(self, testcase)
+        TestThread.__init__(self)
 
     def testrun(self):
         try:
@@ -115,7 +115,7 @@ class CommitLockTests:
             txn = transaction.Transaction()
             tid = self._get_timestamp()
 
-            t = WorkerThread(self, storage, txn)
+            t = WorkerThread(storage, txn)
             self._threads.append(t)
             t.start()
             t.ready.wait()
diff --git a/src/ZEO/tests/InvalidationTests.py b/src/ZEO/tests/InvalidationTests.py
index 6fca9b32539b7c5a80a12d0fd88144cbb5c0722b..c6e98096da6929b4923208d7d13a06e44057e7dc 100644
--- a/src/ZEO/tests/InvalidationTests.py
+++ b/src/ZEO/tests/InvalidationTests.py
@@ -49,6 +49,7 @@ class FailableThread(TestThread):
     # - self.stop attribute (an event)
     # - self._testrun() method
 
+    # TestThread.run() invokes testrun().
     def testrun(self):
         try:
             self._testrun()
@@ -64,8 +65,7 @@ class StressTask:
     # to 'tree'.  If sleep is given, sleep
     # that long after each append.  At the end, instance var .added_keys
     # is a list of the ints the thread believes it added successfully.
-    def __init__(self, testcase, db, threadnum, startnum,
-                 step=2, sleep=None):
+    def __init__(self, db, threadnum, startnum, step=2, sleep=None):
         self.db = db
         self.threadnum = threadnum
         self.startnum = startnum
@@ -132,9 +132,9 @@ class StressThread(FailableThread):
     # to 'tree' until Event stop is set.  If sleep is given, sleep
     # that long after each append.  At the end, instance var .added_keys
     # is a list of the ints the thread believes it added successfully.
-    def __init__(self, testcase, db, stop, threadnum, commitdict,
+    def __init__(self, db, stop, threadnum, commitdict,
                  startnum, step=2, sleep=None):
-        TestThread.__init__(self, testcase)
+        TestThread.__init__(self)
         self.db = db
         self.stop = stop
         self.threadnum = threadnum
@@ -180,9 +180,9 @@ class LargeUpdatesThread(FailableThread):
     # more than 25 objects so that it can test code that runs vote
     # in a separate thread when it modifies more than 25 objects.
 
-    def __init__(self, testcase, db, stop, threadnum, commitdict, startnum,
+    def __init__(self, db, stop, threadnum, commitdict, startnum,
                  step=2, sleep=None):
-        TestThread.__init__(self, testcase)
+        TestThread.__init__(self)
         self.db = db
         self.stop = stop
         self.threadnum = threadnum
@@ -192,15 +192,6 @@ class LargeUpdatesThread(FailableThread):
         self.added_keys = []
         self.commitdict = commitdict
 
-    def testrun(self):
-        try:
-            self._testrun()
-        except:
-            # Report the failure here to all the other threads, so
-            # that they stop quickly.
-            self.stop.set()
-            raise
-
     def _testrun(self):
         cn = self.db.open()
         while not self.stop.isSet():
@@ -260,9 +251,9 @@ class LargeUpdatesThread(FailableThread):
 
 class VersionStressThread(FailableThread):
 
-    def __init__(self, testcase, db, stop, threadnum, commitdict, startnum,
+    def __init__(self, db, stop, threadnum, commitdict, startnum,
                  step=2, sleep=None):
-        TestThread.__init__(self, testcase)
+        TestThread.__init__(self)
         self.db = db
         self.stop = stop
         self.threadnum = threadnum
@@ -272,15 +263,6 @@ class VersionStressThread(FailableThread):
         self.added_keys = []
         self.commitdict = commitdict
 
-    def testrun(self):
-        try:
-            self._testrun()
-        except:
-            # Report the failure here to all the other threads, so
-            # that they stop quickly.
-            self.stop.set()
-            raise
-
     def _testrun(self):
         commit = 0
         key = self.startnum
@@ -416,6 +398,13 @@ class InvalidationTests:
                 break
             # Some thread still hasn't managed to commit anything.
         stop.set()
+        # Give all the threads some time to stop before trying to clean up.
+        # cleanup() will cause the test to fail if some thread ended with
+        # an uncaught exception, and unittest will call the base class
+        # tearDown then immediately, but if other threads are still
+        # running that can lead to a cascade of spurious exceptions.
+        for t in threads:
+            t.join(10)
         for t in threads:
             t.cleanup()
 
@@ -432,8 +421,8 @@ class InvalidationTests:
         time.sleep(0.1)
 
         # Run two threads that update the BTree
-        t1 = StressTask(self, db1, 1, 1,)
-        t2 = StressTask(self, db2, 2, 2,)
+        t1 = StressTask(db1, 1, 1,)
+        t2 = StressTask(db2, 2, 2,)
         _runTasks(100, t1, t2)
 
         cn.sync()
@@ -458,8 +447,8 @@ class InvalidationTests:
 
         # Run two threads that update the BTree
         cd = {}
-        t1 = self.StressThread(self, db1, stop, 1, cd, 1)
-        t2 = self.StressThread(self, db2, stop, 2, cd, 2)
+        t1 = self.StressThread(db1, stop, 1, cd, 1)
+        t2 = self.StressThread(db2, stop, 2, cd, 2)
         self.go(stop, cd, t1, t2)
 
         while db1.lastTransaction() != db2.lastTransaction():
@@ -487,8 +476,8 @@ class InvalidationTests:
 
         # Run two threads that update the BTree
         cd = {}
-        t1 = self.StressThread(self, db1, stop, 1, cd, 1, sleep=0.01)
-        t2 = self.StressThread(self, db1, stop, 2, cd, 2, sleep=0.01)
+        t1 = self.StressThread(db1, stop, 1, cd, 1, sleep=0.01)
+        t2 = self.StressThread(db1, stop, 2, cd, 2, sleep=0.01)
         self.go(stop, cd, t1, t2)
 
         cn = db1.open()
@@ -516,9 +505,9 @@ class InvalidationTests:
         # at the same time.
 
         cd = {}
-        t1 = self.StressThread(self, db1, stop, 1, cd, 1, 3)
-        t2 = self.StressThread(self, db2, stop, 2, cd, 2, 3, 0.01)
-        t3 = self.StressThread(self, db2, stop, 3, cd, 3, 3, 0.01)
+        t1 = self.StressThread(db1, stop, 1, cd, 1, 3)
+        t2 = self.StressThread(db2, stop, 2, cd, 2, 3, 0.01)
+        t3 = self.StressThread(db2, stop, 3, cd, 3, 3, 0.01)
         self.go(stop, cd, t1, t2, t3)
 
         while db1.lastTransaction() != db2.lastTransaction():
@@ -552,9 +541,9 @@ class InvalidationTests:
         # at the same time.
 
         cd = {}
-        t1 = VersionStressThread(self, db1, stop, 1, cd, 1, 3)
-        t2 = VersionStressThread(self, db2, stop, 2, cd, 2, 3, 0.01)
-        t3 = VersionStressThread(self, db2, stop, 3, cd, 3, 3, 0.01)
+        t1 = VersionStressThread(db1, stop, 1, cd, 1, 3)
+        t2 = VersionStressThread(db2, stop, 2, cd, 2, 3, 0.01)
+        t3 = VersionStressThread(db2, stop, 3, cd, 3, 3, 0.01)
         self.go(stop, cd, t1, t2, t3)
 
         while db1.lastTransaction() != db2.lastTransaction():
@@ -591,9 +580,9 @@ class InvalidationTests:
         # at the same time.
 
         cd = {}
-        t1 = LargeUpdatesThread(self, db1, stop, 1, cd, 1, 3, 0.02)
-        t2 = LargeUpdatesThread(self, db2, stop, 2, cd, 2, 3, 0.01)
-        t3 = LargeUpdatesThread(self, db2, stop, 3, cd, 3, 3, 0.01)
+        t1 = LargeUpdatesThread(db1, stop, 1, cd, 1, 3, 0.02)
+        t2 = LargeUpdatesThread(db2, stop, 2, cd, 2, 3, 0.01)
+        t3 = LargeUpdatesThread(db2, stop, 3, cd, 3, 3, 0.01)
         self.go(stop, cd, t1, t2, t3)
 
         while db1.lastTransaction() != db2.lastTransaction():
diff --git a/src/ZEO/tests/TestThread.py b/src/ZEO/tests/TestThread.py
index a3d2f90e7c8c1688e295e9e55d33480ae66d4fe3..9b3aa6f5bc80fd52e8fcae13474d0a37d259a3ac 100644
--- a/src/ZEO/tests/TestThread.py
+++ b/src/ZEO/tests/TestThread.py
@@ -13,30 +13,44 @@
 ##############################################################################
 """A Thread base class for use with unittest."""
 
-from cStringIO import StringIO
 import threading
-import traceback
+import sys
 
 class TestThread(threading.Thread):
-    __super_init = threading.Thread.__init__
-    __super_run = threading.Thread.run
+    """Base class for defining threads that run from unittest.
 
-    def __init__(self, testcase, group=None, target=None, name=None,
-                 args=(), kwargs={}, verbose=None):
-        self.__super_init(group, target, name, args, kwargs, verbose)
+    The subclass should define a testrun() method instead of a run()
+    method.
+
+    Call cleanup() when the test is done with the thread, instead of join().
+    If the thread exits with an uncaught exception, it's captured and
+    re-raised when cleanup() is called.  cleanup() should be called by
+    the main thread!  Trying to tell unittest that a test failed from
+    another thread creates a nightmare of timing-depending cascading
+    failures and missed errors (tracebacks that show up on the screen,
+    but don't cause unittest to believe the test failed).
+
+    cleanup() also joins the thread.  If the thread ended without raising
+    an uncaught exception, and the join doesn't succeed in the timeout
+    period, then the test is made to fail with a "Thread still alive"
+    message.
+    """
+
+    def __init__(self):
+        threading.Thread.__init__(self)
+        # In case this thread hangs, don't stop Python from exiting.
         self.setDaemon(1)
-        self._testcase = testcase
+        self._exc_info = None
 
     def run(self):
         try:
             self.testrun()
-        except Exception:
-            s = StringIO()
-            traceback.print_exc(file=s)
-            self._testcase.fail("Exception in thread %s:\n%s\n" %
-                                (self, s.getvalue()))
+        except:
+            self._exc_info = sys.exc_info()
 
     def cleanup(self, timeout=15):
         self.join(timeout)
+        if self._exc_info:
+            raise self._exc_info[0], self._exc_info[1], self._exc_info[2]
         if self.isAlive():
             self._testcase.fail("Thread did not finish: %s" % self)