Commit e45062b9 authored by Jim Fulton's avatar Jim Fulton

Collapsed the Blob package into a single module (and a single test

module).
parent 740cec71
...@@ -40,7 +40,7 @@ import ZODB.lock_file ...@@ -40,7 +40,7 @@ import ZODB.lock_file
from ZODB import POSException from ZODB import POSException
from ZODB import utils from ZODB import utils
from ZODB.loglevels import BLATHER from ZODB.loglevels import BLATHER
from ZODB.Blobs.interfaces import IBlobStorage from ZODB.interfaces import IBlobStorage
from persistent.TimeStamp import TimeStamp from persistent.TimeStamp import TimeStamp
logger = logging.getLogger('ZEO.ClientStorage') logger = logging.getLogger('ZEO.ClientStorage')
...@@ -324,8 +324,8 @@ class ClientStorage(object): ...@@ -324,8 +324,8 @@ class ClientStorage(object):
if blob_dir is not None: if blob_dir is not None:
# Avoid doing this import unless we need it, as it # Avoid doing this import unless we need it, as it
# currently requires pywin32 on Windows. # currently requires pywin32 on Windows.
import ZODB.Blobs.Blob import ZODB.blob
self.fshelper = ZODB.Blobs.Blob.FilesystemHelper(blob_dir) self.fshelper = ZODB.blob.FilesystemHelper(blob_dir)
self.fshelper.create() self.fshelper.create()
self.fshelper.checkSecure() self.fshelper.checkSecure()
else: else:
......
...@@ -460,8 +460,7 @@ class CommonBlobTests: ...@@ -460,8 +460,7 @@ class CommonBlobTests:
def checkStoreBlob(self): def checkStoreBlob(self):
from ZODB.utils import oid_repr, tid_repr from ZODB.utils import oid_repr, tid_repr
from ZODB.Blobs.Blob import Blob from ZODB.blob import Blob, BLOB_SUFFIX
from ZODB.Blobs.BlobStorage import BLOB_SUFFIX
from ZODB.tests.StorageTestBase import zodb_pickle, ZERO, \ from ZODB.tests.StorageTestBase import zodb_pickle, ZERO, \
handle_serials handle_serials
import transaction import transaction
...@@ -494,7 +493,7 @@ class CommonBlobTests: ...@@ -494,7 +493,7 @@ class CommonBlobTests:
self.assertEqual(somedata, open(filename).read()) self.assertEqual(somedata, open(filename).read())
def checkLoadBlob(self): def checkLoadBlob(self):
from ZODB.Blobs.Blob import Blob from ZODB.blob import Blob
from ZODB.tests.StorageTestBase import zodb_pickle, ZERO, \ from ZODB.tests.StorageTestBase import zodb_pickle, ZERO, \
handle_serials handle_serials
import transaction import transaction
...@@ -534,8 +533,7 @@ class BlobAdaptedFileStorageTests(GenericTests, CommonBlobTests): ...@@ -534,8 +533,7 @@ class BlobAdaptedFileStorageTests(GenericTests, CommonBlobTests):
def checkStoreAndLoadBlob(self): def checkStoreAndLoadBlob(self):
from ZODB.utils import oid_repr, tid_repr from ZODB.utils import oid_repr, tid_repr
from ZODB.Blobs.Blob import Blob from ZODB.blob import Blob, BLOB_SUFFIX
from ZODB.Blobs.BlobStorage import BLOB_SUFFIX
from ZODB.tests.StorageTestBase import zodb_pickle, ZERO, \ from ZODB.tests.StorageTestBase import zodb_pickle, ZERO, \
handle_serials handle_serials
import transaction import transaction
......
##############################################################################
#
# Copyright (c) 2005-2006 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""A ZODB storage that provides blob capabilities.
"""
__docformat__ = "reStructuredText"
import os
import shutil
import base64
import logging
from zope.interface import implements
from zope.proxy import getProxiedObject, non_overridable
from zope.proxy.decorator import SpecificationDecoratorBase
from ZODB import utils
from ZODB.Blobs.interfaces import IBlobStorage, IBlob
from ZODB.POSException import POSKeyError
from ZODB.Blobs.Blob import BLOB_SUFFIX
from ZODB.Blobs.Blob import FilesystemHelper
logger = logging.getLogger('ZODB.BlobStorage')
class BlobStorage(SpecificationDecoratorBase):
"""A storage to support blobs."""
implements(IBlobStorage)
# Proxies can't have a __dict__ so specifying __slots__ here allows
# us to have instance attributes explicitly on the proxy.
__slots__ = ('fshelper', 'dirty_oids', '_BlobStorage__supportsUndo')
def __new__(self, base_directory, storage):
return SpecificationDecoratorBase.__new__(self, storage)
def __init__(self, base_directory, storage):
# XXX Log warning if storage is ClientStorage
SpecificationDecoratorBase.__init__(self, storage)
self.fshelper = FilesystemHelper(base_directory)
self.fshelper.create()
self.fshelper.checkSecure()
self.dirty_oids = []
try:
supportsUndo = storage.supportsUndo
except AttributeError:
supportsUndo = False
else:
supportsUndo = supportsUndo()
self.__supportsUndo = supportsUndo
@non_overridable
def temporaryDirectory(self):
return self.fshelper.base_dir
@non_overridable
def __repr__(self):
normal_storage = getProxiedObject(self)
return '<BlobStorage proxy for %r at %s>' % (normal_storage,
hex(id(self)))
@non_overridable
def storeBlob(self, oid, oldserial, data, blobfilename, version,
transaction):
"""Stores data that has a BLOB attached."""
serial = self.store(oid, oldserial, data, version, transaction)
assert isinstance(serial, str) # XXX in theory serials could be
# something else
# the user may not have called "open" on the blob object,
# in which case, the blob will not have a filename.
if blobfilename is not None:
self._lock_acquire()
try:
targetpath = self.fshelper.getPathForOID(oid)
if not os.path.exists(targetpath):
os.makedirs(targetpath, 0700)
targetname = self.fshelper.getBlobFilename(oid, serial)
os.rename(blobfilename, targetname)
# XXX if oid already in there, something is really hosed.
# The underlying storage should have complained anyway
self.dirty_oids.append((oid, serial))
finally:
self._lock_release()
return self._tid
@non_overridable
def tpc_finish(self, *arg, **kw):
# We need to override the base storage's tpc_finish instead of
# providing a _finish method because methods found on the proxied
# object aren't rebound to the proxy
getProxiedObject(self).tpc_finish(*arg, **kw)
self.dirty_oids = []
@non_overridable
def tpc_abort(self, *arg, **kw):
# We need to override the base storage's abort instead of
# providing an _abort method because methods found on the proxied object
# aren't rebound to the proxy
getProxiedObject(self).tpc_abort(*arg, **kw)
while self.dirty_oids:
oid, serial = self.dirty_oids.pop()
clean = self.fshelper.getBlobFilename(oid, serial)
if os.exists(clean):
os.unlink(clean)
@non_overridable
def loadBlob(self, oid, serial):
"""Return the filename where the blob file can be found.
"""
filename = self.fshelper.getBlobFilename(oid, serial)
if not os.path.exists(filename):
return None
return filename
@non_overridable
def _packUndoing(self, packtime, referencesf):
# Walk over all existing revisions of all blob files and check
# if they are still needed by attempting to load the revision
# of that object from the database. This is maybe the slowest
# possible way to do this, but it's safe.
# XXX we should be tolerant of "garbage" directories/files in
# the base_directory here.
base_dir = self.fshelper.base_dir
for oid_repr in os.listdir(base_dir):
oid = utils.repr_to_oid(oid_repr)
oid_path = os.path.join(base_dir, oid_repr)
files = os.listdir(oid_path)
files.sort()
for filename in files:
filepath = os.path.join(oid_path, filename)
whatever, serial = self.fshelper.splitBlobFilename(filepath)
try:
fn = self.fshelper.getBlobFilename(oid, serial)
self.loadSerial(oid, serial)
except POSKeyError:
os.unlink(filepath)
if not os.listdir(oid_path):
shutil.rmtree(oid_path)
@non_overridable
def _packNonUndoing(self, packtime, referencesf):
base_dir = self.fshelper.base_dir
for oid_repr in os.listdir(base_dir):
oid = utils.repr_to_oid(oid_repr)
oid_path = os.path.join(base_dir, oid_repr)
exists = True
try:
self.load(oid, None) # no version support
except (POSKeyError, KeyError):
exists = False
if exists:
files = os.listdir(oid_path)
files.sort()
latest = files[-1] # depends on ever-increasing tids
files.remove(latest)
for file in files:
os.unlink(os.path.join(oid_path, file))
else:
shutil.rmtree(oid_path)
continue
if not os.listdir(oid_path):
shutil.rmtree(oid_path)
@non_overridable
def pack(self, packtime, referencesf):
"""Remove all unused oid/tid combinations."""
unproxied = getProxiedObject(self)
# pack the underlying storage, which will allow us to determine
# which serials are current.
result = unproxied.pack(packtime, referencesf)
# perform a pack on blob data
self._lock_acquire()
try:
if self.__supportsUndo:
self._packUndoing(packtime, referencesf)
else:
self._packNonUndoing(packtime, referencesf)
finally:
self._lock_release()
return result
@non_overridable
def getSize(self):
"""Return the size of the database in bytes."""
orig_size = getProxiedObject(self).getSize()
blob_size = 0
base_dir = self.fshelper.base_dir
for oid in os.listdir(base_dir):
for serial in os.listdir(os.path.join(base_dir, oid)):
if not serial.endswith(BLOB_SUFFIX):
continue
file_path = os.path.join(base_dir, oid, serial)
blob_size += os.stat(file_path).st_size
return orig_size + blob_size
@non_overridable
def undo(self, serial_id, transaction):
undo_serial, keys = getProxiedObject(self).undo(serial_id, transaction)
# serial_id is the transaction id of the txn that we wish to undo.
# "undo_serial" is the transaction id of txn in which the undo is
# performed. "keys" is the list of oids that are involved in the
# undo transaction.
# The serial_id is assumed to be given to us base-64 encoded
# (belying the web UI legacy of the ZODB code :-()
serial_id = base64.decodestring(serial_id+'\n')
self._lock_acquire()
try:
# we get all the blob oids on the filesystem related to the
# transaction we want to undo.
for oid in self.fshelper.getOIDsForSerial(serial_id):
# we want to find the serial id of the previous revision
# of this blob object.
load_result = self.loadBefore(oid, serial_id)
if load_result is None:
# There was no previous revision of this blob
# object. The blob was created in the transaction
# represented by serial_id. We copy the blob data
# to a new file that references the undo
# transaction in case a user wishes to undo this
# undo.
orig_fn = self.fshelper.getBlobFilename(oid, serial_id)
new_fn = self.fshelper.getBlobFilename(oid, undo_serial)
else:
# A previous revision of this blob existed before the
# transaction implied by "serial_id". We copy the blob
# data to a new file that references the undo transaction
# in case a user wishes to undo this undo.
data, serial_before, serial_after = load_result
orig_fn = self.fshelper.getBlobFilename(oid, serial_before)
new_fn = self.fshelper.getBlobFilename(oid, undo_serial)
orig = open(orig_fn, "r")
new = open(new_fn, "wb")
utils.cp(orig, new)
orig.close()
new.close()
self.dirty_oids.append((oid, undo_serial))
finally:
self._lock_release()
return undo_serial, keys
Production
- Ensure we detect and replay a failed txn involving blobs forward or
backward at startup.
Far future
More options for blob directory structures (e.g. dirstorages
bushy/chunky/lawn/flat).
Make the ClientStorage support minimizing the blob cache. (Idea: LRU
principle via mstat access time and a size-based threshold) currently).
Make blobs able to efficiently consume existing files from the filesystem
Savepoint support
=================
- A savepoint represents the whole state of the data at a certain point in
time
- Need special storage for blob savepointing (in the spirit of tmpstorage)
- What belongs to the state of the data?
- Data contained in files at that point in time
- File handles are complex because they might be referred to from various
places. We would have to introduce an abstraction layer to allow
switching them around...
Simpler solution: :
"""The ZODB Blob package."""
Goal: Handle storage and retrieval of binary large objects efficiently,
transactionally, and transparently.
Measure:
- Don't block ZServer on uploads and downloads
- Don't hold BLOBS in memory or cache if not necessary (LRU caches tend
to break if we split BLOBs in lot of small objects. Size-based caches
tend to break on single large objects)
- Transparent for other systems, support normal ZODB operations.
Comments:
- Cache: BLOBs could be cached in a seperate "BLOB" space, e.g. in
single files
- Be storage independent?
- Memory efficiency: Storge.load() currently holds all data of an
object in a string.
Steps:
- simple aspects:
- blobs should be known by zodb
- storages, esp. clientstorage must be able to recognize blobs
- to avoid putting blob data into the client cache.
- blob data mustn't end up in the object cache
- blob object and blob data need to be handled separately
- blob data on client is stored in temporary files
- complicated aspects
- temporary files holding blob data could server as a
separated cache for blob data
- storage / zodb api change
Restrictions:
- a particular BLOB instance can't be open for read _and_ write at
the same time
- Allowed: N readers, no writers; 1 writer, no readers
- Reason:
- a writable filehandle opened via a BLOB's 'open' method has a
lifetime tied to the transaction in which the 'open' method was
called. We do this in order to prevent changes to blob data
from "bleeding over" between transactions.
- Data has been committed? -> File(name) for commited data available
- .open("r") on fresh loaded blob returns committed data
- first .open("w") -> new empty file for uncommitted data
- .open("a") or .open("r+"), we copy existing data into file for
uncommitted data
- if uncommitted data exists, subsequent .open("*") will use the
uncommitted data
- if opened for writing, the object is marked as changed
(optimiziation possible)
- connections want to recognize blobs on transaction boundaries
class BlobError(Exception):
pass
##############################################################################
#
# Copyright (c) 2005-2007 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Blob-related interfaces
"""
from zope.interface import Interface
class IBlob(Interface):
"""A BLOB supports efficient handling of large data within ZODB."""
def open(mode):
"""Returns a file(-like) object for handling the blob data.
mode: Mode to open the file with. Possible values: r,w,r+,a
"""
def openDetached(class_=file):
"""Returns a file(-like) object in read mode that can be used
outside of transaction boundaries.
The file handle returned by this method is read-only and at the
beginning of the file.
The handle is not attached to the blob and can be used outside of a
transaction.
Optionally the class that should be used to open the file can be
specified. This can be used to e.g. use Zope's FileStreamIterator.
"""
def consumeFile(filename):
"""Will replace the current data of the blob with the file given under
filename.
This method uses link-like semantics internally and has the requirement
that the file that is to be consumed lives on the same volume (or
mount/share) as the blob directory.
The blob must not be opened for reading or writing when consuming a
file.
"""
class IBlobStorage(Interface):
"""A storage supporting BLOBs."""
def storeBlob(oid, oldserial, data, blob, version, transaction):
"""Stores data that has a BLOB attached."""
def loadBlob(oid, serial):
"""Return the filename of the Blob data for this OID and serial.
Returns a filename or None if no Blob data is connected with this OID.
Raises POSKeyError if the blobfile cannot be found.
"""
def temporaryDirectory():
"""Return a directory that should be used for uncommitted blob data.
If Blobs use this, then commits can be performed with a simple rename.
"""
##############################################################################
#
# Copyright (c) 2004-2006 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import tempfile, shutil, unittest
import os
from ZODB.tests.testConfig import ConfigTestBase
from ZConfig import ConfigurationSyntaxError
class BlobConfigTestBase(ConfigTestBase):
def setUp(self):
super(BlobConfigTestBase, self).setUp()
self.blob_dir = tempfile.mkdtemp()
def tearDown(self):
super(BlobConfigTestBase, self).tearDown()
shutil.rmtree(self.blob_dir)
class ZODBBlobConfigTest(BlobConfigTestBase):
def test_map_config1(self):
self._test(
"""
<zodb>
<blobstorage>
blob-dir %s
<mappingstorage/>
</blobstorage>
</zodb>
""" % self.blob_dir)
def test_file_config1(self):
path = tempfile.mktemp()
self._test(
"""
<zodb>
<blobstorage>
blob-dir %s
<filestorage>
path %s
</filestorage>
</blobstorage>
</zodb>
""" %(self.blob_dir, path))
os.unlink(path)
os.unlink(path+".index")
os.unlink(path+".tmp")
def test_blob_dir_needed(self):
self.assertRaises(ConfigurationSyntaxError,
self._test,
"""
<zodb>
<blobstorage>
<mappingstorage/>
</blobstorage>
</zodb>
""")
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(ZODBBlobConfigTest))
return suite
if __name__ == '__main__':
unittest.main(defaultTest = 'test_suite')
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from zope.testing import doctest
import ZODB.tests.util
def test_suite():
return doctest.DocFileSuite(
"basic.txt", "connection.txt", "transaction.txt",
"packing.txt", "importexport.txt", "consume.txt",
setUp=ZODB.tests.util.setUp,
tearDown=ZODB.tests.util.tearDown,
)
...@@ -29,7 +29,7 @@ from persistent import PickleCache ...@@ -29,7 +29,7 @@ from persistent import PickleCache
# interfaces # interfaces
from persistent.interfaces import IPersistentDataManager from persistent.interfaces import IPersistentDataManager
from ZODB.interfaces import IConnection from ZODB.interfaces import IConnection
from ZODB.Blobs.interfaces import IBlob, IBlobStorage from ZODB.interfaces import IBlob, IBlobStorage
from transaction.interfaces import ISavepointDataManager from transaction.interfaces import ISavepointDataManager
from transaction.interfaces import IDataManagerSavepoint from transaction.interfaces import IDataManagerSavepoint
from transaction.interfaces import ISynchronizer from transaction.interfaces import ISynchronizer
......
...@@ -22,7 +22,7 @@ import logging ...@@ -22,7 +22,7 @@ import logging
from ZODB.POSException import ExportError, POSKeyError from ZODB.POSException import ExportError, POSKeyError
from ZODB.utils import p64, u64, cp, mktemp from ZODB.utils import p64, u64, cp, mktemp
from ZODB.Blobs.interfaces import IBlobStorage from ZODB.interfaces import IBlobStorage
from ZODB.serialize import referencesf from ZODB.serialize import referencesf
logger = logging.getLogger('ZODB.ExportImport') logger = logging.getLogger('ZODB.ExportImport')
......
...@@ -11,12 +11,13 @@ ...@@ -11,12 +11,13 @@
# FOR A PARTICULAR PURPOSE # FOR A PARTICULAR PURPOSE
# #
############################################################################## ##############################################################################
"""The blob class and related utilities. """Blobs
""" """
__docformat__ = "reStructuredText"
import base64
import logging
import os import os
import shutil
import sys import sys
import time import time
import tempfile import tempfile
...@@ -24,21 +25,27 @@ import logging ...@@ -24,21 +25,27 @@ import logging
import zope.interface import zope.interface
from ZODB.Blobs.interfaces import IBlob import ZODB.interfaces
from ZODB.Blobs.exceptions import BlobError from ZODB.interfaces import BlobError
from ZODB import utils from ZODB import utils
from ZODB.POSException import POSKeyError
import transaction import transaction
import transaction.interfaces import transaction.interfaces
from persistent import Persistent import persistent
from zope.proxy import getProxiedObject, non_overridable
from zope.proxy.decorator import SpecificationDecoratorBase
logger = logging.getLogger('ZODB.blob')
BLOB_SUFFIX = ".blob" BLOB_SUFFIX = ".blob"
valid_modes = 'r', 'w', 'r+', 'a' valid_modes = 'r', 'w', 'r+', 'a'
class Blob(Persistent): class Blob(persistent.Persistent):
"""A BLOB supports efficient handling of large data within ZODB.""" """A BLOB supports efficient handling of large data within ZODB."""
zope.interface.implements(IBlob) zope.interface.implements(ZODB.interfaces.IBlob)
_os_link = os.rename _os_link = os.rename
...@@ -372,11 +379,8 @@ class BlobFile(file): ...@@ -372,11 +379,8 @@ class BlobFile(file):
# muddying the code needlessly. # muddying the code needlessly.
self.close() self.close()
logger = logging.getLogger('ZODB.Blobs')
_pid = str(os.getpid()) _pid = str(os.getpid())
def log(msg, level=logging.INFO, subsys=_pid, exc_info=False): def log(msg, level=logging.INFO, subsys=_pid, exc_info=False):
message = "(%s) %s" % (subsys, msg) message = "(%s) %s" % (subsys, msg)
logger.log(level, message, exc_info=exc_info) logger.log(level, message, exc_info=exc_info)
...@@ -471,3 +475,279 @@ class FilesystemHelper: ...@@ -471,3 +475,279 @@ class FilesystemHelper:
if search_serial == serial: if search_serial == serial:
oids.append(oid) oids.append(oid)
return oids return oids
class BlobStorage(SpecificationDecoratorBase):
"""A storage to support blobs."""
zope.interface.implements(ZODB.interfaces.IBlobStorage)
# Proxies can't have a __dict__ so specifying __slots__ here allows
# us to have instance attributes explicitly on the proxy.
__slots__ = ('fshelper', 'dirty_oids', '_BlobStorage__supportsUndo')
def __new__(self, base_directory, storage):
return SpecificationDecoratorBase.__new__(self, storage)
def __init__(self, base_directory, storage):
# XXX Log warning if storage is ClientStorage
SpecificationDecoratorBase.__init__(self, storage)
self.fshelper = FilesystemHelper(base_directory)
self.fshelper.create()
self.fshelper.checkSecure()
self.dirty_oids = []
try:
supportsUndo = storage.supportsUndo
except AttributeError:
supportsUndo = False
else:
supportsUndo = supportsUndo()
self.__supportsUndo = supportsUndo
@non_overridable
def temporaryDirectory(self):
return self.fshelper.base_dir
@non_overridable
def __repr__(self):
normal_storage = getProxiedObject(self)
return '<BlobStorage proxy for %r at %s>' % (normal_storage,
hex(id(self)))
@non_overridable
def storeBlob(self, oid, oldserial, data, blobfilename, version,
transaction):
"""Stores data that has a BLOB attached."""
serial = self.store(oid, oldserial, data, version, transaction)
assert isinstance(serial, str) # XXX in theory serials could be
# something else
# the user may not have called "open" on the blob object,
# in which case, the blob will not have a filename.
if blobfilename is not None:
self._lock_acquire()
try:
targetpath = self.fshelper.getPathForOID(oid)
if not os.path.exists(targetpath):
os.makedirs(targetpath, 0700)
targetname = self.fshelper.getBlobFilename(oid, serial)
os.rename(blobfilename, targetname)
# XXX if oid already in there, something is really hosed.
# The underlying storage should have complained anyway
self.dirty_oids.append((oid, serial))
finally:
self._lock_release()
return self._tid
@non_overridable
def tpc_finish(self, *arg, **kw):
# We need to override the base storage's tpc_finish instead of
# providing a _finish method because methods found on the proxied
# object aren't rebound to the proxy
getProxiedObject(self).tpc_finish(*arg, **kw)
self.dirty_oids = []
@non_overridable
def tpc_abort(self, *arg, **kw):
# We need to override the base storage's abort instead of
# providing an _abort method because methods found on the proxied object
# aren't rebound to the proxy
getProxiedObject(self).tpc_abort(*arg, **kw)
while self.dirty_oids:
oid, serial = self.dirty_oids.pop()
clean = self.fshelper.getBlobFilename(oid, serial)
if os.exists(clean):
os.unlink(clean)
@non_overridable
def loadBlob(self, oid, serial):
"""Return the filename where the blob file can be found.
"""
filename = self.fshelper.getBlobFilename(oid, serial)
if not os.path.exists(filename):
return None
return filename
@non_overridable
def _packUndoing(self, packtime, referencesf):
# Walk over all existing revisions of all blob files and check
# if they are still needed by attempting to load the revision
# of that object from the database. This is maybe the slowest
# possible way to do this, but it's safe.
# XXX we should be tolerant of "garbage" directories/files in
# the base_directory here.
base_dir = self.fshelper.base_dir
for oid_repr in os.listdir(base_dir):
oid = utils.repr_to_oid(oid_repr)
oid_path = os.path.join(base_dir, oid_repr)
files = os.listdir(oid_path)
files.sort()
for filename in files:
filepath = os.path.join(oid_path, filename)
whatever, serial = self.fshelper.splitBlobFilename(filepath)
try:
fn = self.fshelper.getBlobFilename(oid, serial)
self.loadSerial(oid, serial)
except POSKeyError:
os.unlink(filepath)
if not os.listdir(oid_path):
shutil.rmtree(oid_path)
@non_overridable
def _packNonUndoing(self, packtime, referencesf):
base_dir = self.fshelper.base_dir
for oid_repr in os.listdir(base_dir):
oid = utils.repr_to_oid(oid_repr)
oid_path = os.path.join(base_dir, oid_repr)
exists = True
try:
self.load(oid, None) # no version support
except (POSKeyError, KeyError):
exists = False
if exists:
files = os.listdir(oid_path)
files.sort()
latest = files[-1] # depends on ever-increasing tids
files.remove(latest)
for file in files:
os.unlink(os.path.join(oid_path, file))
else:
shutil.rmtree(oid_path)
continue
if not os.listdir(oid_path):
shutil.rmtree(oid_path)
@non_overridable
def pack(self, packtime, referencesf):
"""Remove all unused oid/tid combinations."""
unproxied = getProxiedObject(self)
# pack the underlying storage, which will allow us to determine
# which serials are current.
result = unproxied.pack(packtime, referencesf)
# perform a pack on blob data
self._lock_acquire()
try:
if self.__supportsUndo:
self._packUndoing(packtime, referencesf)
else:
self._packNonUndoing(packtime, referencesf)
finally:
self._lock_release()
return result
@non_overridable
def getSize(self):
"""Return the size of the database in bytes."""
orig_size = getProxiedObject(self).getSize()
blob_size = 0
base_dir = self.fshelper.base_dir
for oid in os.listdir(base_dir):
for serial in os.listdir(os.path.join(base_dir, oid)):
if not serial.endswith(BLOB_SUFFIX):
continue
file_path = os.path.join(base_dir, oid, serial)
blob_size += os.stat(file_path).st_size
return orig_size + blob_size
@non_overridable
def undo(self, serial_id, transaction):
undo_serial, keys = getProxiedObject(self).undo(serial_id, transaction)
# serial_id is the transaction id of the txn that we wish to undo.
# "undo_serial" is the transaction id of txn in which the undo is
# performed. "keys" is the list of oids that are involved in the
# undo transaction.
# The serial_id is assumed to be given to us base-64 encoded
# (belying the web UI legacy of the ZODB code :-()
serial_id = base64.decodestring(serial_id+'\n')
self._lock_acquire()
try:
# we get all the blob oids on the filesystem related to the
# transaction we want to undo.
for oid in self.fshelper.getOIDsForSerial(serial_id):
# we want to find the serial id of the previous revision
# of this blob object.
load_result = self.loadBefore(oid, serial_id)
if load_result is None:
# There was no previous revision of this blob
# object. The blob was created in the transaction
# represented by serial_id. We copy the blob data
# to a new file that references the undo
# transaction in case a user wishes to undo this
# undo.
orig_fn = self.fshelper.getBlobFilename(oid, serial_id)
new_fn = self.fshelper.getBlobFilename(oid, undo_serial)
else:
# A previous revision of this blob existed before the
# transaction implied by "serial_id". We copy the blob
# data to a new file that references the undo transaction
# in case a user wishes to undo this undo.
data, serial_before, serial_after = load_result
orig_fn = self.fshelper.getBlobFilename(oid, serial_before)
new_fn = self.fshelper.getBlobFilename(oid, undo_serial)
orig = open(orig_fn, "r")
new = open(new_fn, "wb")
utils.cp(orig, new)
orig.close()
new.close()
self.dirty_oids.append((oid, undo_serial))
finally:
self._lock_release()
return undo_serial, keys
# To do:
#
# Production
#
# - Ensure we detect and replay a failed txn involving blobs forward or
# backward at startup.
#
# Jim: What does this mean?
#
# Far future
#
# More options for blob directory structures (e.g. dirstorages
# bushy/chunky/lawn/flat).
#
# Make the ClientStorage support minimizing the blob
# cache. (Idea: LRU principle via mstat access time and a
# size-based threshold) currently).
#
# Make blobs able to efficiently consume existing files from the
# filesystem
#
# Savepoint support
# =================
#
# - A savepoint represents the whole state of the data at a certain point in
# time
#
# - Need special storage for blob savepointing (in the spirit of tmpstorage)
#
# - What belongs to the state of the data?
#
# - Data contained in files at that point in time
#
# - File handles are complex because they might be referred to from various
# places. We would have to introduce an abstraction layer to allow
# switching them around...
#
# Simpler solution: :
...@@ -137,7 +137,7 @@ class FileStorage(BaseConfig): ...@@ -137,7 +137,7 @@ class FileStorage(BaseConfig):
class BlobStorage(BaseConfig): class BlobStorage(BaseConfig):
def open(self): def open(self):
from ZODB.Blobs.BlobStorage import BlobStorage from ZODB.blob import BlobStorage
base = self.config.base.open() base = self.config.base.open()
return BlobStorage(self.config.blob_dir, base) return BlobStorage(self.config.blob_dir, base)
......
...@@ -897,3 +897,62 @@ class IStorageCurrentRecordIteration(IStorage): ...@@ -897,3 +897,62 @@ class IStorageCurrentRecordIteration(IStorage):
... break ... break
""" """
class IBlob(Interface):
"""A BLOB supports efficient handling of large data within ZODB."""
def open(mode):
"""Returns a file(-like) object for handling the blob data.
mode: Mode to open the file with. Possible values: r,w,r+,a
"""
def openDetached(class_=file):
"""Returns a file(-like) object in read mode that can be used
outside of transaction boundaries.
The file handle returned by this method is read-only and at the
beginning of the file.
The handle is not attached to the blob and can be used outside of a
transaction.
Optionally the class that should be used to open the file can be
specified. This can be used to e.g. use Zope's FileStreamIterator.
"""
def consumeFile(filename):
"""Will replace the current data of the blob with the file given under
filename.
This method uses link-like semantics internally and has the requirement
that the file that is to be consumed lives on the same volume (or
mount/share) as the blob directory.
The blob must not be opened for reading or writing when consuming a
file.
"""
class IBlobStorage(Interface):
"""A storage supporting BLOBs."""
def storeBlob(oid, oldserial, data, blob, version, transaction):
"""Stores data that has a BLOB attached."""
def loadBlob(oid, serial):
"""Return the filename of the Blob data for this OID and serial.
Returns a filename or None if no Blob data is connected with this OID.
Raises POSKeyError if the blobfile cannot be found.
"""
def temporaryDirectory():
"""Return a directory that should be used for uncommitted blob data.
If Blobs use this, then commits can be performed with a simple rename.
"""
class BlobError(Exception):
pass
...@@ -17,12 +17,12 @@ ZODB Blob support ...@@ -17,12 +17,12 @@ ZODB Blob support
You create a blob like this: You create a blob like this:
>>> from ZODB.Blobs.Blob import Blob >>> from ZODB.blob import Blob
>>> myblob = Blob() >>> myblob = Blob()
A blob implements the IBlob interface: A blob implements the IBlob interface:
>>> from ZODB.Blobs.interfaces import IBlob >>> from ZODB.interfaces import IBlob
>>> IBlob.providedBy(myblob) >>> IBlob.providedBy(myblob)
True True
......
...@@ -17,8 +17,8 @@ Connection support for Blobs tests ...@@ -17,8 +17,8 @@ Connection support for Blobs tests
Connections handle Blobs specially. To demonstrate that, we first need a Blob with some data: Connections handle Blobs specially. To demonstrate that, we first need a Blob with some data:
>>> from ZODB.Blobs.interfaces import IBlob >>> from ZODB.interfaces import IBlob
>>> from ZODB.Blobs.Blob import Blob >>> from ZODB.blob import Blob
>>> import transaction >>> import transaction
>>> blob = Blob() >>> blob = Blob()
>>> data = blob.open("w") >>> data = blob.open("w")
...@@ -28,7 +28,7 @@ Connections handle Blobs specially. To demonstrate that, we first need a Blob wi ...@@ -28,7 +28,7 @@ Connections handle Blobs specially. To demonstrate that, we first need a Blob wi
We also need a database with a blob supporting storage: We also need a database with a blob supporting storage:
>>> from ZODB.MappingStorage import MappingStorage >>> from ZODB.MappingStorage import MappingStorage
>>> from ZODB.Blobs.BlobStorage import BlobStorage >>> from ZODB.blob import BlobStorage
>>> from ZODB.DB import DB >>> from ZODB.DB import DB
>>> from tempfile import mkdtemp >>> from tempfile import mkdtemp
>>> base_storage = MappingStorage("test") >>> base_storage = MappingStorage("test")
......
...@@ -15,7 +15,7 @@ The file *must* be closed before giving it to consumeFile: ...@@ -15,7 +15,7 @@ The file *must* be closed before giving it to consumeFile:
Now, let's consume this file in a blob by specifying it's name:: Now, let's consume this file in a blob by specifying it's name::
>>> from ZODB.Blobs.Blob import Blob >>> from ZODB.blob import Blob
>>> blob = Blob() >>> blob = Blob()
>>> blob.consumeFile('to_import') >>> blob.consumeFile('to_import')
......
...@@ -18,8 +18,7 @@ Import/export support for blob data ...@@ -18,8 +18,7 @@ Import/export support for blob data
Set up: Set up:
>>> from ZODB.FileStorage import FileStorage >>> from ZODB.FileStorage import FileStorage
>>> from ZODB.Blobs.BlobStorage import BlobStorage >>> from ZODB.blob import Blob, BlobStorage
>>> from ZODB.Blobs.Blob import Blob
>>> from ZODB.DB import DB >>> from ZODB.DB import DB
>>> from persistent.mapping import PersistentMapping >>> from persistent.mapping import PersistentMapping
>>> import shutil >>> import shutil
......
...@@ -20,8 +20,7 @@ Set up: ...@@ -20,8 +20,7 @@ Set up:
>>> from ZODB.FileStorage import FileStorage >>> from ZODB.FileStorage import FileStorage
>>> from ZODB.MappingStorage import MappingStorage >>> from ZODB.MappingStorage import MappingStorage
>>> from ZODB.serialize import referencesf >>> from ZODB.serialize import referencesf
>>> from ZODB.Blobs.BlobStorage import BlobStorage >>> from ZODB.blob import Blob, BlobStorage
>>> from ZODB.Blobs.Blob import Blob
>>> from ZODB import utils >>> from ZODB import utils
>>> from ZODB.DB import DB >>> from ZODB.DB import DB
>>> import shutil >>> import shutil
......
...@@ -18,7 +18,7 @@ Transaction support for Blobs ...@@ -18,7 +18,7 @@ Transaction support for Blobs
We need a database with a blob supporting storage:: We need a database with a blob supporting storage::
>>> from ZODB.MappingStorage import MappingStorage >>> from ZODB.MappingStorage import MappingStorage
>>> from ZODB.Blobs.BlobStorage import BlobStorage >>> from ZODB.blob import Blob, BlobStorage
>>> from ZODB.DB import DB >>> from ZODB.DB import DB
>>> import transaction >>> import transaction
>>> import tempfile >>> import tempfile
...@@ -29,7 +29,6 @@ We need a database with a blob supporting storage:: ...@@ -29,7 +29,6 @@ We need a database with a blob supporting storage::
>>> database = DB(blob_storage) >>> database = DB(blob_storage)
>>> connection1 = database.open() >>> connection1 = database.open()
>>> root1 = connection1.root() >>> root1 = connection1.root()
>>> from ZODB.Blobs.Blob import Blob
Putting a Blob into a Connection works like any other Persistent object:: Putting a Blob into a Connection works like any other Persistent object::
...@@ -176,7 +175,7 @@ connections should result in a write conflict error:: ...@@ -176,7 +175,7 @@ connections should result in a write conflict error::
>>> tm2.get().commit() >>> tm2.get().commit()
Traceback (most recent call last): Traceback (most recent call last):
... ...
ConflictError: database conflict error (oid 0x01, class ZODB.Blobs.Blob.Blob) ConflictError: database conflict error (oid 0x01, class ZODB.blob.Blob)
After the conflict, the winning transaction's result is visible on both After the conflict, the winning transaction's result is visible on both
connections:: connections::
...@@ -233,7 +232,7 @@ We do not support non-optimistic savepoints:: ...@@ -233,7 +232,7 @@ We do not support non-optimistic savepoints::
>>> savepoint = transaction.savepoint() # doctest: +ELLIPSIS >>> savepoint = transaction.savepoint() # doctest: +ELLIPSIS
Traceback (most recent call last): Traceback (most recent call last):
... ...
TypeError: ('Savepoints unsupported', <ZODB.Blobs.Blob.BlobDataManager instance at 0x...>) TypeError: ('Savepoints unsupported', <ZODB.blob.BlobDataManager instance at 0x...>)
>>> transaction.abort() >>> transaction.abort()
Reading Blobs outside of a transaction Reading Blobs outside of a transaction
......
...@@ -11,19 +11,74 @@ ...@@ -11,19 +11,74 @@
# FOR A PARTICULAR PURPOSE. # FOR A PARTICULAR PURPOSE.
# #
############################################################################## ##############################################################################
import unittest
import tempfile
import os
import shutil
import base64
import base64, os, shutil, tempfile, unittest
from zope.testing import doctest
import ZODB.tests.util
from ZODB import utils
from ZODB.FileStorage import FileStorage from ZODB.FileStorage import FileStorage
from ZODB.Blobs.BlobStorage import BlobStorage from ZODB.blob import Blob, BlobStorage
from ZODB.Blobs.Blob import Blob
from ZODB.DB import DB from ZODB.DB import DB
import transaction import transaction
from ZODB.Blobs.Blob import Blob
from ZODB import utils from ZODB.tests.testConfig import ConfigTestBase
from ZConfig import ConfigurationSyntaxError
class BlobConfigTestBase(ConfigTestBase):
def setUp(self):
super(BlobConfigTestBase, self).setUp()
self.blob_dir = tempfile.mkdtemp()
def tearDown(self):
super(BlobConfigTestBase, self).tearDown()
shutil.rmtree(self.blob_dir)
class ZODBBlobConfigTest(BlobConfigTestBase):
def test_map_config1(self):
self._test(
"""
<zodb>
<blobstorage>
blob-dir %s
<mappingstorage/>
</blobstorage>
</zodb>
""" % self.blob_dir)
def test_file_config1(self):
path = tempfile.mktemp()
self._test(
"""
<zodb>
<blobstorage>
blob-dir %s
<filestorage>
path %s
</filestorage>
</blobstorage>
</zodb>
""" %(self.blob_dir, path))
os.unlink(path)
os.unlink(path+".index")
os.unlink(path+".tmp")
def test_blob_dir_needed(self):
self.assertRaises(ConfigurationSyntaxError,
self._test,
"""
<zodb>
<blobstorage>
<mappingstorage/>
</blobstorage>
</zodb>
""")
class BlobUndoTests(unittest.TestCase): class BlobUndoTests(unittest.TestCase):
...@@ -212,9 +267,19 @@ class BlobUndoTests(unittest.TestCase): ...@@ -212,9 +267,19 @@ class BlobUndoTests(unittest.TestCase):
def test_suite(): def test_suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(ZODBBlobConfigTest))
suite.addTest(doctest.DocFileSuite(
"blob_basic.txt", "blob_connection.txt", "blob_transaction.txt",
"blob_packing.txt", "blob_importexport.txt", "blob_consume.txt",
setUp=ZODB.tests.util.setUp,
tearDown=ZODB.tests.util.tearDown,
))
suite.addTest(unittest.makeSuite(BlobUndoTests)) suite.addTest(unittest.makeSuite(BlobUndoTests))
return suite return suite
if __name__ == '__main__': if __name__ == '__main__':
unittest.main(defaultTest = 'test_suite') unittest.main(defaultTest = 'test_suite')
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment