Commit cc986bde authored by Jim Fulton

Added support for copying and recovery of blob storages:

- Added a helper function, ZODB.blob.is_blob_record, for testing whether
  a data record is for a blob.  This can be used when iterating over a
  storage to detect blob records so that blob data can be copied.

In the future, we may want to build this into a blob-aware
  iteration interface, so that records get blob file attributes
  automatically.

- Added the IBlobStorageRestoreable interface for blob storages
  that support recovery via a restoreBlob method.

- Updated ZODB.blob.BlobStorage to implement
  IBlobStorageRestoreable and to have a copyTransactionsFrom method
  that also copies blob data.

Also removed the version argument from the history method.
......@@ -8,6 +8,23 @@
New Features
------------
- Added support for copying and recovery of blob storages:
- Added a helper function, ZODB.blob.is_blob_record, for testing whether
a data record is for a blob. This can be used when iterating over a
storage to detect blob records so that blob data can be copied.
In the future, we may want to build this into a blob-aware
iteration interface, so that records get blob file attributes
automatically.
- Added the IBlobStorageRestoreable interface for blob storages
that support recovery via a restoreBlob method.
- Updated ZODB.blob.BlobStorage to implement
IBlobStorageRestoreable and to have a copyTransactionsFrom method
that also copies blob data.
- New `ClientStorage` configuration option `drop_cache_rather_verify`.
If this option is true then the ZEO client cache is dropped instead of
going through the long (unoptimized) verification. For large caches, setting this
......
......@@ -14,6 +14,7 @@
"""Blobs
"""
import cPickle
import base64
import logging
import os
......@@ -443,6 +444,10 @@ class BlobStorage(SpecificationDecoratorBase):
self.__supportsUndo = supportsUndo
self._blobs_pack_is_in_progress = False
if ZODB.interfaces.IStorageRestoreable.providedBy(storage):
zope.interface.alsoProvides(self,
ZODB.interfaces.IBlobStorageRestoreable)
@non_overridable
def temporaryDirectory(self):
    """Return the ``temp_dir`` attribute of this storage's ``fshelper``."""
    helper = self.fshelper
    return helper.temp_dir
......@@ -452,14 +457,9 @@ class BlobStorage(SpecificationDecoratorBase):
normal_storage = getProxiedObject(self)
return '<BlobStorage proxy for %r at %s>' % (normal_storage,
hex(id(self)))
@non_overridable
def storeBlob(self, oid, oldserial, data, blobfilename, version,
transaction):
"""Stores data that has a BLOB attached."""
serial = self.store(oid, oldserial, data, version, transaction)
assert isinstance(serial, str) # XXX in theory serials could be
# something else
@non_overridable
def _storeblob(self, oid, serial, blobfilename):
self._lock_acquire()
try:
targetpath = self.fshelper.getPathForOID(oid)
......@@ -474,8 +474,52 @@ class BlobStorage(SpecificationDecoratorBase):
self.dirty_oids.append((oid, serial))
finally:
self._lock_release()
@non_overridable
def storeBlob(self, oid, oldserial, data, blobfilename, version,
              transaction):
    """Store a data record together with its attached blob file.

    The record is written through ``self.store`` with an empty version
    string; the blob file is then handed to ``self._storeblob`` under the
    serial that ``store`` returned.  A non-empty ``version`` trips the
    assertion below.

    Returns ``self._tid``.
    """
    assert not version, "Versions aren't supported."
    new_serial = self.store(oid, oldserial, data, '', transaction)
    self._storeblob(oid, new_serial, blobfilename)
    return self._tid
@non_overridable
def restoreBlob(self, oid, serial, data, blobfilename, prev_txn,
                transaction):
    """Restore a blob record that was already committed in another database.

    The data record is passed to ``self.restore`` (with an empty version
    string) and the blob file is then registered through
    ``self._storeblob`` under the given ``serial``.

    Returns ``self._tid``.
    """
    no_version = ''
    self.restore(oid, serial, data, no_version, prev_txn, transaction)
    self._storeblob(oid, serial, blobfilename)
    return self._tid
@non_overridable
def copyTransactionsFrom(self, other):
    """Copy every transaction of ``other`` into this storage, blobs included.

    Each transaction yielded by ``other.iterator()`` is replayed here with
    ``tpc_begin`` / ``tpc_vote`` / ``tpc_finish``.  For every record that
    ``is_blob_record`` identifies as a blob, the blob file is looked up with
    ``other.loadBlob``, copied into a fresh temporary file inside
    ``self.fshelper.temp_dir`` and restored with ``restoreBlob``.  All other
    records (and blob records for which ``loadBlob`` raises
    ``POSKeyError``) are restored with the plain ``restore`` method.
    """
    for trans in other.iterator():
        self.tpc_begin(trans, trans.tid, trans.status)
        for record in trans:
            blobfilename = None
            if is_blob_record(record.data):
                try:
                    blobfilename = other.loadBlob(record.oid, record.tid)
                except POSKeyError:
                    # loadBlob found no blob for this record; blobfilename
                    # stays None so the plain restore branch is used.
                    pass
            if blobfilename is not None:
                fd, name = tempfile.mkstemp(
                    suffix='.tmp', dir=self.fshelper.temp_dir)
                os.close(fd)
                # Blob payloads are arbitrary bytes, so the source must be
                # read in binary mode.  Both files are closed explicitly so
                # the copy is fully flushed before restoreBlob uses it.
                source = open(blobfilename, 'rb')
                try:
                    target = open(name, 'wb')
                    try:
                        utils.cp(source, target)
                    finally:
                        target.close()
                finally:
                    source.close()
                self.restoreBlob(record.oid, record.tid, record.data,
                                 name, record.data_txn, trans)
            else:
                self.restore(record.oid, record.tid, record.data,
                             '', record.data_txn, trans)
        self.tpc_vote(trans)
        self.tpc_finish(trans)
@non_overridable
def tpc_finish(self, *arg, **kw):
# We need to override the base storage's tpc_finish instead of
......@@ -692,3 +736,13 @@ if sys.platform == 'win32':
else:
remove_committed = os.remove
remove_committed_dir = shutil.rmtree
def is_blob_record(record):
    """Check whether a database record is a blob record.

    This is primarily intended to be used when copying data from one
    storage to another.

    ``record`` is the raw pickled data of a storage record.  Returns True
    only when the first pickle in the record loads as ``ZODB.blob.Blob``.
    Empty or missing data (``''`` or ``None``) and data that cannot be
    unpickled are reported as False instead of raising.
    """
    # A record that never mentions the module cannot load as
    # ZODB.blob.Blob, so skip the unpickling work (and the class imports
    # it triggers) for it.
    if not record or 'ZODB.blob' not in record:
        return False
    try:
        # NOTE(review): this unpickles storage data; only call it on
        # records that come from a trusted storage.
        return cPickle.loads(record) is ZODB.blob.Blob
    except (MemoryError, KeyboardInterrupt, SystemExit):
        # Never hide resource exhaustion or interpreter shutdown.
        raise
    except Exception:
        # Unloadable record (e.g. its class cannot be imported): treat it
        # as an ordinary, non-blob record.
        return False
......@@ -446,7 +446,7 @@ class IStorage(Interface):
This is used solely for informational purposes.
"""
def history(oid, version='', size=1):
def history(oid, size=1):
"""Return a sequence of history information dictionaries.
Up to size objects (including no objects) may be returned.
......@@ -1009,6 +1009,14 @@ class IBlobStorage(Interface):
If Blobs use this, then commits can be performed with a simple rename.
"""
class IBlobStorageRestoreable(IBlobStorage, IStorageRestoreable):
    """Blob storage that also supports restoring committed blob records."""

    def restoreBlob(oid, serial, data, blobfilename, prev_txn, transaction):
        """Write blob data already committed in a separate database

        See the restore and storeBlob methods.
        """
class BlobError(Exception):
    """Blob-related error."""
......
......@@ -25,7 +25,7 @@ from ZODB.utils import U64, p64
from transaction import Transaction
import itertools
import ZODB.blob
class IteratorCompare:
......@@ -214,6 +214,18 @@ class IteratorDeepCompare:
eq(rec1.oid, rec2.oid)
eq(rec1.tid, rec2.tid)
eq(rec1.data, rec2.data)
if ZODB.blob.is_blob_record(rec1.data):
try:
fn1 = storage1.loadBlob(rec1.oid, rec1.tid)
except ZODB.POSException.POSKeyError:
self.assertRaises(
ZODB.POSException.POSKeyError,
storage2.loadBlob, rec1.oid, rec1.tid)
else:
fn2 = storage2.loadBlob(rec1.oid, rec1.tid)
self.assert_(fn1 != fn2)
eq(open(fn1).read(), open(fn2).read())
# Make sure there are no more records left in rec1 and rec2,
# meaning they were the same length.
# Additionally, check that we're backwards compatible to the
......
......@@ -12,10 +12,15 @@
#
##############################################################################
import base64, os, re, shutil, stat, sys, tempfile, unittest
import base64, os, re, shutil, stat, sys, tempfile, unittest, random, struct
import time
import ZODB.tests.IteratorStorage
from zope.testing import doctest, renormalizing
import zope.testing.setupstack
import ZODB.tests.util
import ZODB.interfaces
from StringIO import StringIO
from pickle import Pickler
......@@ -275,6 +280,49 @@ class BlobUndoTests(BlobTests):
database.close()
class RecoveryBlobStorage(unittest.TestCase,
                          ZODB.tests.IteratorStorage.IteratorDeepCompare):
    """Copy one blob storage into another and compare the two."""

    def setUp(self):
        """Create source and destination blob storages in a scratch directory."""
        self.globs = {}
        zope.testing.setupstack.setUpDirectory(self)
        source_fs = ZODB.FileStorage.FileStorage("Source.fs", create=True)
        self._storage = BlobStorage('src_blobs', source_fs)
        dest_fs = ZODB.FileStorage.FileStorage("Dest.fs", create=True)
        self._dst = BlobStorage('dest_blobs', dest_fs)

    def tearDown(self):
        """Close both storages, then undo the setupstack registrations."""
        for storage in (self._storage, self._dst):
            storage.close()
        zope.testing.setupstack.tearDown(self)

    # Relies on setUp() having created the self._dst destination storage.
    def testSimpleBlobRecovery(self):
        """Commit several blobs, copy the storage, and deep-compare the copy."""
        provided = ZODB.interfaces.IBlobStorageRestoreable.providedBy(
            self._storage)
        self.assert_(provided)

        db = DB(self._storage)
        conn = db.open()
        root = conn.root()

        # An empty blob.
        root[1] = ZODB.blob.Blob()
        transaction.commit()

        # A small blob.
        root[2] = ZODB.blob.Blob()
        root[2].open('w').write('some data')
        transaction.commit()

        # A large blob of random 4-byte words with 1-4 trailing bytes cut
        # off.
        root[3] = ZODB.blob.Blob()
        word_count = random.randint(10000, 20000)
        words = [struct.pack(">I", random.randint(0, (1 << 32) - 1))
                 for _ in range(word_count)]
        payload = ''.join(words)[:-random.randint(1, 4)]
        root[3].open('w').write(payload)
        transaction.commit()

        # Replace the small blob so it has more than one revision.
        root[2] = ZODB.blob.Blob()
        root[2].open('w').write('some other data')
        transaction.commit()

        self._dst.copyTransactionsFrom(self._storage)
        self.compare(self._storage, self._dst)
def gc_blob_removes_uncommitted_data():
"""
>>> from ZODB.blob import Blob
......@@ -540,6 +588,22 @@ def loadblob_tmpstore():
>>> os.unlink(storagefile+".tmp")
"""
def is_blob_record():
    """Doctest for ZODB.blob.is_blob_record.

    A Blob is stored under the root of a fresh blob storage; the record
    loaded for oid 0 (presumably the root object) is reported as a
    non-blob record and the record for oid 1 as a blob record.

    >>> fs = FileStorage('Data.fs')
    >>> bs = ZODB.blob.BlobStorage('blobs', fs)
    >>> db = DB(bs)
    >>> conn = db.open()
    >>> conn.root()['blob'] = ZODB.blob.Blob()
    >>> transaction.commit()
    >>> ZODB.blob.is_blob_record(fs.load(ZODB.utils.p64(0), '')[0])
    False
    >>> ZODB.blob.is_blob_record(fs.load(ZODB.utils.p64(1), '')[0])
    True
    >>> db.close()
    """
def setUp(test):
ZODB.tests.util.setUp(test)
def rmtree(path):
......@@ -575,6 +639,7 @@ def test_suite():
))
suite.addTest(unittest.makeSuite(BlobCloneTests))
suite.addTest(unittest.makeSuite(BlobUndoTests))
suite.addTest(unittest.makeSuite(RecoveryBlobStorage))
return suite
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment