############################################################################## # # Copyright (c) 2005-2006 Zope Corporation and Contributors. # All Rights Reserved. # # This software is subject to the provisions of the Zope Public License, # Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS # FOR A PARTICULAR PURPOSE # ############################################################################## """Blobs """ import base64 import logging import os import shutil import sys import time import tempfile import logging import zope.interface import ZODB.interfaces from ZODB.interfaces import BlobError from ZODB import utils from ZODB.POSException import POSKeyError import transaction import transaction.interfaces import persistent from zope.proxy import getProxiedObject, non_overridable from zope.proxy.decorator import SpecificationDecoratorBase logger = logging.getLogger('ZODB.blob') BLOB_SUFFIX = ".blob" valid_modes = 'r', 'w', 'r+', 'a' class Blob(persistent.Persistent): """A BLOB supports efficient handling of large data within ZODB.""" zope.interface.implements(ZODB.interfaces.IBlob) _os_link = os.rename _p_blob_readers = 0 _p_blob_writers = 0 _p_blob_uncommitted = None # Filename of the uncommitted (dirty) data _p_blob_data = None # Filename of the committed data # All persistent object store a reference to their data manager, a database # connection in the _p_jar attribute. So we are going to do the same with # blobs here. _p_blob_manager = None # Blobs need to participate in transactions even when not connected to # a database yet. If you want to use a non-default transaction manager, # you can override it via _p_blob_transaction. This is currently # required for unit testing. _p_blob_transaction = None def open(self, mode="r"): """Returns a file(-like) object representing blob data.""" result = None if mode not in valid_modes: raise ValueError("invalid mode", mode) if mode == 'r': if self._current_filename() is None: raise BlobError("Blob does not exist.") if self._p_blob_writers != 0: raise BlobError("Already opened for writing.") self._p_blob_readers += 1 result = BlobFile(self._current_filename(), mode, self) elif mode == 'w': if self._p_blob_readers != 0: raise BlobError("Already opened for reading.") self._p_blob_writers += 1 if self._p_blob_uncommitted is None: self._create_uncommitted_file() result = BlobFile(self._p_blob_uncommitted, mode, self) elif mode in ('a', 'r+'): if self._p_blob_readers != 0: raise BlobError("Already opened for reading.") if self._p_blob_uncommitted is None: # Create a new working copy uncommitted = BlobFile(self._create_uncommitted_file(), mode, self) # NOTE: _p_blob data appears by virtue of Connection._setstate utils.cp(file(self._p_blob_data), uncommitted) uncommitted.seek(0) else: # Re-use existing working copy uncommitted = BlobFile(self._p_blob_uncommitted, mode, self) self._p_blob_writers += 1 result = uncommitted else: raise IOError('invalid mode: %s ' % mode) if result is not None: self._setup_transaction_manager(result) return result def openDetached(self, class_=file): """Returns a file(-like) object in read mode that can be used outside of transaction boundaries. """ if self._current_filename() is None: raise BlobError("Blob does not exist.") if self._p_blob_writers != 0: raise BlobError("Already opened for writing.") # XXX this should increase the reader number and have a test !?! return class_(self._current_filename(), "rb") def consumeFile(self, filename): """Will replace the current data of the blob with the file given under filename. """ if self._p_blob_writers != 0: raise BlobError("Already opened for writing.") if self._p_blob_readers != 0: raise BlobError("Already opened for reading.") previous_uncommitted = bool(self._p_blob_uncommitted) if previous_uncommitted: # If we have uncommitted data, we move it aside for now # in case the consumption doesn't work. target = self._p_blob_uncommitted target_aside = target+".aside" os.rename(target, target_aside) else: target = self._create_uncommitted_file() # We need to unlink the freshly created target again # to allow link() to do its job os.unlink(target) try: self._os_link(filename, target) except: # Recover from the failed consumption: First remove the file, it # might exist and mark the pointer to the uncommitted file. self._p_blob_uncommitted = None if os.path.exists(target): os.unlink(target) # If there was a file moved aside, bring it back including the # pointer to the uncommitted file. if previous_uncommitted: os.rename(target_aside, target) self._p_blob_uncommitted = target # Re-raise the exception to make the application aware of it. raise else: if previous_uncommitted: # The relinking worked so we can remove the data that we had # set aside. os.unlink(target_aside) # We changed the blob state and have to make sure we join the # transaction. self._change() # utility methods def _current_filename(self): # NOTE: _p_blob_data and _p_blob_uncommitted appear by virtue of # Connection._setstate return self._p_blob_uncommitted or self._p_blob_data def _create_uncommitted_file(self): assert self._p_blob_uncommitted is None, ( "Uncommitted file already exists.") tempdir = os.environ.get('ZODB_BLOB_TEMPDIR', tempfile.gettempdir()) self._p_blob_uncommitted = utils.mktemp(dir=tempdir) return self._p_blob_uncommitted def _change(self): self._p_changed = 1 def _setup_transaction_manager(self, result): # We join the transaction with our own data manager in order to be # notified of commit/vote/abort events. We do this because at # transaction boundaries, we need to fix up _p_ reference counts # that keep track of open readers and writers and close any # writable filehandles we've opened. if self._p_blob_manager is None: # Blobs need to always participate in transactions. if self._p_jar is not None: # If we are connected to a database, then we use the # transaction manager that belongs to this connection tm = self._p_jar.transaction_manager else: # If we are not connected to a database, we check whether # we have been given an explicit transaction manager if self._p_blob_transaction: tm = self._p_blob_transaction else: # Otherwise we use the default # transaction manager as an educated guess. tm = transaction.manager # Create our datamanager and join he current transaction. dm = BlobDataManager(self, result, tm) tm.get().join(dm) elif result: # Each blob data manager should manage only the one blob # assigned to it. Assert that this is the case and it is the # correct blob assert self._p_blob_manager.blob is self self._p_blob_manager.register_fh(result) # utility methods which should not cause the object's state to be # loaded if they are called while the object is a ghost. Thus, # they are named with the _p_ convention and only operate against # other _p_ instance attributes. We conventionally name these methods # and attributes with a _p_blob prefix. def _p_blob_clear(self): self._p_blob_readers = 0 self._p_blob_writers = 0 def _p_blob_decref(self, mode): if mode == 'r': self._p_blob_readers = max(0, self._p_blob_readers - 1) else: assert mode in valid_modes, "Invalid mode %r" % mode self._p_blob_writers = max(0, self._p_blob_writers - 1) def _p_blob_refcounts(self): # used by unit tests return self._p_blob_readers, self._p_blob_writers class BlobDataManager: """Special data manager to handle transaction boundaries for blobs. Blobs need some special care-taking on transaction boundaries. As a) the ghost objects might get reused, the _p_reader and _p_writer refcount attributes must be set to a consistent state b) the file objects might get passed out of the thread/transaction and must deny any relationship to the original blob. c) writable blob filehandles must be closed at the end of a txn so as to not allow reuse between two transactions. """ zope.interface.implements(transaction.interfaces.IDataManager) def __init__(self, blob, filehandle, tm): self.blob = blob self.transaction = tm.get() # we keep a weakref to the file handle because we don't want to # keep it alive if all other references to it die (e.g. in the # case it's opened without assigning it to a name). self.fhrefs = utils.WeakSet() self.register_fh(filehandle) self.sortkey = time.time() self.prepared = False # Blob specific methods def register_fh(self, filehandle): self.fhrefs.add(filehandle) def _remove_uncommitted_data(self): self.blob._p_blob_clear() self.fhrefs.map(lambda fhref: fhref.close()) if (self.blob._p_blob_uncommitted is not None and os.path.exists(self.blob._p_blob_uncommitted)): os.unlink(self.blob._p_blob_uncommitted) self.blob._p_blob_uncommitted = None # IDataManager def tpc_begin(self, transaction): if self.prepared: raise TypeError('Already prepared') self._checkTransaction(transaction) self.prepared = True self.transaction = transaction self.fhrefs.map(lambda fhref: fhref.close()) def commit(self, transaction): if not self.prepared: raise TypeError('Not prepared to commit') self._checkTransaction(transaction) self.transaction = None self.prepared = False self.blob._p_blob_clear() def abort(self, transaction): self.tpc_abort(transaction) def tpc_abort(self, transaction): self._checkTransaction(transaction) if self.transaction is not None: self.transaction = None self.prepared = False self._remove_uncommitted_data() def tpc_finish(self, transaction): pass def tpc_vote(self, transaction): pass def sortKey(self): return self.sortkey def _checkTransaction(self, transaction): if (self.transaction is not None and self.transaction is not transaction): raise TypeError("Transaction missmatch", transaction, self.transaction) class BlobFile(file): """A BlobFile that holds a file handle to actual blob data. It is a file that can be used within a transaction boundary; a BlobFile is just a Python file object, we only override methods which cause a change to blob data in order to call methods on our 'parent' persistent blob object signifying that the change happened. """ # XXX these files should be created in the same partition as # the storage later puts them to avoid copying them ... def __init__(self, name, mode, blob): super(BlobFile, self).__init__(name, mode+'b') self.blob = blob self.close_called = False def write(self, data): super(BlobFile, self).write(data) self.blob._change() def writelines(self, lines): super(BlobFile, self).writelines(lines) self.blob._change() def truncate(self, size=0): super(BlobFile, self).truncate(size) self.blob._change() def close(self): # we don't want to decref twice if not self.close_called: self.blob._p_blob_decref(self.mode[:-1]) self.close_called = True super(BlobFile, self).close() def __del__(self): # XXX we need to ensure that the file is closed at object # expiration or our blob's refcount won't be decremented. # This probably needs some work; I don't know if the names # 'BlobFile' or 'super' will be available at program exit, but # we'll assume they will be for now in the name of not # muddying the code needlessly. self.close() _pid = str(os.getpid()) def log(msg, level=logging.INFO, subsys=_pid, exc_info=False): message = "(%s) %s" % (subsys, msg) logger.log(level, message, exc_info=exc_info) class FilesystemHelper: # Storages that implement IBlobStorage can choose to use this # helper class to generate and parse blob filenames. This is not # a set-in-stone interface for all filesystem operations dealing # with blobs and storages needn't indirect through this if they # want to perform blob storage differently. def __init__(self, base_dir): self.base_dir = base_dir def create(self): if not os.path.exists(self.base_dir): os.makedirs(self.base_dir, 0700) log("Blob cache directory '%s' does not exist. " "Created new directory." % self.base_dir, level=logging.INFO) def isSecure(self, path): """Ensure that (POSIX) path mode bits are 0700.""" return (os.stat(path).st_mode & 077) != 0 def checkSecure(self): if not self.isSecure(self.base_dir): log('Blob dir %s has insecure mode setting' % self.base_dir, level=logging.WARNING) def getPathForOID(self, oid): """Given an OID, return the path on the filesystem where the blob data relating to that OID is stored. """ return os.path.join(self.base_dir, utils.oid_repr(oid)) def getBlobFilename(self, oid, tid): """Given an oid and a tid, return the full filename of the 'committed' blob file related to that oid and tid. """ oid_path = self.getPathForOID(oid) filename = "%s%s" % (utils.tid_repr(tid), BLOB_SUFFIX) return os.path.join(oid_path, filename) def blob_mkstemp(self, oid, tid): """Given an oid and a tid, return a temporary file descriptor and a related filename. The file is guaranteed to exist on the same partition as committed data, which is important for being able to rename the file without a copy operation. The directory in which the file will be placed, which is the return value of self.getPathForOID(oid), must exist before this method may be called successfully. """ oidpath = self.getPathForOID(oid) fd, name = tempfile.mkstemp(suffix='.tmp', prefix=utils.tid_repr(tid), dir=oidpath) return fd, name def splitBlobFilename(self, filename): """Returns the oid and tid for a given blob filename. If the filename cannot be recognized as a blob filename, (None, None) is returned. """ if not filename.endswith(BLOB_SUFFIX): return None, None path, filename = os.path.split(filename) oid = os.path.split(path)[1] serial = filename[:-len(BLOB_SUFFIX)] oid = utils.repr_to_oid(oid) serial = utils.repr_to_oid(serial) return oid, serial def getOIDsForSerial(self, search_serial): """Return all oids related to a particular tid that exist in blob data. """ oids = [] base_dir = self.base_dir for oidpath in os.listdir(base_dir): for filename in os.listdir(os.path.join(base_dir, oidpath)): blob_path = os.path.join(base_dir, oidpath, filename) oid, serial = self.splitBlobFilename(blob_path) if search_serial == serial: oids.append(oid) return oids class BlobStorage(SpecificationDecoratorBase): """A storage to support blobs.""" zope.interface.implements(ZODB.interfaces.IBlobStorage) # Proxies can't have a __dict__ so specifying __slots__ here allows # us to have instance attributes explicitly on the proxy. __slots__ = ('fshelper', 'dirty_oids', '_BlobStorage__supportsUndo') def __new__(self, base_directory, storage): return SpecificationDecoratorBase.__new__(self, storage) def __init__(self, base_directory, storage): # XXX Log warning if storage is ClientStorage SpecificationDecoratorBase.__init__(self, storage) self.fshelper = FilesystemHelper(base_directory) self.fshelper.create() self.fshelper.checkSecure() self.dirty_oids = [] try: supportsUndo = storage.supportsUndo except AttributeError: supportsUndo = False else: supportsUndo = supportsUndo() self.__supportsUndo = supportsUndo @non_overridable def temporaryDirectory(self): return self.fshelper.base_dir @non_overridable def __repr__(self): normal_storage = getProxiedObject(self) return '<BlobStorage proxy for %r at %s>' % (normal_storage, hex(id(self))) @non_overridable def storeBlob(self, oid, oldserial, data, blobfilename, version, transaction): """Stores data that has a BLOB attached.""" serial = self.store(oid, oldserial, data, version, transaction) assert isinstance(serial, str) # XXX in theory serials could be # something else # the user may not have called "open" on the blob object, # in which case, the blob will not have a filename. if blobfilename is not None: self._lock_acquire() try: targetpath = self.fshelper.getPathForOID(oid) if not os.path.exists(targetpath): os.makedirs(targetpath, 0700) targetname = self.fshelper.getBlobFilename(oid, serial) os.rename(blobfilename, targetname) # XXX if oid already in there, something is really hosed. # The underlying storage should have complained anyway self.dirty_oids.append((oid, serial)) finally: self._lock_release() return self._tid @non_overridable def tpc_finish(self, *arg, **kw): # We need to override the base storage's tpc_finish instead of # providing a _finish method because methods found on the proxied # object aren't rebound to the proxy getProxiedObject(self).tpc_finish(*arg, **kw) self.dirty_oids = [] @non_overridable def tpc_abort(self, *arg, **kw): # We need to override the base storage's abort instead of # providing an _abort method because methods found on the proxied object # aren't rebound to the proxy getProxiedObject(self).tpc_abort(*arg, **kw) while self.dirty_oids: oid, serial = self.dirty_oids.pop() clean = self.fshelper.getBlobFilename(oid, serial) if os.exists(clean): os.unlink(clean) @non_overridable def loadBlob(self, oid, serial): """Return the filename where the blob file can be found. """ filename = self.fshelper.getBlobFilename(oid, serial) if not os.path.exists(filename): return None return filename @non_overridable def _packUndoing(self, packtime, referencesf): # Walk over all existing revisions of all blob files and check # if they are still needed by attempting to load the revision # of that object from the database. This is maybe the slowest # possible way to do this, but it's safe. # XXX we should be tolerant of "garbage" directories/files in # the base_directory here. base_dir = self.fshelper.base_dir for oid_repr in os.listdir(base_dir): oid = utils.repr_to_oid(oid_repr) oid_path = os.path.join(base_dir, oid_repr) files = os.listdir(oid_path) files.sort() for filename in files: filepath = os.path.join(oid_path, filename) whatever, serial = self.fshelper.splitBlobFilename(filepath) try: fn = self.fshelper.getBlobFilename(oid, serial) self.loadSerial(oid, serial) except POSKeyError: os.unlink(filepath) if not os.listdir(oid_path): shutil.rmtree(oid_path) @non_overridable def _packNonUndoing(self, packtime, referencesf): base_dir = self.fshelper.base_dir for oid_repr in os.listdir(base_dir): oid = utils.repr_to_oid(oid_repr) oid_path = os.path.join(base_dir, oid_repr) exists = True try: self.load(oid, None) # no version support except (POSKeyError, KeyError): exists = False if exists: files = os.listdir(oid_path) files.sort() latest = files[-1] # depends on ever-increasing tids files.remove(latest) for file in files: os.unlink(os.path.join(oid_path, file)) else: shutil.rmtree(oid_path) continue if not os.listdir(oid_path): shutil.rmtree(oid_path) @non_overridable def pack(self, packtime, referencesf): """Remove all unused oid/tid combinations.""" unproxied = getProxiedObject(self) # pack the underlying storage, which will allow us to determine # which serials are current. result = unproxied.pack(packtime, referencesf) # perform a pack on blob data self._lock_acquire() try: if self.__supportsUndo: self._packUndoing(packtime, referencesf) else: self._packNonUndoing(packtime, referencesf) finally: self._lock_release() return result @non_overridable def getSize(self): """Return the size of the database in bytes.""" orig_size = getProxiedObject(self).getSize() blob_size = 0 base_dir = self.fshelper.base_dir for oid in os.listdir(base_dir): for serial in os.listdir(os.path.join(base_dir, oid)): if not serial.endswith(BLOB_SUFFIX): continue file_path = os.path.join(base_dir, oid, serial) blob_size += os.stat(file_path).st_size return orig_size + blob_size @non_overridable def undo(self, serial_id, transaction): undo_serial, keys = getProxiedObject(self).undo(serial_id, transaction) # serial_id is the transaction id of the txn that we wish to undo. # "undo_serial" is the transaction id of txn in which the undo is # performed. "keys" is the list of oids that are involved in the # undo transaction. # The serial_id is assumed to be given to us base-64 encoded # (belying the web UI legacy of the ZODB code :-() serial_id = base64.decodestring(serial_id+'\n') self._lock_acquire() try: # we get all the blob oids on the filesystem related to the # transaction we want to undo. for oid in self.fshelper.getOIDsForSerial(serial_id): # we want to find the serial id of the previous revision # of this blob object. load_result = self.loadBefore(oid, serial_id) if load_result is None: # There was no previous revision of this blob # object. The blob was created in the transaction # represented by serial_id. We copy the blob data # to a new file that references the undo # transaction in case a user wishes to undo this # undo. orig_fn = self.fshelper.getBlobFilename(oid, serial_id) new_fn = self.fshelper.getBlobFilename(oid, undo_serial) else: # A previous revision of this blob existed before the # transaction implied by "serial_id". We copy the blob # data to a new file that references the undo transaction # in case a user wishes to undo this undo. data, serial_before, serial_after = load_result orig_fn = self.fshelper.getBlobFilename(oid, serial_before) new_fn = self.fshelper.getBlobFilename(oid, undo_serial) orig = open(orig_fn, "r") new = open(new_fn, "wb") utils.cp(orig, new) orig.close() new.close() self.dirty_oids.append((oid, undo_serial)) finally: self._lock_release() return undo_serial, keys # To do: # # Production # # - Ensure we detect and replay a failed txn involving blobs forward or # backward at startup. # # Jim: What does this mean? # # Far future # # More options for blob directory structures (e.g. dirstorages # bushy/chunky/lawn/flat). # # Make the ClientStorage support minimizing the blob # cache. (Idea: LRU principle via mstat access time and a # size-based threshold) currently). # # Make blobs able to efficiently consume existing files from the # filesystem # # Savepoint support # ================= # # - A savepoint represents the whole state of the data at a certain point in # time # # - Need special storage for blob savepointing (in the spirit of tmpstorage) # # - What belongs to the state of the data? # # - Data contained in files at that point in time # # - File handles are complex because they might be referred to from various # places. We would have to introduce an abstraction layer to allow # switching them around... # # Simpler solution: :