Commit 7649e798 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 9c90d3f4
...@@ -305,6 +305,7 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit) (_ ΔF, err error) { ...@@ -305,6 +305,7 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit) (_ ΔF, err error) {
} }
if δ != nil { if δ != nil {
// XXX rebuild first
δzfile[oid] = δ δzfile[oid] = δ
δftail.vδE = append(δftail.vδE, _ΔFileEpoch{ δftail.vδE = append(δftail.vδE, _ΔFileEpoch{
Rev: δZ.Tid, Rev: δZ.Tid,
...@@ -390,6 +391,7 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado ...@@ -390,6 +391,7 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado
xtail.AssertSlice(δFtail, lo, hi) xtail.AssertSlice(δFtail, lo, hi)
// XXX locking // XXX locking
// XXX rebuild
// query .δBtail.SliceByRootRev(file.blktab, lo, hi) + // query .δBtail.SliceByRootRev(file.blktab, lo, hi) +
// merge δZBlk history with that. // merge δZBlk history with that.
...@@ -409,11 +411,6 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado ...@@ -409,11 +411,6 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado
// δFile ────────o───────o──────x─────x──────────────────────── // δFile ────────o───────o──────x─────x────────────────────────
δftail := δFtail.byFile[zfile.POid()]
root := δftail.root // XXX take epochs into account
vδT := δFtail.δBtail.SliceByRootRev(root, lo, δFtail.Head()) // NOTE @head, not hi
vδZ := δFtail.δBtail.ΔZtail().SliceByRev(lo, hi)
var vδf []*ΔFile var vδf []*ΔFile
// vδfTail returns or creates vδf entry for revision tail // vδfTail returns or creates vδf entry for revision tail
// tail must be <= all vδf revisions // tail must be <= all vδf revisions
...@@ -433,6 +430,18 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado ...@@ -433,6 +430,18 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado
return δfTail return δfTail
} }
δftail := δFtail.byFile[zfile.POid()]
vδE := δftail.vδE
ie := len(vδE) - 1
_ = ie
vδZ := δFtail.δBtail.ΔZtail().SliceByRev(lo, hi)
iz := len(vδZ) - 1
root := δftail.root // XXX take epochs into account
vδT := δFtail.δBtail.SliceByRootRev(root, lo, δFtail.Head()) // NOTE @head, not hi
// state of Zinblk as we are scanning ← // state of Zinblk as we are scanning ←
// initially corresponds to @head = vδT[-1] // initially corresponds to @head = vδT[-1]
...@@ -451,8 +460,7 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado ...@@ -451,8 +460,7 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado
ZinblkAt = lo ZinblkAt = lo
} }
iz := len(vδZ) - 1 for (iz >= 0 || it >= 0) { // XXX + ie
for (iz >= 0 || it >= 0) {
// δZ that is covered by current Zinblk // δZ that is covered by current Zinblk
// -> update δf // -> update δf
if iz >= 0 { if iz >= 0 {
...@@ -531,6 +539,9 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado ...@@ -531,6 +539,9 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado
func (δFtail *ΔFtail) LastBlkRev(ctx context.Context, zf *ZBigFile, blk int64, at zodb.Tid) (_ zodb.Tid, exact bool) { func (δFtail *ΔFtail) LastBlkRev(ctx context.Context, zf *ZBigFile, blk int64, at zodb.Tid) (_ zodb.Tid, exact bool) {
//defer xerr.Contextf(&err, "") // XXX text //defer xerr.Contextf(&err, "") // XXX text
//δftail := δFtail.byFile[zf.POid()]
//root := δftail.root // XXX handle epochs
err := zf.PActivate(ctx) err := zf.PActivate(ctx)
if err != nil { if err != nil {
panic(err) // XXX panic(err) // XXX
...@@ -538,14 +549,20 @@ func (δFtail *ΔFtail) LastBlkRev(ctx context.Context, zf *ZBigFile, blk int64, ...@@ -538,14 +549,20 @@ func (δFtail *ΔFtail) LastBlkRev(ctx context.Context, zf *ZBigFile, blk int64,
defer zf.PDeactivate() defer zf.PDeactivate()
// XXX take epochs into account // XXX take epochs into account
epoch := zodb.Tid(0) // XXX stub
// XXX tabRev -> treeRev ? // XXX tabRev -> treeRev ?
//zblkOid, ok, tabRev, tabRevExact, err := δFtail.δBtail.GetAt(ctx, root, blk, at)
zblkOid, ok, tabRev, tabRevExact, err := δFtail.δBtail.GetAt(ctx, zf.blktab, blk, at) zblkOid, ok, tabRev, tabRevExact, err := δFtail.δBtail.GetAt(ctx, zf.blktab, blk, at)
//fmt.Printf("GetAt #%d @%s -> %s, %v, @%s, %v\n", blk, at, zblkOid, ok, tabRev, tabRevExact) //fmt.Printf("GetAt #%d @%s -> %s, %v, @%s, %v\n", blk, at, zblkOid, ok, tabRev, tabRevExact)
if err != nil { if err != nil {
panic(err) // XXX panic(err) // XXX
} }
if tabRev < epoch {
tabRev = epoch
}
// block was removed // block was removed
// XXX or not in tracked set? // XXX or not in tracked set?
if !ok { if !ok {
......
...@@ -901,9 +901,14 @@ retry: ...@@ -901,9 +901,14 @@ retry:
file := bfdir.fileTab[foid] file := bfdir.fileTab[foid]
if δfile.Epoch { if δfile.Epoch {
wg.Go(func(ctx context.Context) error { // XXX while invalidating whole file at epoch is easy,
return file.invalidateAll() // NOTE does not accept ctx // it becomes not so easy to handle isolation if epochs
}) // could be present. For this reason we forbid changes
// to ZBigFile objects for now.
return fmt.Errorf("ZBigFile<%s> changed @%s", foid, δF.Rev)
// wg.Go(func(ctx context.Context) error {
// return file.invalidateAll() // NOTE does not accept ctx
// })
} else { } else {
for blk := range δfile.Blocks { for blk := range δfile.Blocks {
blk := blk blk := blk
...@@ -1313,7 +1318,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro ...@@ -1313,7 +1318,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
// and thus would trigger DB access again. // and thus would trigger DB access again.
// //
// TODO if direct-io: don't touch pagecache // TODO if direct-io: don't touch pagecache
// TODO upload parts only not covered by currrent read (not to e.g. wait for page lock) // TODO upload parts only not covered by current read (not to e.g. wait for page lock)
// TODO skip upload completely if read is wide to cover whole blksize // TODO skip upload completely if read is wide to cover whole blksize
go f.uploadBlk(blk, loading) go f.uploadBlk(blk, loading)
...@@ -1703,7 +1708,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1703,7 +1708,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// rlocked during pin setup. // rlocked during pin setup.
// //
// δ δ // δ δ
// ----x----.------------]----x---- // ────x────.────────────]────x────
// ↑ ↑ // ↑ ↑
// w.at head // w.at head
// //
...@@ -1722,6 +1727,21 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1722,6 +1727,21 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
δFtail := bfdir.δFtail δFtail := bfdir.δFtail
for _, δfile := range δFtail.SliceByFileRev(f.zfile, at, headAt) { // XXX locking δFtail for _, δfile := range δFtail.SliceByFileRev(f.zfile, at, headAt) { // XXX locking δFtail
if δfile.Epoch {
// file epochs are currently forbidden (see watcher), so the only
// case when we could see an epoch here is creation of
// the file if w.at is before that time:
//
// create file
// ────.────────x────────]────
// ↑ ↑
// w.at head
//
// but then the file should not be normally accessed in that case.
//
// -> reject such watches with an error
return fmt.Errorf("file epoch detected @%s in between (at,head=@%s]", δfile.Rev, headAt)
}
for blk := range δfile.Blocks { for blk := range δfile.Blocks {
_, already := toPin[blk] _, already := toPin[blk]
if already { if already {
...@@ -1736,7 +1756,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1736,7 +1756,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// XXX adjust wcfs tests to not require only accessed // XXX adjust wcfs tests to not require only accessed
// blocks to be in setup pins? But that would mean that // blocks to be in setup pins? But that would mean that
// potentially more blocks would be potentially // potentially more blocks would be potentially
// _unneccessarily_ pinned if they are not going to be // _unnecessarily_ pinned if they are not going to be
// accessed at all. // accessed at all.
if !f.accessed.Has(blk) { if !f.accessed.Has(blk) {
continue continue
...@@ -2110,7 +2130,7 @@ func (root *Root) lookup(name string, fctx *fuse.Context) (_ *Head, err error) { ...@@ -2110,7 +2130,7 @@ func (root *Root) lookup(name string, fctx *fuse.Context) (_ *Head, err error) {
root.revMu.Unlock() root.revMu.Unlock()
if already { if already {
// XXX race wrt simlutaneous "FORGET @<rev>" ? // XXX race wrt simultaneous "FORGET @<rev>" ?
return revDir, nil return revDir, nil
} }
...@@ -2555,7 +2575,7 @@ func _main() (err error) { ...@@ -2555,7 +2575,7 @@ func _main() (err error) {
} }
// wait for unmount // wait for unmount
// XXX the kernel does not sentd FORGETs on unmount - release left node resources ourselves? // XXX the kernel does not send FORGETs on unmount - release left node resources ourselves?
<-serveCtx.Done() <-serveCtx.Done()
log.Infof("stop %q %q", mntpt, zurl) log.Infof("stop %q %q", mntpt, zurl)
return nil // XXX serveErr | zwatchErr ? return nil // XXX serveErr | zwatchErr ?
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment