Commit 947e04cb authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent fa66a781
...@@ -32,5 +32,5 @@ echo >>$out ...@@ -32,5 +32,5 @@ echo >>$out
sed \ sed \
-e "s/VALUE/$VALUE/g" \ -e "s/VALUE/$VALUE/g" \
-e "s/Set/${KIND}Set/g" \ -e "s/Set/Set${KIND}/g" \
$input >>$out $input >>$out
...@@ -24,7 +24,7 @@ type Set map[VALUE]struct{} ...@@ -24,7 +24,7 @@ type Set map[VALUE]struct{}
// Add adds v to the set. // Add adds v to the set.
func (s Set) Add(v VALUE) { func (s Set) Add(v VALUE) {
s[VALUE] = struct{}{} s[v] = struct{}{}
} }
// Has checks whether the set contains v. // Has checks whether the set contains v.
...@@ -33,6 +33,13 @@ func (s Set) Has(v VALUE) bool { ...@@ -33,6 +33,13 @@ func (s Set) Has(v VALUE) bool {
return ok return ok
} }
// Update adds t values to s.
func (s Set) Update(t Set) {
for v := range t {
s.Add(v)
}
}
// Elements returns all elements of set as slice. // Elements returns all elements of set as slice.
func (s Set) Elements() []VALUE { func (s Set) Elements() []VALUE {
ev := make([]VALUE, len(s)) ev := make([]VALUE, len(s))
......
...@@ -277,7 +277,10 @@ package main ...@@ -277,7 +277,10 @@ package main
// won't be served from OS file cache and instead will trigger a FUSE read // won't be served from OS file cache and instead will trigger a FUSE read
// request to wcfs. // request to wcfs.
// //
// 5) for every file δFtail invalidation info about head/data is maintained: // 5) after OS file cache was invalidated, we resync zhead to new database
// view corresponding to tid.
//
// 6) for every file δFtail invalidation info about head/data is maintained:
// //
// - tailv: [](rev↑, []#blk) // - tailv: [](rev↑, []#blk)
// - by: {} #blk -> []rev↑ in tail // - by: {} #blk -> []rev↑ in tail
...@@ -287,9 +290,9 @@ package main ...@@ -287,9 +290,9 @@ package main
// //
// min(rev) in δFtail is min(@at) at which head/data is currently mmapped (see below). // min(rev) in δFtail is min(@at) at which head/data is currently mmapped (see below).
// //
// 6) when we receive a FUSE read(#blk) request to a file/head/data we process it as follows: // 7) when we receive a FUSE read(#blk) request to a file/head/data we process it as follows:
// //
// 6.1) load blkdata for head/data[blk] @zhead.at . // 7.1) load blkdata for head/data[blk] @zhead.at .
// //
// while loading this also gives upper bound estimate of when the block // while loading this also gives upper bound estimate of when the block
// was last changed: // was last changed:
...@@ -313,7 +316,7 @@ package main ...@@ -313,7 +316,7 @@ package main
// rev(blk) ≤ rev'(blk) rev'(blk) = min(^^^) // rev(blk) ≤ rev'(blk) rev'(blk) = min(^^^)
// //
// //
// 6.2) for all client/addr@at mmappings of file/head/data: // 7.2) for all client/addr@at mmappings of file/head/data:
// //
// - rev'(blk) ≤ at: -> do nothing XXX || blk ∉ mapping // - rev'(blk) ≤ at: -> do nothing XXX || blk ∉ mapping
// - rev'(blk) > at: // - rev'(blk) > at:
...@@ -332,7 +335,7 @@ package main ...@@ -332,7 +335,7 @@ package main
// //
// is maintained. // is maintained.
// //
// 6.3) blkdata is returned to kernel. // 7.3) blkdata is returned to kernel.
// //
// Thus a client that wants latest data on pagefault will get latest data, // Thus a client that wants latest data on pagefault will get latest data,
// and a client that wants @rev data will get @rev data, even if it was this // and a client that wants @rev data will get @rev data, even if it was this
...@@ -614,11 +617,11 @@ func (r *Root) zhandle1(zevent zodb.WatchEvent) { ...@@ -614,11 +617,11 @@ func (r *Root) zhandle1(zevent zodb.WatchEvent) {
r.zheadMu.Lock() r.zheadMu.Lock()
defer r.zheadMu.Unlock() defer r.zheadMu.Unlock()
toinvalidate = ... // [] of file/[]#blk toinvalidate := map[*ZBigFile]SetI64{} // {} zfile -> set(#blk)
// zevent = (tid^, []oid) // zevent = (tid^, []oid)
for _, oid := range zevent.Oidv { for _, oid := range zevent.Changev {
obj := zhead.Cache().Get(oid) obj := r.zhead.Cache().Get(oid)
if obj == nil { if obj == nil {
continue // nothing to do - see invariant continue // nothing to do - see invariant
} }
...@@ -627,15 +630,22 @@ func (r *Root) zhandle1(zevent zodb.WatchEvent) { ...@@ -627,15 +630,22 @@ func (r *Root) zhandle1(zevent zodb.WatchEvent) {
default: default:
continue // object not related to any bigfile continue // object not related to any bigfile
case *LOBTree: case *btree.LOBTree:
// XXX -> δBTree // XXX -> δBTree
case *LOBucket: case *btree.LOBucket:
// XXX -> δBTree // XXX -> δBTree
case zBlk: // ZBlk0, ZBlk1 case zBlk: // ZBlk*
fileinv := XXX(obj.file) // XXX locking ?
fileinv.blkv += obj.blk // XXX or better obj.blkv ? for zfile, objWhere := range obj.inzfile {
blkmap, ok := toinvalidate[zfile]
if !ok {
blkmap = SetI64{}
toinvalidate[zfile] = blkmap
}
blkmap.Update(objWhere)
}
case *ZBigFile: case *ZBigFile:
// XXX check that .blksize and .blktab (it is only // XXX check that .blksize and .blktab (it is only
...@@ -646,9 +656,10 @@ func (r *Root) zhandle1(zevent zodb.WatchEvent) { ...@@ -646,9 +656,10 @@ func (r *Root) zhandle1(zevent zodb.WatchEvent) {
} }
//wg = ... //wg = ...
for _, fileinv := range toinvalidate { for file, blkmap := range toinvalidate {
for _, blk := range fileinv.blkv { for blk := range blkmap {
go fileinv.file.invalidateBlk(blk) // XXX -> wg.Go go file.invalidateBlk(blk) // XXX -> wg.Go
}
} }
// XXX resync .zhead to zevent.tid // XXX resync .zhead to zevent.tid
...@@ -672,7 +683,7 @@ func (f *file) invalidateBlk(ctx context.Context, blk int64) error { ...@@ -672,7 +683,7 @@ func (f *file) invalidateBlk(ctx context.Context, blk int64) error {
if len(blkdata) == blksize { if len(blkdata) == blksize {
// XXX -> go // XXX -> go
// store retrieved data back to OS cache for file @<rev>/data[blk] // store retrieved data back to OS cache for file @<rev>/data[blk]
frev, _ = file.δFtail.LastRevOf(blk, at) frev, _ := file.δFtail.LastRevOf(blk, at)
st = fsconn.FileNotifyStoreCache(frev.Inode(), off, blkdata) st = fsconn.FileNotifyStoreCache(frev.Inode(), off, blkdata)
if st != fuse.OK { if st != fuse.OK {
// XXX log - dup wrt readBlk -> common func. // XXX log - dup wrt readBlk -> common func.
...@@ -682,6 +693,8 @@ func (f *file) invalidateBlk(ctx context.Context, blk int64) error { ...@@ -682,6 +693,8 @@ func (f *file) invalidateBlk(ctx context.Context, blk int64) error {
// invalidate file/head/data[blk] in OS file cache. // invalidate file/head/data[blk] in OS file cache.
st = fsconn.FileNotify(f.Inode(), off, blksize) st = fsconn.FileNotify(f.Inode(), off, blksize)
// XXX st != ok (fatal here) // XXX st != ok (fatal here)
panic("TODO")
} }
// ---------------------------------------- // ----------------------------------------
......
...@@ -72,7 +72,7 @@ type zBlk interface { ...@@ -72,7 +72,7 @@ type zBlk interface {
// XXX the information is preserved even when ZBlk comes to ghost // XXX the information is preserved even when ZBlk comes to ghost
// state, but is lost if ZBlk is garbage collected. // state, but is lost if ZBlk is garbage collected.
// //
// XXX concurrent access // bindZFile is safe for concurrent access.
// //
// XXX link to overview. // XXX link to overview.
bindZFile(zfile *ZBigFile, blk int64) bindZFile(zfile *ZBigFile, blk int64)
...@@ -92,19 +92,20 @@ const zwendelin = "wendelin.bigfile.file_zodb" ...@@ -92,19 +92,20 @@ const zwendelin = "wendelin.bigfile.file_zodb"
// persistent state. // persistent state.
type zBlkBase struct { type zBlkBase struct {
mu sync.Mutex mu sync.Mutex
filetab map[*ZBigFile]SetI64 // {} zfile -> set(#blk) inzfile map[*ZBigFile]SetI64 // {} zfile -> set(#blk)
} }
func (zb *zBlkBase) bindZfile(zfile *ZBigFile, blk int64) { // bindZFile implements zBlk.
func (zb *zBlkBase) bindZFile(zfile *ZBigFile, blk int64) {
zb.mu.Lock() zb.mu.Lock()
defer zb.mu.Unlock() defer zb.mu.Unlock()
filemap, ok := zb.filetab[zfile] blkmap, ok := zb.inzfile[zfile]
if !ok { if !ok {
filemap = I64Set{} blkmap = make(SetI64, 1)
zb.filetab[zfile] = filemap zb.inzfile[zfile] = blkmap
} }
filemap.Add(blk) blkmap.Add(blk)
} }
// ---- ZBlk0 ---- // ---- ZBlk0 ----
......
...@@ -21,22 +21,29 @@ ...@@ -21,22 +21,29 @@
package main package main
// I64Set is a set of int64. // SetI64 is a set of int64.
type I64Set map[int64]struct{} type SetI64 map[int64]struct{}
// Add adds v to the set. // Add adds v to the set.
func (s I64Set) Add(v int64) { func (s SetI64) Add(v int64) {
s[int64] = struct{}{} s[v] = struct{}{}
} }
// Has checks whether the set contains v. // Has checks whether the set contains v.
func (s I64Set) Has(v int64) bool { func (s SetI64) Has(v int64) bool {
_, ok := s[v] _, ok := s[v]
return ok return ok
} }
// Update adds t values to s.
func (s SetI64) Update(t SetI64) {
for v := range t {
s.Add(v)
}
}
// Elements returns all elements of set as slice. // Elements returns all elements of set as slice.
func (s I64Set) Elements() []int64 { func (s SetI64) Elements() []int64 {
ev := make([]int64, len(s)) ev := make([]int64, len(s))
i := 0 i := 0
for e := range s { for e := range s {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment