Commit d89692e2 authored by Han-Wen Nienhuys's avatar Han-Wen Nienhuys

Refactor UnionFs: overlay arbitrary FileSystems.

This allows an in-process overlay of eg. r/o ZipFs and r/w LoopbackFileSystem.
parent d659531b
...@@ -29,7 +29,12 @@ func main() { ...@@ -29,7 +29,12 @@ func main() {
DeletionDirName: *deldirname, DeletionDirName: *deldirname,
} }
ufs := unionfs.NewUnionFs(flag.Args()[1:], ufsOptions) fses := make([]fuse.FileSystem, 0)
for _, r := range flag.Args()[1:] {
fses = append(fses, fuse.NewLoopbackFileSystem(r))
}
ufs := unionfs.NewUnionFs("unionfs", fses, ufsOptions)
conn := fuse.NewFileSystemConnector(ufs, nil) conn := fuse.NewFileSystemConnector(ufs, nil)
mountState := fuse.NewMountState(conn) mountState := fuse.NewMountState(conn)
mountState.Debug = *debug mountState.Debug = *debug
......
...@@ -22,6 +22,7 @@ type AutoUnionFs struct { ...@@ -22,6 +22,7 @@ type AutoUnionFs struct {
lock sync.RWMutex lock sync.RWMutex
knownFileSystems map[string]*UnionFs knownFileSystems map[string]*UnionFs
nameRootMap map[string]string
root string root string
connector *fuse.FileSystemConnector connector *fuse.FileSystemConnector
...@@ -48,6 +49,7 @@ const ( ...@@ -48,6 +49,7 @@ const (
func NewAutoUnionFs(directory string, options AutoUnionFsOptions) *AutoUnionFs { func NewAutoUnionFs(directory string, options AutoUnionFsOptions) *AutoUnionFs {
a := new(AutoUnionFs) a := new(AutoUnionFs)
a.knownFileSystems = make(map[string]*UnionFs) a.knownFileSystems = make(map[string]*UnionFs)
a.nameRootMap = make(map[string]string)
a.options = &options a.options = &options
directory, err := filepath.Abs(directory) directory, err := filepath.Abs(directory)
if err != nil { if err != nil {
...@@ -78,25 +80,30 @@ func (me *AutoUnionFs) createFs(name string, roots []string) (*UnionFs, fuse.Sta ...@@ -78,25 +80,30 @@ func (me *AutoUnionFs) createFs(name string, roots []string) (*UnionFs, fuse.Sta
me.lock.Lock() me.lock.Lock()
defer me.lock.Unlock() defer me.lock.Unlock()
used := make(map[string]string) for workspace, root := range me.nameRootMap {
for workspace, v := range me.knownFileSystems { if root == roots[0] && workspace != name {
used[v.Roots()[0]] = workspace log.Printf("Already have a union FS for directory %s in workspace %s",
roots[0], workspace)
return nil, fuse.EBUSY
}
} }
workspace, ok := used[roots[0]] gofs := me.knownFileSystems[name]
if ok { if gofs != nil {
log.Printf("Already have a union FS for directory %s in workspace %s", return gofs, fuse.OK
roots[0], workspace)
return nil, fuse.EBUSY
} }
var gofs *UnionFs fses := make([]fuse.FileSystem, 0)
if me.knownFileSystems[name] == nil { for _, r := range roots {
log.Println("Adding UnionFs for roots", roots) fses = append(fses, fuse.NewLoopbackFileSystem(r))
gofs = NewUnionFs(roots, me.options.UnionFsOptions)
me.knownFileSystems[name] = gofs
} }
identifier := fmt.Sprintf("%v", roots)
log.Println("Adding UnionFs for", identifier)
gofs = NewUnionFs(identifier, fses, me.options.UnionFsOptions)
me.knownFileSystems[name] = gofs
me.nameRootMap[name] = roots[0]
return gofs, fuse.OK return gofs, fuse.OK
} }
...@@ -112,6 +119,7 @@ func (me *AutoUnionFs) rmFs(name string) (code fuse.Status) { ...@@ -112,6 +119,7 @@ func (me *AutoUnionFs) rmFs(name string) (code fuse.Status) {
code = me.connector.Unmount(name) code = me.connector.Unmount(name)
if code.Ok() { if code.Ok() {
me.knownFileSystems[name] = nil, false me.knownFileSystems[name] = nil, false
me.nameRootMap[name] = "", false
} else { } else {
log.Printf("Unmount failed for %s. Code %v", name, code) log.Printf("Unmount failed for %s. Code %v", name, code)
} }
...@@ -174,11 +182,12 @@ func (me *AutoUnionFs) Readlink(path string) (out string, code fuse.Status) { ...@@ -174,11 +182,12 @@ func (me *AutoUnionFs) Readlink(path string) (out string, code fuse.Status) {
name := comps[1] name := comps[1]
me.lock.RLock() me.lock.RLock()
defer me.lock.RUnlock() defer me.lock.RUnlock()
fs := me.knownFileSystems[name]
if fs == nil { root, ok := me.nameRootMap[name]
return "", fuse.ENOENT if ok {
return root, fuse.OK
} }
return fs.Roots()[0], fuse.OK return "", fuse.ENOENT
} }
func (me *AutoUnionFs) getUnionFs(name string) *UnionFs { func (me *AutoUnionFs) getUnionFs(name string) *UnionFs {
......
...@@ -60,25 +60,25 @@ func TestAutoFsSymlink(t *testing.T) { ...@@ -60,25 +60,25 @@ func TestAutoFsSymlink(t *testing.T) {
wd, clean := setup(t) wd, clean := setup(t)
defer clean() defer clean()
err := os.Mkdir(wd+"/store/foo", 0755) err := os.Mkdir(wd+"/store/backing1", 0755)
CheckSuccess(err) CheckSuccess(err)
os.Symlink(wd+"/ro", wd+"/store/foo/READONLY") os.Symlink(wd+"/ro", wd+"/store/backing1/READONLY")
CheckSuccess(err) CheckSuccess(err)
err = os.Symlink(wd+"/store/foo", wd+"/mount/config/bar") err = os.Symlink(wd+"/store/backing1", wd+"/mount/config/manual1")
CheckSuccess(err) CheckSuccess(err)
fi, err := os.Lstat(wd + "/mount/bar/file1") fi, err := os.Lstat(wd + "/mount/manual1/file1")
CheckSuccess(err) CheckSuccess(err)
err = os.Remove(wd + "/mount/config/bar") err = os.Remove(wd + "/mount/config/manual1")
CheckSuccess(err) CheckSuccess(err)
// Need time for the unmount to be noticed. // Need time for the unmount to be noticed.
log.Println("sleeping...") log.Println("sleeping...")
time.Sleep(entryTtl * 2e9) time.Sleep(entryTtl * 2e9)
fi, _ = os.Lstat(wd + "/mount/foo") fi, _ = os.Lstat(wd + "/mount/manual1")
if fi != nil { if fi != nil {
t.Error("Should not have file:", fi) t.Error("Should not have file:", fi)
} }
...@@ -86,7 +86,7 @@ func TestAutoFsSymlink(t *testing.T) { ...@@ -86,7 +86,7 @@ func TestAutoFsSymlink(t *testing.T) {
_, err = ioutil.ReadDir(wd + "/mount/config") _, err = ioutil.ReadDir(wd + "/mount/config")
CheckSuccess(err) CheckSuccess(err)
_, err = os.Lstat(wd + "/mount/foo/file1") _, err = os.Lstat(wd + "/mount/backing1/file1")
CheckSuccess(err) CheckSuccess(err)
} }
...@@ -118,5 +118,4 @@ func TestCreationChecks(t *testing.T) { ...@@ -118,5 +118,4 @@ func TestCreationChecks(t *testing.T) {
if code != fuse.EINVAL { if code != fuse.EINVAL {
t.Error("Should return EINVAL", err) t.Error("Should return EINVAL", err)
} }
} }
package unionfs package unionfs
import ( import (
"os" "github.com/hanwen/go-fuse/fuse"
"sync" "sync"
"log" "log"
"time" "time"
...@@ -12,22 +12,18 @@ import ( ...@@ -12,22 +12,18 @@ import (
On error, returns an empty map, since we have little options On error, returns an empty map, since we have little options
for outputting any other diagnostics. for outputting any other diagnostics.
*/ */
func newDirnameMap(dir string) map[string]bool { func newDirnameMap(fs fuse.FileSystem, dir string) map[string]bool {
result := make(map[string]bool) result := make(map[string]bool)
f, err := os.Open(dir) stream, code := fs.OpenDir(dir)
if err != nil { if !code.Ok() {
log.Printf("newDirnameMap(): %v %v", dir, err) log.Printf("newDirnameMap(): %v %v", dir, code)
return result return result
} }
names, err := f.Readdirnames(-1) for e := range stream {
if err != nil { if e.Mode & fuse.S_IFREG != 0 {
log.Printf("newDirnameMap(): readdirnames %v %v", dir, err) result[e.Name] = true
return result }
}
for _, n := range names {
result[n] = true
} }
return result return result
} }
...@@ -41,7 +37,7 @@ func newDirnameMap(dir string) map[string]bool { ...@@ -41,7 +37,7 @@ func newDirnameMap(dir string) map[string]bool {
type DirCache struct { type DirCache struct {
dir string dir string
ttlNs int64 ttlNs int64
fs fuse.FileSystem
// Protects data below. // Protects data below.
lock sync.RWMutex lock sync.RWMutex
...@@ -62,8 +58,8 @@ func (me *DirCache) setMap(newMap map[string]bool) { ...@@ -62,8 +58,8 @@ func (me *DirCache) setMap(newMap map[string]bool) {
func (me *DirCache) DropCache() { func (me *DirCache) DropCache() {
me.lock.Lock() me.lock.Lock()
defer me.lock.Unlock()
me.names = nil me.names = nil
me.lock.Unlock()
} }
// Try to refresh: if another update is already running, do nothing, // Try to refresh: if another update is already running, do nothing,
...@@ -76,7 +72,7 @@ func (me *DirCache) maybeRefresh() { ...@@ -76,7 +72,7 @@ func (me *DirCache) maybeRefresh() {
} }
me.updateRunning = true me.updateRunning = true
go func() { go func() {
me.setMap(newDirnameMap(me.dir)) me.setMap(newDirnameMap(me.fs, me.dir))
}() }()
} }
...@@ -102,9 +98,10 @@ func (me *DirCache) AddEntry(name string) { ...@@ -102,9 +98,10 @@ func (me *DirCache) AddEntry(name string) {
me.names[name] = true me.names[name] = true
} }
func NewDirCache(dir string, ttlNs int64) *DirCache { func NewDirCache(fs fuse.FileSystem, dir string, ttlNs int64) *DirCache {
dc := new(DirCache) dc := new(DirCache)
dc.dir = dir dc.dir = dir
dc.fs = fs
dc.ttlNs = ttlNs dc.ttlNs = ttlNs
return dc return dc
} }
......
...@@ -4,7 +4,6 @@ import ( ...@@ -4,7 +4,6 @@ import (
"crypto/md5" "crypto/md5"
"fmt" "fmt"
"github.com/hanwen/go-fuse/fuse" "github.com/hanwen/go-fuse/fuse"
"io/ioutil"
"log" "log"
"os" "os"
"syscall" "syscall"
...@@ -27,7 +26,6 @@ func filePathHash(path string) string { ...@@ -27,7 +26,6 @@ func filePathHash(path string) string {
return fmt.Sprintf("%x-%s", h.Sum()[:8], base) return fmt.Sprintf("%x-%s", h.Sum()[:8], base)
} }
/* /*
UnionFs implements a user-space union file system, which is UnionFs implements a user-space union file system, which is
...@@ -62,8 +60,7 @@ func filePathHash(path string) string { ...@@ -62,8 +60,7 @@ func filePathHash(path string) string {
type UnionFs struct { type UnionFs struct {
fuse.DefaultFileSystem fuse.DefaultFileSystem
roots []string name string
branches []*fuse.LoopbackFileSystem
// The same, but as interfaces. // The same, but as interfaces.
fileSystems []fuse.FileSystem fileSystems []fuse.FileSystem
...@@ -89,19 +86,13 @@ const ( ...@@ -89,19 +86,13 @@ const (
_DROP_CACHE = ".drop_cache" _DROP_CACHE = ".drop_cache"
) )
func NewUnionFs(roots []string, options UnionFsOptions) *UnionFs { func NewUnionFs(name string, fileSystems []fuse.FileSystem, options UnionFsOptions) *UnionFs {
g := new(UnionFs) g := new(UnionFs)
g.roots = make([]string, len(roots)) g.name = name
copy(g.roots, roots)
g.options = &options g.options = &options
for i, r := range roots { for i, fs := range fileSystems {
var fs fuse.FileSystem
pt := fuse.NewLoopbackFileSystem(r)
g.branches = append(g.branches, pt)
fs = pt
if i > 0 { if i > 0 {
cfs := NewCachingFileSystem(pt, 0) cfs := NewCachingFileSystem(fs, 0)
g.cachingFileSystems = append(g.cachingFileSystems, cfs) g.cachingFileSystems = append(g.cachingFileSystems, cfs)
fs = cfs fs = cfs
} }
...@@ -109,14 +100,18 @@ func NewUnionFs(roots []string, options UnionFsOptions) *UnionFs { ...@@ -109,14 +100,18 @@ func NewUnionFs(roots []string, options UnionFsOptions) *UnionFs {
g.fileSystems = append(g.fileSystems, fs) g.fileSystems = append(g.fileSystems, fs)
} }
deletionDir := g.deletionDir() writable := g.fileSystems[0]
err := os.MkdirAll(deletionDir, 0755) fi, code := writable.GetAttr(options.DeletionDirName)
if err != nil { if code == fuse.ENOENT {
code = writable.Mkdir(options.DeletionDirName, 0755)
fi, code = writable.GetAttr(options.DeletionDirName)
}
if !code.Ok() || !fi.IsDirectory() {
panic(fmt.Sprintf("could not create deletion path %v: %v", panic(fmt.Sprintf("could not create deletion path %v: %v",
deletionDir, err)) options.DeletionDirName, code))
} }
g.deletionCache = NewDirCache(deletionDir, int64(options.DeletionCacheTTLSecs*1e9)) g.deletionCache = NewDirCache(writable, options.DeletionDirName, int64(options.DeletionCacheTTLSecs*1e9))
g.branchCache = NewTimedCache( g.branchCache = NewTimedCache(
func(n string) interface{} { return g.getBranchAttrNoCache(n) }, func(n string) interface{} { return g.getBranchAttrNoCache(n) },
int64(options.BranchCacheTTLSecs*1e9)) int64(options.BranchCacheTTLSecs*1e9))
...@@ -128,14 +123,23 @@ func NewUnionFs(roots []string, options UnionFsOptions) *UnionFs { ...@@ -128,14 +123,23 @@ func NewUnionFs(roots []string, options UnionFsOptions) *UnionFs {
// Deal with all the caches. // Deal with all the caches.
func (me *UnionFs) isDeleted(name string) bool { func (me *UnionFs) isDeleted(name string) bool {
haveCache, found := me.deletionCache.HasEntry(filePathHash(name)) marker := me.deletionPath(name)
haveCache, found := me.deletionCache.HasEntry(filepath.Base(marker))
if haveCache { if haveCache {
return found return found
} }
fileName := me.deletionPath(name) _, code := me.fileSystems[0].GetAttr(marker)
fi, _ := os.Lstat(fileName)
return fi != nil if code == fuse.OK {
return true
}
if code == fuse.ENOENT {
return false
}
panic(fmt.Sprintf("Unexpected GetAttr return code %v %v", code, marker))
return false
} }
func (me *UnionFs) getBranch(name string) branchResult { func (me *UnionFs) getBranch(name string) branchResult {
...@@ -186,15 +190,8 @@ func (me *UnionFs) getBranchAttrNoCache(name string) branchResult { ...@@ -186,15 +190,8 @@ func (me *UnionFs) getBranchAttrNoCache(name string) branchResult {
//////////////// ////////////////
// Deletion. // Deletion.
func (me *UnionFs) deletionDir() string {
dir := filepath.Join(me.branches[0].GetPath(""), me.options.DeletionDirName)
return dir
}
func (me *UnionFs) deletionPath(name string) string { func (me *UnionFs) deletionPath(name string) string {
dir := me.deletionDir() return filepath.Join(me.options.DeletionDirName, filePathHash(name))
return filepath.Join(dir, filePathHash(name))
} }
func (me *UnionFs) removeDeletion(name string) { func (me *UnionFs) removeDeletion(name string) {
...@@ -204,24 +201,32 @@ func (me *UnionFs) removeDeletion(name string) { ...@@ -204,24 +201,32 @@ func (me *UnionFs) removeDeletion(name string) {
// os.Remove tries to be smart and issues a Remove() and // os.Remove tries to be smart and issues a Remove() and
// Rmdir() sequentially. We want to skip the 2nd system call, // Rmdir() sequentially. We want to skip the 2nd system call,
// so use syscall.Unlink() directly. // so use syscall.Unlink() directly.
errno := syscall.Unlink(marker)
if errno != 0 && errno != syscall.ENOENT { code := me.fileSystems[0].Unlink(marker)
log.Printf("error unlinking %s: %v", marker, errno) if !code.Ok() && code != fuse.ENOENT {
log.Printf("error unlinking %s: %v", marker, code)
} }
} }
func (me *UnionFs) putDeletion(name string) fuse.Status { func (me *UnionFs) putDeletion(name string) fuse.Status {
fileName := me.deletionPath(name) marker := me.deletionPath(name)
me.deletionCache.AddEntry(path.Base(fileName)) me.deletionCache.AddEntry(path.Base(marker))
// Is there a WriteStringToFileOrDie ? // Is there a WriteStringToFileOrDie ?
err := ioutil.WriteFile(fileName, []byte(name), 0644) writable := me.fileSystems[0]
if err != nil { f, code := writable.Open(marker, uint32(os.O_TRUNC|os.O_WRONLY|os.O_CREATE))
if !code.Ok() {
log.Printf("could not create deletion file %v: %v", log.Printf("could not create deletion file %v: %v",
fileName, err) marker, code)
return fuse.EPERM return fuse.EPERM
} }
defer f.Release()
defer f.Flush()
n, code := f.Write(&fuse.WriteIn{}, []byte(name))
if int(n) != len(name) || !code.Ok() {
panic(fmt.Sprintf("Error for writing %v: %v, %v (exp %v) %v", name, marker, n, len(name), code))
}
return fuse.OK return fuse.OK
} }
...@@ -229,8 +234,8 @@ func (me *UnionFs) putDeletion(name string) fuse.Status { ...@@ -229,8 +234,8 @@ func (me *UnionFs) putDeletion(name string) fuse.Status {
// Promotion. // Promotion.
func (me *UnionFs) Promote(name string, srcResult branchResult) fuse.Status { func (me *UnionFs) Promote(name string, srcResult branchResult) fuse.Status {
writable := me.branches[0] writable := me.fileSystems[0]
sourceFs := me.branches[srcResult.branch] sourceFs := me.fileSystems[srcResult.branch]
// Promote directories. // Promote directories.
me.promoteDirsTo(name) me.promoteDirsTo(name)
...@@ -558,20 +563,21 @@ func (me *UnionFs) OpenDir(directory string) (stream chan fuse.DirEntry, status ...@@ -558,20 +563,21 @@ func (me *UnionFs) OpenDir(directory string) (stream chan fuse.DirEntry, status
// We could try to use the cache, but we have a delay, so // We could try to use the cache, but we have a delay, so
// might as well get the fresh results async. // might as well get the fresh results async.
var wg sync.WaitGroup
var deletions map[string]bool var deletions map[string]bool
deletionsDone := make(chan bool, 1)
wg.Add(1)
go func() { go func() {
deletions = newDirnameMap(me.deletionDir()) deletions = newDirnameMap(me.fileSystems[0], me.options.DeletionDirName)
deletionsDone <- true wg.Done()
}() }()
entries := make([]map[string]uint32, len(me.branches)) entries := make([]map[string]uint32, len(me.fileSystems))
for i, _ := range me.branches { for i, _ := range me.fileSystems {
entries[i] = make(map[string]uint32) entries[i] = make(map[string]uint32)
} }
statuses := make([]fuse.Status, len(me.branches)) statuses := make([]fuse.Status, len(me.fileSystems))
var wg sync.WaitGroup
for i, l := range me.fileSystems { for i, l := range me.fileSystems {
if i >= dirBranch.branch { if i >= dirBranch.branch {
wg.Add(1) wg.Add(1)
...@@ -591,7 +597,6 @@ func (me *UnionFs) OpenDir(directory string) (stream chan fuse.DirEntry, status ...@@ -591,7 +597,6 @@ func (me *UnionFs) OpenDir(directory string) (stream chan fuse.DirEntry, status
} }
wg.Wait() wg.Wait()
_ = <-deletionsDone
results := entries[0] results := entries[0]
...@@ -669,7 +674,7 @@ func (me *UnionFs) Rename(src string, dst string) (code fuse.Status) { ...@@ -669,7 +674,7 @@ func (me *UnionFs) Rename(src string, dst string) (code fuse.Status) {
} }
func (me *UnionFs) DropCaches() { func (me *UnionFs) DropCaches() {
log.Println("Forced cache drop on", me.roots) log.Println("Forced cache drop on", me.name)
me.branchCache.DropAll() me.branchCache.DropAll()
me.deletionCache.DropCache() me.deletionCache.DropCache()
for _, fs := range me.cachingFileSystems { for _, fs := range me.cachingFileSystems {
...@@ -703,9 +708,6 @@ func (me *UnionFs) Flush(name string) fuse.Status { ...@@ -703,9 +708,6 @@ func (me *UnionFs) Flush(name string) fuse.Status {
return fuse.OK return fuse.OK
} }
func (me *UnionFs) Roots() (result []string) { func (me *UnionFs) Name() string {
for _, loopback := range me.branches { return me.name
result = append(result, loopback.GetPath(""))
}
return result
} }
...@@ -37,10 +37,10 @@ func setupUfs(t *testing.T) (workdir string, cleanup func()) { ...@@ -37,10 +37,10 @@ func setupUfs(t *testing.T) (workdir string, cleanup func()) {
os.Mkdir(wd+"/ro", 0700) os.Mkdir(wd+"/ro", 0700)
fuse.CheckSuccess(err) fuse.CheckSuccess(err)
var roots []string var fses []fuse.FileSystem
roots = append(roots, wd+"/rw") fses = append(fses, fuse.NewLoopbackFileSystem(wd+"/rw"))
roots = append(roots, wd+"/ro") fses = append(fses, fuse.NewLoopbackFileSystem(wd+"/ro"))
ufs := NewUnionFs(roots, testOpts) ufs := NewUnionFs("testFs", fses, testOpts)
opts := &fuse.MountOptions{ opts := &fuse.MountOptions{
EntryTimeout: entryTtl, EntryTimeout: entryTtl,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment