Commit cb778a7d authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent c04a8580
...@@ -424,7 +424,7 @@ import ( ...@@ -424,7 +424,7 @@ import (
"fmt" "fmt"
stdlog "log" stdlog "log"
"os" "os"
"runtime" // "runtime"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
...@@ -746,10 +746,11 @@ retry: ...@@ -746,10 +746,11 @@ retry:
size bool // whether to invalidate file size size bool // whether to invalidate file size
} }
toinvalidate := map[*BigFile]*fileInvalidate{} // {} file -> set(#blk), sizeChanged toinvalidate := map[*BigFile]*fileInvalidate{} // {} file -> set(#blk), sizeChanged
btreeChangev := []zodb.Oid{} // oids changing BTree|Bucket // btreeChangev := []zodb.Oid{} // oids changing BTree|Bucket
//fmt.Printf("\n\n\n") //fmt.Printf("\n\n\n")
/*
// δZ = (tid↑, []oid) // δZ = (tid↑, []oid)
for _, oid := range δZ.Changev { for _, oid := range δZ.Changev {
// XXX zhead.Cache() lock/unlock // XXX zhead.Cache() lock/unlock
...@@ -807,10 +808,10 @@ retry: ...@@ -807,10 +808,10 @@ retry:
// make sure obj won't be garbage-collected until we finish handling it. // make sure obj won't be garbage-collected until we finish handling it.
runtime.KeepAlive(obj) runtime.KeepAlive(obj)
} }
*/
// find out which files need to be invalidated due to index change // find out which files need to be invalidated due to index change
// XXX no indexMu lock needed because head is Locked // XXX no indexMu lock needed because head is Locked
// XXX stub -> TODO full δbtree | update indexLooked itself
//fmt.Printf("\nbtreeChangev: %v\n", btreeChangev) //fmt.Printf("\nbtreeChangev: %v\n", btreeChangev)
δF := bfdir.δFtail.Update(δZ) δF := bfdir.δFtail.Update(δZ)
//fmt.Printf("xfiles: %v\n", xfiles) //fmt.Printf("xfiles: %v\n", xfiles)
...@@ -1114,7 +1115,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro ...@@ -1114,7 +1115,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
} }
// noone was loading - we became responsible to load this block // noone was loading - we became responsible to load this block
blkdata, treepath, blkrevMax, err := f.zfile.LoadBlk(ctx, blk) blkdata, treepath, zblk, blkrevMax, err := f.zfile.LoadBlk(ctx, blk)
loading.blkdata = blkdata loading.blkdata = blkdata
loading.err = err loading.err = err
...@@ -1129,7 +1130,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro ...@@ -1129,7 +1130,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
// we have the data - it can be used after watchers are updated // we have the data - it can be used after watchers are updated
// XXX should we use ctx here? (see updateWatcher comments) // XXX should we use ctx here? (see updateWatcher comments)
f.updateWatchers(ctx, blk, treepath, blkrevMax) f.updateWatchers(ctx, blk, treepath, zblk, blkrevMax)
// data can be used now // data can be used now
close(loading.ready) close(loading.ready)
...@@ -1166,7 +1167,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro ...@@ -1166,7 +1167,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
// XXX do we really need to use/propagate caller contex here? ideally update // XXX do we really need to use/propagate caller contex here? ideally update
// watchers should be synchronous, and in practice we just use 30s timeout. // watchers should be synchronous, and in practice we just use 30s timeout.
// Should a READ interrupt cause watch update failure? // Should a READ interrupt cause watch update failure?
func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btree.LONode, blkrevMax zodb.Tid) { func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btree.LONode, zblk zBlk, blkrevMax zodb.Tid) {
// only head/ is being watched for // only head/ is being watched for
if f.head.rev != 0 { if f.head.rev != 0 {
return return
...@@ -1178,6 +1179,11 @@ func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btre ...@@ -1178,6 +1179,11 @@ func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btre
bfdir.δFtail.Track(f, treepath) // XXX pass in zblk.oid / zblk.rev here? bfdir.δFtail.Track(f, treepath) // XXX pass in zblk.oid / zblk.rev here?
bfdir.δFmu.Unlock() bfdir.δFmu.Unlock()
// associate zblk with file, if data was not hole
if zblk != nil {
zblk.bindFile(f, blk)
}
// makes sure that file[blk] on clients side stays as of @w.at state. // makes sure that file[blk] on clients side stays as of @w.at state.
// try to use blkrevMax only as the first cheap criteria to skip updating watchers. // try to use blkrevMax only as the first cheap criteria to skip updating watchers.
......
...@@ -67,7 +67,7 @@ type zBlk interface { ...@@ -67,7 +67,7 @@ type zBlk interface {
// returns data and revision of ZBlk. // returns data and revision of ZBlk.
loadBlkData(ctx context.Context) (data []byte, rev zodb.Tid, _ error) loadBlkData(ctx context.Context) (data []byte, rev zodb.Tid, _ error)
// bindZFile associates ZBlk as being used by zfile to store block #blk. // bindFile associates ZBlk as being used by file to store block #blk.
// //
// A ZBlk may be bound to several blocks inside one file, and to // A ZBlk may be bound to several blocks inside one file, and to
// several files. // several files.
...@@ -75,21 +75,21 @@ type zBlk interface { ...@@ -75,21 +75,21 @@ type zBlk interface {
// The information is preserved even when ZBlk comes to ghost // The information is preserved even when ZBlk comes to ghost
// state, but is lost if ZBlk is garbage collected. // state, but is lost if ZBlk is garbage collected.
// //
// it is safe to call multiple bindZFile simultaneously. // it is safe to call multiple bindFile simultaneously.
// it is not safe to call bindZFile and boundTo simultaneously. // it is not safe to call bindFile and boundTo simultaneously.
// //
// XXX link to overview. // XXX link to overview.
bindZFile(zfile *ZBigFile, blk int64) bindFile(file *BigFile, blk int64)
// XXX unbindZFile // XXX unbindFile
// XXX zfile -> bind map for it // XXX zfile -> bind map for it
// blkBoundTo returns ZBlk association with zfile(s)/#blk(s). // blkBoundTo returns ZBlk association with file(s)/#blk(s).
// //
// The association returned is that was previously set by bindZFile. // The association returned is that was previously set by bindFile.
// //
// blkBoundTo must not be called simultaneously wrt bindZFile. // blkBoundTo must not be called simultaneously wrt bindFile.
blkBoundTo() map[*ZBigFile]SetI64 blkBoundTo() map[*BigFile]SetI64
} }
// ---- zBlkBase ---- // ---- zBlkBase ----
...@@ -448,12 +448,12 @@ func (bf *zBigFileState) PySetState(pystate interface{}) (err error) { ...@@ -448,12 +448,12 @@ func (bf *zBigFileState) PySetState(pystate interface{}) (err error) {
// which provides a rough upper-bound estimate for file[blk] revision. // which provides a rough upper-bound estimate for file[blk] revision.
// //
// XXX load into user-provided buf. // XXX load into user-provided buf.
func (bf *ZBigFile) LoadBlk(ctx context.Context, blk int64) (_ []byte, treePath []btree.LONode, blkRevMax zodb.Tid, err error) { func (bf *ZBigFile) LoadBlk(ctx context.Context, blk int64) (_ []byte, treePath []btree.LONode, zblk zBlk, blkRevMax zodb.Tid, err error) {
defer xerr.Contextf(&err, "bigfile %s: loadblk %d", bf.POid(), blk) defer xerr.Contextf(&err, "bigfile %s: loadblk %d", bf.POid(), blk)
err = bf.PActivate(ctx) err = bf.PActivate(ctx)
if err != nil { if err != nil {
return nil, nil, 0, err return nil, nil, nil, 0, err
} }
defer bf.PDeactivate() defer bf.PDeactivate()
...@@ -463,26 +463,26 @@ func (bf *ZBigFile) LoadBlk(ctx context.Context, blk int64) (_ []byte, treePath ...@@ -463,26 +463,26 @@ func (bf *ZBigFile) LoadBlk(ctx context.Context, blk int64) (_ []byte, treePath
blkRevMax = tidmax(blkRevMax, node.PSerial()) blkRevMax = tidmax(blkRevMax, node.PSerial())
}) })
if err != nil { if err != nil {
return nil, nil, 0, err return nil, nil, nil, 0, err
} }
if !ok { if !ok {
return make([]byte, bf.blksize), treePath, blkRevMax, nil return make([]byte, bf.blksize), treePath, nil, blkRevMax, nil
} }
zblk, ok := xzblk.(zBlk) zblk, ok = xzblk.(zBlk)
if !ok { if !ok {
return nil, nil, 0, fmt.Errorf("expect ZBlk*; got %s", typeOf(xzblk)) return nil, nil, nil, 0, fmt.Errorf("expect ZBlk*; got %s", typeOf(xzblk))
} }
blkdata, zblkrev, err := zblk.loadBlkData(ctx) blkdata, zblkrev, err := zblk.loadBlkData(ctx)
if err != nil { if err != nil {
return nil, nil, 0, err return nil, nil, nil, 0, err
} }
blkRevMax = tidmax(blkRevMax, zblkrev) blkRevMax = tidmax(blkRevMax, zblkrev)
l := int64(len(blkdata)) l := int64(len(blkdata))
if l > bf.blksize { if l > bf.blksize {
return nil, nil, 0, fmt.Errorf("zblk %s: invalid blk: size = %d (> blksize = %d)", zblk.POid(), l, bf.blksize) return nil, nil, nil, 0, fmt.Errorf("zblk %s: invalid blk: size = %d (> blksize = %d)", zblk.POid(), l, bf.blksize)
} }
// append trailing \0 to data to reach .blksize // append trailing \0 to data to reach .blksize
...@@ -492,10 +492,8 @@ func (bf *ZBigFile) LoadBlk(ctx context.Context, blk int64) (_ []byte, treePath ...@@ -492,10 +492,8 @@ func (bf *ZBigFile) LoadBlk(ctx context.Context, blk int64) (_ []byte, treePath
blkdata = d blkdata = d
} }
zblk.bindZFile(bf, blk)
//log.Printf("ZBigFile.loadblk(%d) -> %dB", blk, len(blkdata)) //log.Printf("ZBigFile.loadblk(%d) -> %dB", blk, len(blkdata))
return blkdata, treePath, blkRevMax, nil return blkdata, treePath, zblk, blkRevMax, nil
} }
// Size returns whole file size. // Size returns whole file size.
......
...@@ -21,6 +21,7 @@ package main ...@@ -21,6 +21,7 @@ package main
import ( import (
"context" "context"
"runtime"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/zodb/btree" "lab.nexedi.com/kirr/neo/go/zodb/btree"
...@@ -128,6 +129,8 @@ func (δFtail *ΔFtail) Track(file *BigFile, path []btree.LONode) { ...@@ -128,6 +129,8 @@ func (δFtail *ΔFtail) Track(file *BigFile, path []btree.LONode) {
func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit) ΔF { func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit) ΔF {
δB := δFtail.δBtail.Update(δZ) δB := δFtail.δBtail.Update(δZ)
δF := ΔF{Rev: δB.Rev, Change: make(map[*BigFile]SetI64)} δF := ΔF{Rev: δB.Rev, Change: make(map[*BigFile]SetI64)}
// take btree changes into account
for root, δt := range δB.Change { for root, δt := range δB.Change {
files := δFtail.fileIdx[root] files := δFtail.fileIdx[root]
if len(files) == 0 { if len(files) == 0 {
...@@ -147,6 +150,51 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit) ΔF { ...@@ -147,6 +150,51 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit) ΔF {
} }
} }
// take zblk changes into account
for _, oid := range δZ.Changev {
// XXX cache lock/unlock
obj := zcache.Get(oid)
if obj == nil {
//fmt.Printf("%s: not in cache\n", oid)
continue // nothing to do - see invariant
}
//fmt.Printf("%s: in cache (%s)\n", oid, typeOf(obj))
switch obj := obj.(type) {
default:
continue // object not related to any bigfile
case zBlk: // ZBlk*
// blkBoundTo locking: no other bindFile are running,
// since we write-locked head.zconnMu and bindFile is
// run when loading objects - thus when head.zconnMu is
// read-locked. XXX comment -> proper place?
//
// bfdir locking: similarly not needed, since we are
// exclusively holding head lock.
for file, objBlk := range obj.blkBoundTo() {
δfile, ok := δF.Change[file]
if !ok {
δfile = make(SetI64)
δF.Change[file] = δfile
}
δfile.Update(objBlk)
}
case *ZBigFile:
// XXX check that .blksize and .blktab (it is only
// persistent reference) do not change.
// XXX shutdown fs with ^^^ message.
panic("ZBigFile changed")
}
// make sure obj won't be garbage-collected until we finish handling it.
runtime.KeepAlive(obj)
}
return δF return δF
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment