diff --git a/src/CHANGES.txt b/src/CHANGES.txt
index e91ee8e4bcead38841c796296f54ce684b657833..a95d49df5cce93c617d3f5fa4e4e7e8b945e6af9 100644
--- a/src/CHANGES.txt
+++ b/src/CHANGES.txt
@@ -39,7 +39,8 @@ New Features
 
 - The previous (ZODB 3.8) ZEO client-cache format is supported.
   The newer cache format introduced in ZODB 3.9.0a1 is no-longer
-  supported. Cache files can still be larger than 4G.
+  supported. Cache files can still be larger than 4G.  Cache file
+  sizes can now be changed.
 
 3.9.0a4 (2008-11-06)
 ====================
diff --git a/src/ZEO/cache.py b/src/ZEO/cache.py
index f7a9aacacf03b651c24c5d336d5e8cda98da7dc2..c8a56478bc0779d28821e7d4270bf98e7a739c65 100644
--- a/src/ZEO/cache.py
+++ b/src/ZEO/cache.py
@@ -88,6 +88,7 @@ ZEC_HEADER_SIZE = 12
 # while opening.
 max_block_size = (1<<31) - 1
 
+
 # After the header, the file contains a contiguous sequence of blocks.  All
 # blocks begin with a one-byte status indicator:
 #
@@ -116,6 +117,8 @@ max_block_size = (1<<31) - 1
 #     2 byte version length must be 0
 #     4 byte data size
 #     data
+#     8 byte redundant oid for error detection.
+allocated_record_overhead = 43
 
 # The cache's currentofs goes around the file, circularly, forever.
 # It's always the starting offset of some block.
@@ -187,35 +190,25 @@ class ClientCache(object):
         # here -- the scan() method must be called then to open the file
         # (and it sets self.f).
 
+        fsize = ZEC_HEADER_SIZE
         if path:
             self._lock_file = zc.lockfile.LockFile(path + '.lock')
-        
-        if path and os.path.exists(path):
-            # Reuse an existing file.  scan() will open & read it.
-            self.f = None
-            logger.info("reusing persistent cache file %r", path)
-        else:
-            if path:
+            if not os.path.exists(path):
+                # Create a small empty file.  We'll make it bigger in _initfile.
                 self.f = open(path, 'wb+')
+                self.f.write(magic+z64)
                 logger.info("created persistent cache file %r", path)
             else:
-                self.f = tempfile.TemporaryFile()
-                logger.info("created temporary cache file %r", self.f.name)
-            # Make sure the OS really saves enough bytes for the file.
-            self.f.seek(self.maxsize - 1)
-            self.f.write('x')
-            self.f.truncate()
-            # Start with one magic header block
-            self.f.seek(0)
-            self.f.write(magic)
-            self.f.write(z64)
-            # add as many free blocks as are needed to fill the space
-            nfree = self.maxsize - ZEC_HEADER_SIZE
-            for i in range(0, nfree, max_block_size):
-                block_size = min(max_block_size, nfree-i)
-                self.f.write('f' + pack(">I", block_size))
-                self.f.seek(block_size-5, 1)
-            sync(self.f)
+                fsize = os.path.getsize(self.path)
+                self.f = open(path, 'rb+')
+                logger.info("reusing persistent cache file %r", path)
+        else:
+            # Create a small empty file.  We'll make it bigger in _initfile.
+            self.f = tempfile.TemporaryFile()
+            self.f.write(magic+z64)
+            logger.info("created temporary cache file %r", self.f.name)
+            
+        self._initfile(self.f, fsize)
 
         # Statistics:  _n_adds, _n_added_bytes,
         #              _n_evicts, _n_evicted_bytes,
@@ -224,8 +217,6 @@ class ClientCache(object):
 
         self._setup_trace(path)
 
-        self.open()
-
         self._lock = threading.RLock()
 
     # Backward compatibility. Client code used to have to use the fc
@@ -238,20 +229,13 @@ class ClientCache(object):
     # Scan the current contents of the cache file, calling `install`
     # for each object found in the cache.  This method should only
     # be called once to initialize the cache from disk.
-    def open(self):
-        if self.f is not None:  # we're not (re)using a pre-existing file
-            return
-        fsize = os.path.getsize(self.path)
-        if fsize != self.maxsize:
-            logger.warning("existing cache file %r has size %d; "
-                           "requested size %d ignored", self.path,
-                           fsize, self.maxsize)
-            self.maxsize = fsize
-        self.f = open(self.path, 'rb+')
-        read = self.f.read
-        seek = self.f.seek
-        _magic = read(4)
-        if _magic != magic:
+    def _initfile(self, f, fsize):
+        maxsize = self.maxsize
+        read = f.read
+        seek = f.seek
+        write = f.write
+        seek(0)
+        if read(4) != magic:
             raise ValueError("unexpected magic number: %r" % _magic)
         self.tid = read(8)
         if len(self.tid) != 8:
@@ -264,8 +248,9 @@ class ClientCache(object):
 
         self.current = ZODB.fsIndex.fsIndex()
         self.noncurrent = BTrees.LOBTree.LOBTree()
-        max_free_size = l = 0
-        ofs = max_free_offset = ZEC_HEADER_SIZE
+        l = 0
+        ofs = ZEC_HEADER_SIZE
+        first_free_offset = 0
         current = self.current
         while ofs < fsize:
             seek(ofs)
@@ -273,35 +258,77 @@ class ClientCache(object):
             if status == 'a':
                 size, oid, start_tid, end_tid, lver = unpack(
                     ">I8s8s8sH", read(30))
-                if end_tid == z64:
-                    assert oid not in current, (ofs, self.f.tell())
-                    current[oid] = ofs
+                if ofs+size <= maxsize:
+                    if end_tid == z64:
+                        assert oid not in current, (ofs, f.tell())
+                        current[oid] = ofs
+                    else:
+                        assert start_tid < end_tid, (ofs, f.tell())
+                        self._set_noncurrent(oid, start_tid, ofs)
+                    assert lver == 0, "Versions aren't supported"
+                    l += 1
+            else:
+                # free block
+                if first_free_offset == 0:
+                    first_free_offset = ofs
+                if status == 'f':
+                    size, = unpack(">I", read(4))
+                    if size > max_block_size:
+                        # Oops, we either have an old cache, or a we
+                        # crashed while storing. Split this block into two.
+                        assert size <= max_block_size*2
+                        seek(ofs+max_block_size)
+                        write('f'+pack(">I", size-max_block_size))
+                        seek(ofs)
+                        write('f'+pack(">I", max_block_size))
+                        sync(f)
+                elif status in '1234':
+                    size = int(status)
                 else:
-                    assert start_tid < end_tid, (ofs, self.f.tell())
-                    self._set_noncurrent(oid, start_tid, ofs)
-                assert lver == 0, "Versions aren't supported"
-                l += 1
-            elif status == 'f':
-                size, = unpack(">I", read(4))
-                if size > max_block_size:
-                    # Oops, we either have an old cache, or a we
-                    # crashed while storing. Split this block into two.
-                    assert size <= max_block_size*2
-                    seek(ofs+max_block_size)
-                    self.f.write('f'+pack(">I", size-max_block_size))
+                    raise ValueError("unknown status byte value %s in client "
+                                     "cache file" % 0, hex(ord(status)))
+
+            if ofs + size >= maxsize:
+                # Oops, the file was bigger before.
+                if ofs+size > maxsize:
+                    # The last record is too big. Replace it with a smaller
+                    # free record
+                    size = maxsize-ofs
                     seek(ofs)
-                    self.f.write('f'+pack(">I", max_block_size))
-            elif status in '1234':
-                size = int(status)
-            else:
-                raise ValueError("unknown status byte value %s in client "
-                                 "cache file" % 0, hex(ord(status)))
+                    if size > 4:
+                        write('f'+pack(">I", size))
+                    else:
+                        write("012345"[size])
+                    sync(f)
+                ofs += size
+                break
+
             ofs += size
 
-        if ofs != fsize:
-            raise ValueError("final offset %s != file size %s in client "
-                             "cache file" % (ofs, fsize))
-        self.currentofs = max_free_offset
+        if fsize < maxsize:
+            assert ofs==fsize
+            # Make sure the OS really saves enough bytes for the file.
+            seek(self.maxsize - 1)
+            write('x')
+
+            # add as many free blocks as are needed to fill the space
+            seek(ofs)
+            nfree = maxsize - ZEC_HEADER_SIZE
+            for i in range(0, nfree, max_block_size):
+                block_size = min(max_block_size, nfree-i)
+                write('f' + pack(">I", block_size))
+                seek(block_size-5, 1)
+            sync(self.f)
+            first_free_offset = ofs
+        else:
+            assert ofs==maxsize
+            if maxsize < fsize:
+                seek(maxsize)
+                f.truncate()
+
+        # We use the first_free_offset because it is most likelyt the
+        # place where we last wrote.
+        self.currentofs = first_free_offset or ZEC_HEADER_SIZE
         self._len = l
 
     def _set_noncurrent(self, oid, tid, ofs):
@@ -518,7 +545,7 @@ class ClientCache(object):
             if noncurrent_for_oid and (u64(start_tid) in noncurrent_for_oid):
                 return
 
-        size = 43 + len(data)
+        size = allocated_record_overhead + len(data)
 
         # A number of cache simulation experiments all concluded that the
         # 2nd-level ZEO cache got a much higher hit rate if "very large"
diff --git a/src/ZEO/tests/test_cache.py b/src/ZEO/tests/test_cache.py
index 5f0939279bacfd855b0801e0a58d68cdf44ac046..9ae1ad9f9cb010394f6bd1ea7c88e3484e7c6351 100644
--- a/src/ZEO/tests/test_cache.py
+++ b/src/ZEO/tests/test_cache.py
@@ -134,7 +134,7 @@ class CacheTests(ZODB.tests.util.TestCase):
             n = p64(i)
             cache.store(n, n, None, data[i])
             self.assertEquals(len(cache), i + 1)
-        # The cache now uses 3287 bytes.  The next insert
+        # The cache is now almost full.  The next insert
         # should delete some objects.
         n = p64(50)
         cache.store(n, n, None, data[51])
@@ -197,10 +197,10 @@ class CacheTests(ZODB.tests.util.TestCase):
         self.assert_(1 not in cache.noncurrent)
 
     def testVeryLargeCaches(self):
-        cache = ZEO.cache.ClientCache('cache', size=(1<<33))
+        cache = ZEO.cache.ClientCache('cache', size=(1<<32)+(1<<20))
         cache.store(n1, n2, None, "x")
         cache.close()
-        cache = ZEO.cache.ClientCache('cache', size=(1<<33))
+        cache = ZEO.cache.ClientCache('cache', size=(1<<33)+(1<<20))
         self.assertEquals(cache.load(n1), ('x', n2))
         cache.close()
 
@@ -224,6 +224,77 @@ class CacheTests(ZODB.tests.util.TestCase):
                           ZEO.cache.max_block_size)
         f.close()
         
+    def testChangingCacheSize(self):
+        # start with a small cache
+        data = 'x'
+        recsize = ZEO.cache.allocated_record_overhead+len(data)
+
+        for extra in (0, 2, recsize-2):
+
+            cache = ZEO.cache.ClientCache(
+                'cache', size=ZEO.cache.ZEC_HEADER_SIZE+100*recsize+extra)
+            for i in range(100):
+                cache.store(p64(i), n1, None, data)
+            self.assertEquals(len(cache), 100)
+            self.assertEquals(os.path.getsize(
+                'cache'), ZEO.cache.ZEC_HEADER_SIZE+100*recsize+extra)
+
+            # Now make it smaller
+            cache.close()
+            small = 50
+            cache = ZEO.cache.ClientCache(
+                'cache', size=ZEO.cache.ZEC_HEADER_SIZE+small*recsize+extra)
+            self.assertEquals(len(cache), small)
+            self.assertEquals(os.path.getsize(
+                'cache'), ZEO.cache.ZEC_HEADER_SIZE+small*recsize+extra)
+            self.assertEquals(set(u64(oid) for (oid, tid) in cache.contents()),
+                              set(range(small)))
+            for i in range(100, 110):
+                cache.store(p64(i), n1, None, data)
+            self.assertEquals(len(cache), small)
+            expected_oids = set(range(10, 50)+range(100, 110))
+            self.assertEquals(
+                set(u64(oid) for (oid, tid) in cache.contents()),
+                expected_oids)
+
+            # Make sure we can reopen with same size
+            cache.close()
+            cache = ZEO.cache.ClientCache(
+                'cache', size=ZEO.cache.ZEC_HEADER_SIZE+small*recsize+extra)
+            self.assertEquals(len(cache), small)
+            self.assertEquals(set(u64(oid) for (oid, tid) in cache.contents()),
+                              expected_oids)
+
+            # Now make it bigger
+            cache.close()
+            large = 150
+            cache = ZEO.cache.ClientCache(
+                'cache', size=ZEO.cache.ZEC_HEADER_SIZE+large*recsize+extra)
+            self.assertEquals(len(cache), small)
+            self.assertEquals(os.path.getsize(
+                'cache'), ZEO.cache.ZEC_HEADER_SIZE+large*recsize+extra)
+            self.assertEquals(set(u64(oid) for (oid, tid) in cache.contents()),
+                              expected_oids)
+
+            for i in range(200, 305):
+                cache.store(p64(i), n1, None, data)
+            self.assertEquals(len(cache), large)
+            expected_oids = set(range(10, 50)+range(105, 110)+range(200, 305))
+            self.assertEquals(set(u64(oid) for (oid, tid) in cache.contents()),
+                              expected_oids)
+
+            # Make sure we can reopen with same size
+            cache.close()
+            cache = ZEO.cache.ClientCache(
+                'cache', size=ZEO.cache.ZEC_HEADER_SIZE+large*recsize+extra)
+            self.assertEquals(len(cache), large)
+            self.assertEquals(set(u64(oid) for (oid, tid) in cache.contents()),
+                              expected_oids)
+
+            # Cleanup
+            cache.close()
+            os.remove('cache')
+        
 
 __test__ = dict(
     kill_does_not_cause_cache_corruption =