Commit d628565b authored by Jim Fulton's avatar Jim Fulton

Improved zeo blob cache clean up to make it a bit more robust and to

avoid spurious test failures.
parent bee97cf1
...@@ -14,6 +14,8 @@ Bugs Fixed ...@@ -14,6 +14,8 @@ Bugs Fixed
- Fixed analyze.py and added test. - Fixed analyze.py and added test.
- ZEO client blob cache size management is a little bit more robust.
3.9.0b1 (2009-05-04) 3.9.0b1 (2009-05-04)
==================== ====================
......
...@@ -38,6 +38,7 @@ import socket ...@@ -38,6 +38,7 @@ import socket
import stat import stat
import sys import sys
import tempfile import tempfile
import thread
import threading import threading
import time import time
import types import types
...@@ -398,6 +399,7 @@ class ClientStorage(object): ...@@ -398,6 +399,7 @@ class ClientStorage(object):
self._blob_cache_size = blob_cache_size self._blob_cache_size = blob_cache_size
self._blob_data_bytes_loaded = 0 self._blob_data_bytes_loaded = 0
if blob_cache_size is not None: if blob_cache_size is not None:
assert blob_cache_size_check < 100
self._blob_cache_size_check = ( self._blob_cache_size_check = (
blob_cache_size * blob_cache_size_check / 100) blob_cache_size * blob_cache_size_check / 100)
self._check_blob_size() self._check_blob_size()
...@@ -477,7 +479,7 @@ class ClientStorage(object): ...@@ -477,7 +479,7 @@ class ClientStorage(object):
check_blob_size_thread = threading.Thread( check_blob_size_thread = threading.Thread(
target=_check_blob_cache_size, target=_check_blob_cache_size,
args=(self.blob_dir, self._blob_cache_size), args=(self.blob_dir, target),
) )
check_blob_size_thread.setDaemon(True) check_blob_size_thread.setDaemon(True)
check_blob_size_thread.start() check_blob_size_thread.start()
...@@ -1620,7 +1622,6 @@ cache_file_name = re.compile(r'\d+$').match ...@@ -1620,7 +1622,6 @@ cache_file_name = re.compile(r'\d+$').match
def _check_blob_cache_size(blob_dir, target): def _check_blob_cache_size(blob_dir, target):
logger = logging.getLogger(__name__+'.check_blob_cache') logger = logging.getLogger(__name__+'.check_blob_cache')
logger.info("Checking blob cache size")
layout = open(os.path.join(blob_dir, ZODB.blob.LAYOUT_MARKER) layout = open(os.path.join(blob_dir, ZODB.blob.LAYOUT_MARKER)
).read().strip() ).read().strip()
...@@ -1628,18 +1629,31 @@ def _check_blob_cache_size(blob_dir, target): ...@@ -1628,18 +1629,31 @@ def _check_blob_cache_size(blob_dir, target):
logger.critical("Invalid blob directory layout %s", layout) logger.critical("Invalid blob directory layout %s", layout)
raise ValueError("Invalid blob directory layout", layout) raise ValueError("Invalid blob directory layout", layout)
attempt_path = os.path.join(blob_dir, 'check_size.attempt')
try:
check_lock = zc.lockfile.LockFile(
os.path.join(blob_dir, 'check_size.lock'))
except zc.lockfile.LockError:
try: try:
time.sleep(1)
check_lock = zc.lockfile.LockFile( check_lock = zc.lockfile.LockFile(
os.path.join(blob_dir, 'check_size.lock')) os.path.join(blob_dir, 'check_size.lock'))
except zc.lockfile.LockError: except zc.lockfile.LockError:
# Someone is already cleaning up, so don't bother # Someone is already cleaning up, so don't bother
logger.info("Another thread is checking the blob cache size") logger.debug("%s Another thread is checking the blob cache size.",
thread.get_ident())
open(attempt_path, 'w').close() # Mark that we tried
return return
logger.debug("%s Checking blob cache size. (target: %s)",
thread.get_ident(), target)
try: try:
while 1:
size = 0 size = 0
blob_suffix = ZODB.blob.BLOB_SUFFIX blob_suffix = ZODB.blob.BLOB_SUFFIX
files_by_atime = BTrees.IOBTree.BTree() files_by_atime = BTrees.OOBTree.BTree()
for dirname in os.listdir(blob_dir): for dirname in os.listdir(blob_dir):
if not cache_file_name(dirname): if not cache_file_name(dirname):
...@@ -1650,26 +1664,35 @@ def _check_blob_cache_size(blob_dir, target): ...@@ -1650,26 +1664,35 @@ def _check_blob_cache_size(blob_dir, target):
for file_name in os.listdir(base): for file_name in os.listdir(base):
if not file_name.endswith(blob_suffix): if not file_name.endswith(blob_suffix):
continue continue
file_name = os.path.join(base, file_name) file_path = os.path.join(base, file_name)
if not os.path.isfile(file_name): if not os.path.isfile(file_path):
continue continue
stat = os.stat(file_name) stat = os.stat(file_path)
size += stat.st_size size += stat.st_size
t = int(stat.st_atime) t = stat.st_atime
if t not in files_by_atime: if t not in files_by_atime:
files_by_atime[t] = [] files_by_atime[t] = []
files_by_atime[t].append(file_name) files_by_atime[t].append(os.path.join(dirname, file_name))
logger.debug("%s blob cache size: %s", thread.get_ident(), size)
logger.info("blob cache size: %s", size) if size <= target:
if os.path.isfile(attempt_path):
os.remove(attempt_path)
continue
logger.debug("%s -->", thread.get_ident())
break
while size > target and files_by_atime: while size > target and files_by_atime:
for file_name in files_by_atime.pop(files_by_atime.minKey()): for file_name in files_by_atime.pop(files_by_atime.minKey()):
file_name = os.path.join(blob_dir, file_name)
lockfilename = os.path.join(os.path.dirname(file_name), lockfilename = os.path.join(os.path.dirname(file_name),
'.lock') '.lock')
try: try:
lock = zc.lockfile.LockFile(lockfilename) lock = zc.lockfile.LockFile(lockfilename)
except zc.lockfile.LockError: except zc.lockfile.LockError:
logger.info("Skipping locked %s", logger.debug("%s Skipping locked %s",
thread.get_ident(),
os.path.basename(file_name)) os.path.basename(file_name))
continue # In use, skip continue # In use, skip
...@@ -1684,7 +1707,11 @@ def _check_blob_cache_size(blob_dir, target): ...@@ -1684,7 +1707,11 @@ def _check_blob_cache_size(blob_dir, target):
finally: finally:
lock.close() lock.close()
logger.info("reduced blob cache size: %s", size) if size <= target:
break
logger.debug("%s reduced blob cache size: %s",
thread.get_ident(), size)
finally: finally:
check_lock.close() check_lock.close()
......
...@@ -33,6 +33,11 @@ from the server before the cache size is checked. The ...@@ -33,6 +33,11 @@ from the server before the cache size is checked. The
blob_cache_size_check option defaults to 100. We passed 10, to check blob_cache_size_check option defaults to 100. We passed 10, to check
after writing 10% of the target size. after writing 10% of the target size.
.. We're going to wait for any threads we started to finish, so...
>>> import threading
>>> old_threads = list(threading.enumerate())
We want to check for name collections in the blob cache dir. We'll try We want to check for name collections in the blob cache dir. We'll try
to provoke name collections by reducing the number of cache directory to provoke name collections by reducing the number of cache directory
subdirectories. subdirectories.
...@@ -67,10 +72,13 @@ directory. ...@@ -67,10 +72,13 @@ directory.
... raise ... raise
... return size ... return size
>>> db.storage._check_blob_size_thread.join() >>> def check():
... return cache_size('blobs') < 5000
>>> def onfail():
... return cache_size('blobs')
>>> cache_size('blobs') < 5000 >>> from ZEO.tests.forker import wait_until
True >>> wait_until("size is reduced", check, 99, onfail)
If we read all of the blobs, data will be downloaded again, as If we read all of the blobs, data will be downloaded again, as
necessary, but the cache size will remain not much bigger than the necessary, but the cache size will remain not much bigger than the
...@@ -81,37 +89,26 @@ target: ...@@ -81,37 +89,26 @@ target:
... if data != chr(i)*100: ... if data != chr(i)*100:
... print 'bad data', `chr(i)`, `data` ... print 'bad data', `chr(i)`, `data`
>>> db.storage._check_blob_size_thread.join() >>> wait_until("size is reduced", check, 99, onfail)
>>> cache_size('blobs') < 5000
True
>>> for i in range(1, 101): >>> for i in range(1, 101):
... data = conn.root()[i].open().read() ... data = conn.root()[i].open().read()
... if data != chr(i)*100: ... if data != chr(i)*100:
... print 'bad data', `chr(i)`, `data` ... print 'bad data', `chr(i)`, `data`
>>> db.storage._check_blob_size_thread.join()
>>> for i in range(1, 101): >>> for i in range(1, 101):
... data = conn.root()[i].open('c').read() ... data = conn.root()[i].open('c').read()
... if data != chr(i)*100: ... if data != chr(i)*100:
... print 'bad data', `chr(i)`, `data` ... print 'bad data', `chr(i)`, `data`
>>> db.storage._check_blob_size_thread.join() >>> wait_until("size is reduced", check, 99, onfail)
>>> cache_size('blobs') < 5000
True
>>> for i in range(1, 101): >>> for i in range(1, 101):
... data = open(conn.root()[i].committed(), 'rb').read() ... data = open(conn.root()[i].committed(), 'rb').read()
... if data != chr(i)*100: ... if data != chr(i)*100:
... print 'bad data', `chr(i)`, `data` ... print 'bad data', `chr(i)`, `data`
>>> db.storage._check_blob_size_thread.join() >>> wait_until("size is reduced", check, 99, onfail)
>>> cache_size('blobs') < 5000
True
Now let see if we can stress things a bit. We'll create many clients Now let see if we can stress things a bit. We'll create many clients
and get them to pound on the blobs all at once to see if we can and get them to pound on the blobs all at once to see if we can
...@@ -131,7 +128,6 @@ provoke problems: ...@@ -131,7 +128,6 @@ provoke problems:
... data = conn.root()[i].open('c').read() ... data = conn.root()[i].open('c').read()
... if data != chr(i)*100: ... if data != chr(i)*100:
... print 'bad data', `chr(i)`, `data` ... print 'bad data', `chr(i)`, `data`
... db._storage._check_blob_size_thread.join()
... db.close() ... db.close()
>>> threads = [threading.Thread(target=run) for i in range(10)] >>> threads = [threading.Thread(target=run) for i in range(10)]
...@@ -140,12 +136,18 @@ provoke problems: ...@@ -140,12 +136,18 @@ provoke problems:
>>> for thread in threads: >>> for thread in threads:
... thread.start() ... thread.start()
>>> for thread in threads: >>> for thread in threads:
... thread.join() ... thread.join(99)
... if thread.isAlive():
... print "Can't join thread."
>>> cache_size('blobs') < 5000 >>> wait_until("size is reduced", check, 99, onfail)
True
.. cleanup .. cleanup
>>> for thread in threading.enumerate():
... if thread not in old_threads:
... thread.join(33)
>>> db.close() >>> db.close()
>>> ZEO.ClientStorage.BlobCacheLayout.size = orig_blob_cache_layout_size >>> ZEO.ClientStorage.BlobCacheLayout.size = orig_blob_cache_layout_size
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment