Commit 790bee70 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 7476fdac
...@@ -122,7 +122,6 @@ ...@@ -122,7 +122,6 @@
// clients, that had requested it (separately to each client), about the // clients, that had requested it (separately to each client), about the
// changes: // changes:
// //
// XXX rev_max -> rev?
// S: 2 pin <bigfileX> #<blk> @<rev_max> XXX 2-> 2*k (multiple pins in parallel) // S: 2 pin <bigfileX> #<blk> @<rev_max> XXX 2-> 2*k (multiple pins in parallel)
// //
// and waits until all clients confirm that changed file block can be updated // and waits until all clients confirm that changed file block can be updated
...@@ -291,7 +290,7 @@ package main ...@@ -291,7 +290,7 @@ package main
// cache for @<rev>/bigfile/file[blk], where // cache for @<rev>/bigfile/file[blk], where
// //
// # see below about file.δtail // # see below about file.δtail
// # XXX -> file.δtail.LastRevOf(#blk, zhead.at) // # XXX -> file.δtail.LastBlkRev(#blk, zhead.at)
// rev = max(file.δtail.by(#blk)) || min(rev ∈ file.δtail) || zhead.at // rev = max(file.δtail.by(#blk)) || min(rev ∈ file.δtail) || zhead.at
// //
// - invalidate head/bigfile/file[blk] in OS file cache. // - invalidate head/bigfile/file[blk] in OS file cache.
...@@ -332,14 +331,14 @@ package main ...@@ -332,14 +331,14 @@ package main
// 7.1) load blkdata for head/bigfile/file[blk] @zhead.at . // 7.1) load blkdata for head/bigfile/file[blk] @zhead.at .
// //
// while loading this also gives upper bound estimate of when the block // while loading this also gives upper bound estimate of when the block
// was last changed: // was last changed: XXX kill upper bound -> populate and use δFtail
// //
// rev(blk) ≤ max(_.serial for _ in (ZBlk(#blk), all BTree/Bucket that lead to ZBlk)) // rev(blk) ≤ max(_.serial for _ in (ZBlk(#blk), all BTree/Bucket that lead to ZBlk))
// //
// it is not exact because BTree/Bucket can change (e.g. rebalance) // it is not exact because BTree/Bucket can change (e.g. rebalance)
// but still point to the same k->ZBlk. // but still point to the same k->ZBlk.
// //
// we also use file.δtail to find either exact blk revision: XXX // we also use file.δtail to find either exact blk revision: XXX just use δFtail
// //
// rev(blk) = max(file.δtail.by(#blk) -> []rev↑) // rev(blk) = max(file.δtail.by(#blk) -> []rev↑)
// //
...@@ -401,6 +400,13 @@ package main ...@@ -401,6 +400,13 @@ package main
// //
// XXX For every ZODB connection a dedicated read-only transaction is maintained. // XXX For every ZODB connection a dedicated read-only transaction is maintained.
// XXX notation
//
// δZ - changes in ZODB space
// δB - changes in BTree*s* space
// δF - changes in File*s* space
// δfile - changes in File(1) space
import ( import (
"bufio" "bufio"
"context" "context"
...@@ -670,7 +676,6 @@ func (root *Root) zwatcher(ctx context.Context) (err error) { ...@@ -670,7 +676,6 @@ func (root *Root) zwatcher(ctx context.Context) (err error) {
traceZWatch("zevent: %s", zevent) traceZWatch("zevent: %s", zevent)
var *zodb.EventCommit
switch zevent := zevent.(type) { switch zevent := zevent.(type) {
default: default:
return fmt.Errorf("unexpected event: %T", zevent) return fmt.Errorf("unexpected event: %T", zevent)
...@@ -679,15 +684,13 @@ func (root *Root) zwatcher(ctx context.Context) (err error) { ...@@ -679,15 +684,13 @@ func (root *Root) zwatcher(ctx context.Context) (err error) {
return zevent.Err return zevent.Err
case *zodb.EventCommit: case *zodb.EventCommit:
= zevent root.handleδZ(zevent)
} }
root.zδhandle1()
} }
} }
// zδhandle1 handles 1 change event from ZODB notification. // handleδZ handles 1 change event from ZODB notification.
func (root *Root) zδhandle1( *zodb.EventCommit) { func (root *Root) handleδZ(δZ *zodb.EventCommit) {
head := root.head head := root.head
// while we are invalidating OS cache, make sure that nothing, that // while we are invalidating OS cache, make sure that nothing, that
...@@ -734,8 +737,8 @@ retry: ...@@ -734,8 +737,8 @@ retry:
//fmt.Printf("\n\n\n") //fmt.Printf("\n\n\n")
// = (tid↑, []oid) // δZ = (tid↑, []oid)
for _, oid := range .Changev { for _, oid := range δZ.Changev {
// XXX zhead.Cache() lock/unlock // XXX zhead.Cache() lock/unlock
obj := zhead.Cache().Get(oid) obj := zhead.Cache().Get(oid)
if obj == nil { if obj == nil {
...@@ -796,7 +799,7 @@ retry: ...@@ -796,7 +799,7 @@ retry:
// XXX no indexMu lock needed because head is Locked // XXX no indexMu lock needed because head is Locked
// XXX stub -> TODO full δbtree | update indexLooked itself // XXX stub -> TODO full δbtree | update indexLooked itself
//fmt.Printf("\nbtreeChangev: %v\n", btreeChangev) //fmt.Printf("\nbtreeChangev: %v\n", btreeChangev)
δf := bfdir.δFtail.Update() δf := bfdir.δFtail.Update(δZ)
//fmt.Printf("xfiles: %v\n", xfiles) //fmt.Printf("xfiles: %v\n", xfiles)
for _, δ := range δf.Changev { for _, δ := range δf.Changev {
file := δ.File file := δ.File
...@@ -847,13 +850,13 @@ retry: ...@@ -847,13 +850,13 @@ retry:
panic(err) // XXX panic(err) // XXX
} }
// resync .zhead to .tid // resync .zhead to δZ.tid
// XXX -> Head.Resync() ? // XXX -> Head.Resync() ?
// 1. abort old and resync to new txn/at // 1. abort old and resync to new txn/at
transaction.Current(zhead.txnCtx).Abort() transaction.Current(zhead.txnCtx).Abort()
_, ctx = transaction.New(context.Background()) // XXX bg ok? _, ctx = transaction.New(context.Background()) // XXX bg ok?
err = zhead.Resync(ctx, .Tid) err = zhead.Resync(ctx, δZ.Tid)
if err != nil { if err != nil {
panic(err) // XXX panic(err) // XXX
} }
...@@ -876,7 +879,7 @@ retry: ...@@ -876,7 +879,7 @@ retry:
// notify .wcfs/zhead // notify .wcfs/zhead
for sk := range gdebug.zheadSockTab { for sk := range gdebug.zheadSockTab {
_, err := fmt.Fprintf(sk, "%s\n", .Tid) _, err := fmt.Fprintf(sk, "%s\n", δZ.Tid)
if err != nil { if err != nil {
log.Error(err) // XXX errctx, -> warning? log.Error(err) // XXX errctx, -> warning?
sk.Close() sk.Close()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment