Commit 58073772 authored by bescoto

Some refactoring of metadata code


git-svn-id: http://svn.savannah.nongnu.org/svn/rdiff-backup@668 2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109
parent 25183ca0
......@@ -23,7 +23,7 @@ from __future__ import generators
import errno
import Globals, metadata, rorpiter, TempFile, Hardlink, robust, increment, \
rpath, static, log, selection, Time, Rdiff, statistics, iterfile, \
eas_acls, hash
hash
def Mirror(src_rpath, dest_rpath):
"""Turn dest_rpath into a copy of src_rpath"""
......@@ -135,10 +135,9 @@ class DestinationStruct:
sel.parse_rbdir_exclude()
return sel.set_iter()
metadata.SetManager()
if use_metadata:
rorp_iter = eas_acls.GetCombinedMetadataIter(
Globals.rbdir, Time.prevtime,
acls = Globals.acls_active, eas = Globals.eas_active)
rorp_iter = metadata.ManagerObj.GetAtTime(Time.prevtime)
if rorp_iter: return rorp_iter
return get_iter_from_fs()
......@@ -273,9 +272,7 @@ class CacheCollatedPostProcess:
self.statfileobj = statistics.init_statfileobj()
if Globals.file_statistics: statistics.FileStats.init()
metadata.MetadataFile.open_file()
if Globals.eas_active: eas_acls.ExtendedAttributesFile.open_file()
if Globals.acls_active: eas_acls.AccessControlListFile.open_file()
self.metawriter = metadata.ManagerObj.GetWriter()
# the following should map indicies to lists
# [source_rorp, dest_rorp, changed_flag, success_flag, increment]
......@@ -381,13 +378,7 @@ class CacheCollatedPostProcess:
self.statfileobj.add_changed(source_rorp, dest_rorp)
if metadata_rorp and metadata_rorp.lstat():
metadata.MetadataFile.write_object(metadata_rorp)
if Globals.eas_active and not metadata_rorp.get_ea().empty():
eas_acls.ExtendedAttributesFile.write_object(
metadata_rorp.get_ea())
if Globals.acls_active and not metadata_rorp.get_acl().is_basic():
eas_acls.AccessControlListFile.write_object(
metadata_rorp.get_acl())
self.metawriter.write_object(metadata_rorp)
if Globals.file_statistics:
statistics.FileStats.update(source_rorp, dest_rorp, changed, inc)
......@@ -451,9 +442,8 @@ class CacheCollatedPostProcess:
while self.dir_perms_list:
dir_rp, perms = self.dir_perms_list.pop()
dir_rp.chmod(perms)
metadata.MetadataFile.close_file()
if Globals.eas_active: eas_acls.ExtendedAttributesFile.close_file()
if Globals.acls_active: eas_acls.AccessControlListFile.close_file()
self.metawriter.close()
if Globals.print_statistics: statistics.print_active_stats()
if Globals.file_statistics: statistics.FileStats.close()
statistics.write_active_statfileobj()
......
......@@ -30,7 +30,7 @@ from __future__ import generators
import base64, errno, re
try: import posix1e
except ImportError: pass
import static, Globals, metadata, connection, rorpiter, log, C, \
import static, Globals, eas_acls, connection, metadata, rorpiter, log, C, \
rpath, user_group
# When an ACL gets dropped, put name in dropped_acl_names. This is
......@@ -170,25 +170,14 @@ class ExtendedAttributesFile(metadata.FlatFile):
_extractor = EAExtractor
_object_to_record = staticmethod(EA2Record)
def join(cls, rorp_iter, rbdir, time, restrict_index):
"""Add extended attribute information to existing rorp_iter"""
def helper(rorp_iter, ea_iter):
"""Add EA information in ea_iter to rorp_iter"""
collated = rorpiter.CollateIterators(rorp_iter, ea_iter)
for rorp, ea in collated:
assert rorp, (rorp, (ea.index, ea.attr_dict), time)
def join_ea_iter(rorp_iter, ea_iter):
    """Update a rorp iter by adding the information from ea_iter

    The two iterators are collated by index.  Every rorp yielded gets
    an extended-attributes object attached: the matching one from
    ea_iter, or an empty ExtendedAttributes if none was recorded.
    """
    for rorp, ea in rorpiter.CollateIterators(rorp_iter, ea_iter):
        # An EA record without a corresponding rorp indicates corruption
        assert rorp, "Missing rorp for index %s" % (ea.index,)
        if not ea: ea = ExtendedAttributes(rorp.index)
        rorp.set_ea(ea)
        yield rorp
ea_iter = cls.get_objects_at_time(rbdir, time, restrict_index)
if not ea_iter:
log.Log("Warning: Extended attributes file not found", 2)
ea_iter = iter([])
return helper(rorp_iter, ea_iter)
static.MakeClass(ExtendedAttributesFile)
class AccessControlLists:
"""Hold a file's access control list information
......@@ -521,49 +510,14 @@ class AccessControlListFile(metadata.FlatFile):
_extractor = ACLExtractor
_object_to_record = staticmethod(ACL2Record)
def join(cls, rorp_iter, rbdir, time, restrict_index):
"""Add access control list information to existing rorp_iter"""
def helper(rorp_iter, acl_iter):
"""Add ACL information in acl_iter to rorp_iter"""
collated = rorpiter.CollateIterators(rorp_iter, acl_iter)
for rorp, acl in collated:
def join_acl_iter(rorp_iter, acl_iter):
    """Update a rorp iter by adding the information from acl_iter

    The two iterators are collated by index.  Every rorp yielded gets
    an ACL object attached: the matching one from acl_iter, or a fresh
    (basic) AccessControlLists if none was recorded.
    """
    for rorp, acl in rorpiter.CollateIterators(rorp_iter, acl_iter):
        # An ACL record without a corresponding rorp indicates corruption
        assert rorp, "Missing rorp for index %s" % (acl.index,)
        if not acl: acl = AccessControlLists(rorp.index)
        rorp.set_acl(acl)
        yield rorp
acl_iter = cls.get_objects_at_time(rbdir, time, restrict_index)
if not acl_iter:
log.Log("Warning: Access Control List file not found", 2)
acl_iter = iter([])
return helper(rorp_iter, acl_iter)
static.MakeClass(AccessControlListFile)
def GetCombinedMetadataIter(rbdir, time, restrict_index = None,
acls = None, eas = None):
"""Return iterator of rorps from metadata and related files
None will be returned if the metadata file is absent. If acls or
eas is true, access control list or extended attribute information
will be added.
"""
metadata_iter = metadata.MetadataFile.get_objects_at_time(
rbdir, time, restrict_index)
if not metadata_iter:
log.Log("Warning, metadata file not found.\n"
"Metadata will be read from filesystem.", 2)
return None
if eas:
metadata_iter = ExtendedAttributesFile.join(
metadata_iter, rbdir, time, restrict_index)
if acls:
metadata_iter = AccessControlListFile.join(
metadata_iter, rbdir, time, restrict_index)
return metadata_iter
def rpath_acl_get(rp):
"""Get acls of given rpath rp.
......
......@@ -56,7 +56,7 @@ field names and values.
from __future__ import generators
import re, gzip, os, binascii
import log, Globals, rpath, Time, robust, increment, static
import log, Globals, rpath, Time, robust, increment, static, rorpiter
class ParsingError(Exception):
"""This is raised when bad or unparsable data is received"""
......@@ -257,15 +257,20 @@ class FlatExtractor:
def iterate(self):
"""Return iterator that yields all objects with records"""
while 1:
next_pos = self.get_next_pos()
try: yield self.record_to_object(self.buf[:next_pos])
for record in self.iterate_records():
try: yield self.record_to_object(record)
except ParsingError, e:
if self.at_end: break # Ignore whitespace/bad records at end
log.Log("Error parsing flat file: %s" % (e,), 2)
def iterate_records(self):
    """Yield all text records in order

    Repeatedly asks get_next_pos() for the end of the next record,
    yields that slice of the buffer, and trims the buffer.  When
    at_end is set the final record has been yielded.  Closing the
    underlying file must succeed (close() returning a falsy value).
    """
    while 1:
        next_pos = self.get_next_pos()
        yield self.buf[:next_pos]
        if self.at_end: break
        self.buf = self.buf[next_pos:]
    assert not self.fileobj.close()
def skip_to_index(self, index):
"""Scan through the file, set buffer to beginning of index record
......@@ -304,7 +309,7 @@ class FlatExtractor:
yield obj
if self.at_end: break
self.buf = self.buf[next_pos:]
assert not self.close()
assert not self.fileobj.close()
def filename_to_index(self, filename):
"""Translate filename, possibly quoted, into an index tuple
......@@ -315,9 +320,6 @@ class FlatExtractor:
"""
assert 0 # subclass
def close(self):
"""Return value of closing associated file"""
return self.fileobj.close()
class RorpExtractor(FlatExtractor):
"""Iterate rorps from metadata file"""
......@@ -335,87 +337,60 @@ class FlatFile:
recommended.
"""
_prefix = None # Set this to real prefix when subclassing
_rp, _fileobj = None, None
# Buffering may be useful because gzip writes are slow
_buffering_on = 1
rp, fileobj, mode = None, None, None
_buffering_on = 1 # Buffering may be useful because gzip writes are slow
_record_buffer, _max_buffer_size = None, 100
_extractor = FlatExtractor # Set to class that iterates objects
def open_file(cls, rp = None, compress = 1):
"""Open file for writing. Use cls._rp if rp not given."""
assert not cls._fileobj, "Flatfile already open"
cls._record_buffer = []
if rp: cls._rp = rp
_extractor = FlatExtractor # Override to class that iterates objects
_object_to_record = None # Set to function converting object to record
_prefix = None # Set to required prefix
def __init__(self, rp, mode):
"""Open rp for reading ('r') or writing ('w')"""
self.rp = rp
self.mode = mode
self._record_buffer = []
assert rp.isincfile() and rp.getincbase_str() == self._prefix, rp
if mode == 'r':
self.fileobj = self.rp.open("rb", rp.isinccompressed())
else:
if compress: typestr = 'snapshot.gz'
else: typestr = 'snapshot'
cls._rp = Globals.rbdir.append(
"%s.%s.%s" % (cls._prefix, Time.curtimestr, typestr))
cls._fileobj = cls._rp.open("wb", compress = compress)
def write_object(cls, object):
assert mode == 'w' and not self.rp.lstat(), (mode, rp)
self.fileobj = self.rp.open("wb", rp.isinccompressed())
def write_record(self, record):
    """Write a (text) record into the file

    When buffering is on, records accumulate in _record_buffer and are
    flushed in one write once _max_buffer_size is reached (buffering
    helps because gzip writes are slow).  Otherwise write directly.
    """
    if not self._buffering_on:
        self.fileobj.write(record)
        return
    self._record_buffer.append(record)
    if len(self._record_buffer) >= self._max_buffer_size:
        self.fileobj.write("".join(self._record_buffer))
        self._record_buffer = []
def write_object(self, object):
    """Convert one object to record and write to file

    Delegates serialization to the subclass-provided _object_to_record
    and output to write_record (which handles buffering).
    """
    self.write_record(self._object_to_record(object))
def get_objects(self, restrict_index = None):
    """Return iterator of objects records from file rp

    With no restrict_index, iterate every object in the file;
    otherwise iterate only objects whose index starts with
    restrict_index (delegated to the extractor's indexed scan).
    """
    if not restrict_index:
        return self._extractor(self.fileobj).iterate()
    extractor = self._extractor(self.fileobj)
    return extractor.iterate_starting_with(restrict_index)

def get_records(self):
    """Return iterator of text records (unparsed, in file order)"""
    return self._extractor(self.fileobj).iterate_records()
def close(self):
    """Close file, for when any writing is done

    Flushes any buffered records, fsyncs the underlying descriptor so
    the data is durable before the increment is considered written,
    closes the file, and refreshes the rp's cached stat data.
    Returns the result of the underlying close() call.
    """
    assert self.fileobj, "File already closed"
    if self._buffering_on and self._record_buffer:
        self.fileobj.write("".join(self._record_buffer))
        self._record_buffer = []
    # GzipFile has no fileno(); sync its wrapped file object instead
    try: fileno = self.fileobj.fileno()
    except AttributeError: fileno = self.fileobj.fileobj.fileno()
    os.fsync(fileno)
    result = self.fileobj.close()
    self.fileobj = None
    self.rp.setdata()
    return result
class MetadataFile(FlatFile):
"""Store/retrieve metadata from mirror_metadata as rorps"""
......@@ -423,3 +398,165 @@ class MetadataFile(FlatFile):
_extractor = RorpExtractor
_object_to_record = staticmethod(RORP2Record)
class CombinedWriter:
    """Used for simultaneously writing metadata, eas, and acls"""
    def __init__(self, metawriter, eawriter, aclwriter):
        """Store the writers; eawriter/aclwriter may be None if inactive"""
        self.metawriter = metawriter
        self.eawriter, self.aclwriter = eawriter, aclwriter
    def write_object(self, rorp):
        """Write information in rorp to all the writers

        Metadata is always written; EA/ACL records are written only
        when the corresponding writer exists and the rorp carries
        non-trivial EA/ACL data.
        """
        self.metawriter.write_object(rorp)
        if self.eawriter and not rorp.get_ea().empty():
            self.eawriter.write_object(rorp.get_ea())
        if self.aclwriter and not rorp.get_acl().is_basic():
            self.aclwriter.write_object(rorp.get_acl())
    def close(self):
        """Close every writer that was opened"""
        self.metawriter.close()
        if self.eawriter: self.eawriter.close()
        if self.aclwriter: self.aclwriter.close()
class Manager:
    """Read/Combine/Write metadata files by time"""
    meta_prefix = 'mirror_metadata'
    acl_prefix = 'access_control_lists'
    ea_prefix = 'extended_attributes'

    def __init__(self):
        """Set listing of rdiff-backup-data dir

        Builds self.rplist (every increment file found in rbdir) and
        self.timerpmap (increment time -> list of rps at that time).
        """
        self.rplist = []
        self.timerpmap = {}
        for filename in Globals.rbdir.listdir():
            rp = Globals.rbdir.append(filename)
            if rp.isincfile():
                self.rplist.append(rp)
                self.timerpmap.setdefault(rp.getinctime(), []).append(rp)

    def _iter_helper(self, prefix, flatfileclass, time, restrict_index):
        """Used below to find the right kind of file by time

        Returns an object iterator from the first increment at 'time'
        whose base name matches 'prefix', or None if there is none.
        """
        if time not in self.timerpmap: return None
        for rp in self.timerpmap[time]:
            if rp.getincbase_str() == prefix:
                return flatfileclass(rp, 'r').get_objects(restrict_index)
        return None

    def get_meta_at_time(self, time, restrict_index):
        """Return iter of metadata rorps at given time (or None)"""
        return self._iter_helper(self.meta_prefix, MetadataFile,
                                 time, restrict_index)

    def get_eas_at_time(self, time, restrict_index):
        """Return Extended Attributes iter at given time (or None)"""
        return self._iter_helper(self.ea_prefix,
                    eas_acls.ExtendedAttributesFile, time, restrict_index)

    def get_acls_at_time(self, time, restrict_index):
        """Return ACLs iter at given time from recordfile (or None)"""
        return self._iter_helper(self.acl_prefix,
                    eas_acls.AccessControlListFile, time, restrict_index)

    def GetAtTime(self, time, restrict_index = None):
        """Return combined metadata iter with ea/acl info if necessary

        Returns None (after logging a warning) when no mirror_metadata
        increment exists at the given time.  Missing EA/ACL files are
        tolerated: the rorps then get empty EA/ACL objects.
        """
        cur_iter = self.get_meta_at_time(time, restrict_index)
        if not cur_iter:
            log.Log("Warning, could not find mirror_metadata file.\n"
                    "Metadata will be read from filesystem instead.", 2)
            return None
        if Globals.acls_active:
            acl_iter = self.get_acls_at_time(time, restrict_index)
            if not acl_iter:
                log.Log("Warning: Access Control List file not found", 2)
                acl_iter = iter([])
            cur_iter = eas_acls.join_acl_iter(cur_iter, acl_iter)
        if Globals.eas_active:
            ea_iter = self.get_eas_at_time(time, restrict_index)
            if not ea_iter:
                log.Log("Warning: Extended Attributes file not found", 2)
                ea_iter = iter([])
            cur_iter = eas_acls.join_ea_iter(cur_iter, ea_iter)
        return cur_iter

    def _writer_helper(self, prefix, flatfileclass, typestr, time):
        """Used in the get_xx_writer functions, returns a writer class

        time of None means "now" (Time.curtimestr).  Asserts the target
        increment file does not already exist.
        """
        if time is None: timestr = Time.curtimestr
        else: timestr = Time.timetostring(time)
        filename = '%s.%s.%s.gz' % (prefix, timestr, typestr)
        rp = Globals.rbdir.append(filename)
        assert not rp.lstat(), "File %s already exists!" % (rp.path,)
        return flatfileclass(rp, 'w')

    def get_meta_writer(self, typestr, time):
        """Return MetadataFile object opened for writing at given time"""
        return self._writer_helper(self.meta_prefix, MetadataFile,
                                   typestr, time)

    def get_ea_writer(self, typestr, time):
        """Return ExtendedAttributesFile opened for writing"""
        return self._writer_helper(self.ea_prefix,
                    eas_acls.ExtendedAttributesFile, typestr, time)

    def get_acl_writer(self, typestr, time):
        """Return AccessControlListFile opened for writing"""
        return self._writer_helper(self.acl_prefix,
                    eas_acls.AccessControlListFile, typestr, time)

    def GetWriter(self, typestr = 'snapshot', time = None):
        """Get a writer object that can write meta and possibly acls/eas"""
        metawriter = self.get_meta_writer(typestr, time)
        if not Globals.eas_active and not Globals.acls_active:
            return metawriter # no need for a CombinedWriter
        # Bug fix: default both to None so the CombinedWriter call below
        # cannot raise NameError when only one of EAs/ACLs is active.
        # CombinedWriter already treats None writers as "inactive".
        ea_writer = acl_writer = None
        if Globals.eas_active: ea_writer = self.get_ea_writer(typestr, time)
        if Globals.acls_active: acl_writer = self.get_acl_writer(typestr, time)
        return CombinedWriter(metawriter, ea_writer, acl_writer)
ManagerObj = None # Set this later to Manager instance
def SetManager():
    """Create the module-level Manager singleton (reads Globals.rbdir)"""
    global ManagerObj
    ManagerObj = Manager()
def patch(*meta_iters):
    """Return an iterator of metadata files by combining all the given iters

    The iters should be given as a list/tuple in reverse chronological
    order.  The earliest rorp in each iter will supercede all the
    later ones.
    """
    for meta_tuple in rorpiter.CollateIterators(*meta_iters):
        # Scan from the last (earliest) iter backwards; the first entry
        # found wins.  A non-lstat entry marks the file as deleted, so
        # nothing is yielded for that index.
        for i in range(len(meta_tuple)-1, -1, -1):
            if meta_tuple[i]:
                if meta_tuple[i].lstat(): yield meta_tuple[i]
                break # move to next index
        else: assert 0, "No valid rorps"
def Convert_diff(cur_time, old_time):
    """Convert the metadata snapshot at old_time to diff format

    The point is just to save space.  The diff format is simple, just
    include in the diff all of the older rorps that are different in
    the two metadata rorps.
    """
    # NOTE(review): unfinished stub -- the 'XXX' below is a placeholder,
    # and get_objects_at_time/open_file are the old classmethod API that
    # this module no longer provides; must be rewritten against the
    # Manager/instance-based interface before use.
    rblist = [Globals.rbdir.append(filename)
              for filename in robust.listrp(Globals.rbdir)]
    cur_iter = MetadataFile.get_objects_at_time(
        Globals.rbdir, cur_time, None, rblist)
    old_iter = MetadataFile.get_objects_at_time(
        Globals.rbdir, old_time, None, rblist)
    assert cur_iter.type == old_iter.type == 'snapshot'
    diff_file = MetadataFile.open_file(None, 1, 'diff', old_time)
    for cur_rorp, old_rorp in rorpiter.Collate2Iters(cur_iter, old_iter):
        XXX
import eas_acls # put at bottom to avoid python circularity bug
......@@ -143,8 +143,8 @@ def iterate_raw_rfs(mirror_rp, inc_rp):
def yield_metadata():
"""Iterate rorps from metadata file, if any are available"""
metadata_iter = metadata.MetadataFile.get_objects_at_time(Globals.rbdir,
regress_time)
metadata.SetManager()
metadata_iter = metadata.ManagerObj.GetAtTime(regress_time)
if metadata_iter: return metadata_iter
log.Log.FatalError("No metadata for time %s found, cannot regress"
% Time.timetopretty(regress_time))
......
......@@ -22,7 +22,7 @@
from __future__ import generators
import tempfile, os, cStringIO
import Globals, Time, Rdiff, Hardlink, rorpiter, selection, rpath, \
log, static, robust, metadata, statistics, TempFile, eas_acls, hash
log, static, robust, metadata, statistics, TempFile, hash
class RestoreError(Exception): pass
......@@ -177,9 +177,9 @@ class MirrorStruct:
"""
if rest_time is None: rest_time = cls._rest_time
rorp_iter = eas_acls.GetCombinedMetadataIter(
Globals.rbdir, rest_time, restrict_index = cls.mirror_base.index,
acls = Globals.acls_active, eas = Globals.eas_active)
if not metadata.ManagerObj: metadata.SetManager()
rorp_iter = metadata.ManagerObj.GetAtTime(rest_time,
cls.mirror_base.index)
if not rorp_iter:
if require_metadata:
log.Log.FatalError("Mirror metadata not found")
......
......@@ -60,7 +60,7 @@ class MetadataTest(unittest.TestCase):
def write_metadata_to_temp(self):
"""If necessary, write metadata of bigdir to file metadata.gz"""
global tempdir
temprp = tempdir.append("metadata.gz")
temprp = tempdir.append("mirror_metadata.2005-11-03T14:51:06-06:00.snapshot.gz")
if temprp.lstat(): return temprp
self.make_temp()
......@@ -68,19 +68,19 @@ class MetadataTest(unittest.TestCase):
rpath_iter = selection.Select(rootrp).set_iter()
start_time = time.time()
MetadataFile.open_file(temprp)
for rp in rpath_iter: MetadataFile.write_object(rp)
MetadataFile.close_file()
mf = MetadataFile(temprp, 'w')
for rp in rpath_iter: mf.write_object(rp)
mf.close()
print "Writing metadata took %s seconds" % (time.time() - start_time)
return temprp
def testSpeed(self):
"""Test testIterator on 10000 files"""
temprp = self.write_metadata_to_temp()
MetadataFile._rp = temprp
mf = MetadataFile(temprp, 'r')
start_time = time.time(); i = 0
for rorp in MetadataFile.get_objects(): i += 1
for rorp in mf.get_objects(): i += 1
print "Reading %s metadata entries took %s seconds." % \
(i, time.time() - start_time)
......@@ -102,9 +102,9 @@ class MetadataTest(unittest.TestCase):
"""
temprp = self.write_metadata_to_temp()
MetadataFile._rp = temprp
mf = MetadataFile(temprp, 'r')
start_time = time.time(); i = 0
for rorp in MetadataFile.get_objects(("subdir3", "subdir10")): i += 1
for rorp in mf.get_objects(("subdir3", "subdir10")): i += 1
print "Reading %s metadata entries took %s seconds." % \
(i, time.time() - start_time)
assert i == 51
......@@ -112,7 +112,7 @@ class MetadataTest(unittest.TestCase):
def test_write(self):
"""Test writing to metadata file, then reading back contents"""
global tempdir
temprp = tempdir.append("write_test.gz")
temprp = tempdir.append("mirror_metadata.2005-11-03T12:51:06-06:00.snapshot.gz")
if temprp.lstat(): temprp.delete()
self.make_temp()
......@@ -123,14 +123,42 @@ class MetadataTest(unittest.TestCase):
rps = map(rootrp.append, dirlisting)
assert not temprp.lstat()
MetadataFile.open_file(temprp)
for rp in rps: MetadataFile.write_object(rp)
MetadataFile.close_file()
write_mf = MetadataFile(temprp, 'w')
for rp in rps: write_mf.write_object(rp)
write_mf.close()
assert temprp.lstat()
reread_rps = list(MetadataFile.get_objects())
reread_rps = list(MetadataFile(temprp, 'r').get_objects())
assert len(reread_rps) == len(rps), (len(reread_rps), len(rps))
for i in range(len(reread_rps)):
assert reread_rps[i] == rps[i], i
def test_patch(self):
    """Test combining 3 iters of metadata rorps"""
    self.make_temp()
    # Populate the temp dir with the fixture tree
    os.system('cp -a testfiles/various_file_types/* ' + tempdir.path)
    rp1 = tempdir.append('regular_file')
    rp2 = tempdir.append('subdir')
    rp3 = rp2.append('subdir_file')
    rp4 = tempdir.append('test')

    # Same index as rp1 but changed perms: the earliest iter's entry
    # must supercede rp1 in the patched output.
    rp1new = tempdir.append('regular_file')
    rp1new.chmod(0)
    # A rorp with no lstat data marks index ('test',) as deleted
    zero = rpath.RORPath(('test',))

    current = [rp1, rp2, rp3]
    diff1 = [rp1, rp4]
    diff2 = [rp1new, rp2, zero]

    output = patch(iter(current), iter(diff1), iter(diff2))
    out1 = output.next()
    assert out1 is rp1new, out1
    out2 = output.next()
    assert out2 is rp2, out2
    out3 = output.next()
    assert out3 is rp3, out3
    # 'test' was zeroed out by diff2, so the iterator is exhausted
    self.assertRaises(StopIteration, output.next)
if __name__ == "__main__": unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment