Commit 748b40e8 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ef12f1da
// Code generated by gen-set Object interface{}; DO NOT EDIT.
// Copyright (C) 2015-2019 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package δbtree
// SetObject is an unordered collection of distinct interface{} values.
//
// It is backed by a map with empty-struct values, so membership is the only
// information stored per element.
type SetObject map[interface{}]struct{}

// Add inserts v into the set; inserting a value that is already present
// leaves the set unchanged.
func (s SetObject) Add(v interface{}) {
	var present struct{}
	s[v] = present
}

// Has reports whether v is a member of the set.
func (s SetObject) Has(v interface{}) bool {
	if _, found := s[v]; found {
		return true
	}
	return false
}

// Update inserts every element of t into s.
func (s SetObject) Update(t SetObject) {
	for elem := range t {
		s[elem] = struct{}{}
	}
}

// Elements returns the members of the set as a newly allocated slice.
//
// The order of the returned elements follows map iteration and is therefore
// unspecified.
func (s SetObject) Elements() []interface{} {
	out := make([]interface{}, 0, len(s))
	for elem := range s {
		out = append(out, elem)
	}
	return out
}
......@@ -21,6 +21,7 @@
package δbtree
//go:generate ../../gen-set δbtree Tree *Tree zset_tree.go
//go:generate ../../gen-set δbtree Object interface{} zset_object.go
import (
"fmt"
......@@ -39,17 +40,29 @@ type Bucket = btree.LOBucket
//
// PathSet is not safe for concurrent access.
type PathSet struct {
// residency index: maps oid of a BTree|Bucket node to the set of
// top tree elements (roots) recorded for that oid.
residencyIdx map[zodb.Oid]SetTree // oid -> {} roots
// master index: maps a tree root to the set of masters associated with it.
masterIdx map[*Tree]SetObject
}
// NewPathSet returns a new PathSet with both of its indexes allocated and empty.
func NewPathSet() *PathSet {
	ps := new(PathSet)
	ps.residencyIdx = map[zodb.Oid]SetTree{}
	ps.masterIdx = map[*Tree]SetObject{}
	return ps
}
// Add adds path to collection.
// Add adds path to collection and associates path root with master.
//
// path[0] signifies a root.
// All path elements must be Tree except the last one, which must be Bucket.
// A root can be associated with several masters (each provided on different Add call)
//
// XXX catch cycles on add?
func (m *PathSet) Add(path []zodb.IPersistent) {
func (m *PathSet) Add(master interface{}, path []zodb.IPersistent) {
l := len(path)
if l == 0 {
panic("empty path")
......@@ -81,17 +94,24 @@ func (m *PathSet) Add(path []zodb.IPersistent) {
}
nodeRoots.Add(root)
}
masterSet, ok := m.masterIdx[root]
if !ok {
masterSet = SetObject{}
m.masterIdx[root] = masterSet
}
masterSet.Add(master)
}
// Invalidates returns which trees are invalidated by changed objects.
// Invalidates returns which masters are invalidated by changed objects.
//
// returned are: roots of changed trees.
//
// XXX stub.
//
// TODO toposort changev wrt nodes ordering (defined by traversed paths) and do δbtree top-down.
func (m *PathSet) Invalidates(changev []zodb.Oid) SetTree {
toinvalidate := make(SetTree)
func (m *PathSet) Invalidates(changev []zodb.Oid) (masters SetObject) {
masters = SetObject{}
for _, oid := range changev {
roots, ok := m.residencyIdx[oid]
......@@ -99,8 +119,10 @@ func (m *PathSet) Invalidates(changev []zodb.Oid) SetTree {
continue // we don't know this oid
}
toinvalidate.Update(roots)
for root := range roots {
masters.Update(m.masterIdx[root])
}
}
return toinvalidate
return masters
}
......@@ -511,3 +511,21 @@ func (u *zrevTabUnregister) AfterCompletion(txn transaction.Transaction) {
}
}
*/
// tidmax returns the greater of a and b, i.e. the latest of the two revisions.
func tidmax(a, b zodb.Tid) zodb.Tid {
	if b < a {
		return a
	}
	return b
}
// tidmin returns the lesser of a and b, i.e. the earliest of the two revisions.
func tidmin(a, b zodb.Tid) zodb.Tid {
	if b > a {
		return a
	}
	return b
}
......@@ -431,6 +431,7 @@ type Root struct {
// /(head|<rev>)/ - served by Head.
type Head struct {
nodefs.Node
rev zodb.Tid // 0 for head/, !0 for @<rev>/
bfdir *BigFileDir // bigfile/
// at - served by .readAt
......@@ -457,11 +458,13 @@ type BigFileDir struct {
head *Head // parent head/ or @<rev>/
// {} oid -> <bigfileX>
mu sync.Mutex
fileMu sync.Mutex
fileTab map[zodb.Oid]*BigFile
// visited BTree nodes -> which file + ordering for toposort on δbtree
blktabLoaded *δbtree.PathSet
// visited BTree nodes of all BigFiles
// -> which file + ordering for toposort on δbtree
indexMu sync.Mutex
indexLooked *δbtree.PathSet // XXX naming
}
// /(head|<rev>)/bigfile/<bigfileX> - served by BigFile.
......@@ -599,7 +602,13 @@ func (root *Root) zδhandle1(zevent zodb.CommitEvent) {
zhead := root.head.zconn
bfdir := root.head.bfdir
toinvalidate := map[*BigFile]SetI64{} // {} file -> set(#blk)
// fileInvalidate describes invalidations for one file.
type fileInvalidate struct {
blkmap SetI64 // changed blocks
size bool // whether to invalidate file size
}
toinvalidate := map[*BigFile]*fileInvalidate{} // {} file -> set(#blk), sizeChanged
btreeChangev := []zodb.Oid{} // oids changing BTree|Bucket
fmt.Printf("\n\n\n")
......@@ -619,10 +628,10 @@ func (root *Root) zδhandle1(zevent zodb.CommitEvent) {
continue // object not related to any bigfile
case *btree.LOBTree:
// XXX -> δBTree
btreeChangev = append(btreeChangev, obj.POid())
case *btree.LOBucket:
// XXX -> δBTree
btreeChangev = append(btreeChangev, obj.POid())
case zBlk: // ZBlk*
// blkBoundTo locking: no other bindZFile are running,
......@@ -640,12 +649,12 @@ func (root *Root) zδhandle1(zevent zodb.CommitEvent) {
continue
}
blkmap, ok := toinvalidate[file]
finv, ok := toinvalidate[file]
if !ok {
blkmap = SetI64{}
toinvalidate[file] = blkmap
finv = &fileInvalidate{blkmap: SetI64{}}
toinvalidate[file] = finv
}
blkmap.Update(objBlk)
finv.blkmap.Update(objBlk)
}
case *ZBigFile:
......@@ -659,15 +668,31 @@ func (root *Root) zδhandle1(zevent zodb.CommitEvent) {
runtime.KeepAlive(obj)
}
// find out which files need to be invalidated due to index change
// XXX no indexMu lock needed because head is Locked
// XXX stub -> TODO full δbtree
fmt.Printf("\nbtreeChangev: %v\n", btreeChangev)
xfiles := bfdir.indexLooked.Invalidates(btreeChangev)
fmt.Printf("xfiles: %v\n", xfiles)
for xfile := range xfiles {
file := xfile.(*BigFile)
finv, ok := toinvalidate[file]
if !ok {
finv = &fileInvalidate{} // XXX init blkmap?
toinvalidate[file] = finv
}
finv.size = true
}
fmt.Printf("\n\nzδhandle: toinvalidate (#%d):\n", len(toinvalidate))
for file := range toinvalidate {
fmt.Printf("\t- %s\n", file.zbf.POid())
}
wg, ctx := errgroup.WithContext(context.TODO())
for file, blkmap := range toinvalidate {
for file, finv := range toinvalidate {
file := file
for blk := range blkmap {
for blk := range finv.blkmap {
blk := blk
wg.Go(func() error {
return file.invalidateBlk(ctx, blk)
......@@ -675,11 +700,13 @@ func (root *Root) zδhandle1(zevent zodb.CommitEvent) {
}
// invalidate kernel cache for attributes
// XXX we need to do it only if we see topoligy (i.e. btree) change
// we need to do it only if we see topology (i.e. btree) change
if finv.size {
wg.Go(func() error {
return file.invalidateAttr()
})
}
}
err := wg.Wait()
if err != nil {
panic(err) // XXX
......@@ -688,12 +715,11 @@ func (root *Root) zδhandle1(zevent zodb.CommitEvent) {
// resync .zhead to zevent.tid
// XXX -> Head.Resync() ?
// 1. deactivate all ZBigFile (we keep them activated during whole txn)
// XXX dir.mu locking (not needed bcause zconnMu locked)
for _, file := range bfdir.fileTab {
// 1. deactivate changed ZBigFile (we keep all ZBigFiles activated during whole txn)
// XXX dir.fileMu locking (not needed because zconnMu locked)
for file := range toinvalidate {
file.zbf.PDeactivate()
// XXX we need to reread size only for files in toinvalidate
file.zbfSize = -1 // just in case
file.zbfSize = -1
}
// 2. abort old and resync to new txn/at
......@@ -702,10 +728,10 @@ func (root *Root) zδhandle1(zevent zodb.CommitEvent) {
zhead.Resync(txn, zevent.Tid)
zhead.txnCtx = ctx
// 3. reactivate/restat all ZBigFile
// 3. reactivate/restat invalidated ZBigFile
// XXX -> parallel?
// XXX locking
for _, file := range bfdir.fileTab {
for file := range toinvalidate {
err := file.zbf.PActivate(ctx)
if err != nil {
panic(err) // XXX
......@@ -859,9 +885,9 @@ func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context)
}()
// check to see if dir(oid) is already there
bfdir.mu.Lock()
bfdir.fileMu.Lock()
f, already := bfdir.fileTab[oid]
bfdir.mu.Unlock()
bfdir.fileMu.Unlock()
if already {
return f, nil
......@@ -874,19 +900,19 @@ func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context)
}
// relock bfdir and either register f or, if the file was maybe
// simultanously created while we were not holding bfdir.mu, return that.
bfdir.mu.Lock()
// simultaneously created while we were not holding bfdir.fileMu, return that.
bfdir.fileMu.Lock()
f2, already := bfdir.fileTab[oid]
if already {
bfdir.mu.Unlock()
bfdir.fileMu.Unlock()
f.Close()
return f2, nil
}
bfdir.fileTab[oid] = f
bfdir.mu.Unlock()
bfdir.fileMu.Unlock()
// mkfile takes filesystem treeLock - do it outside bfdir.mu
// mkfile takes filesystem treeLock - do it outside bfdir.fileMu
mkfile(bfdir, name, f)
return f, nil
......@@ -958,6 +984,7 @@ func (root *Root) mkdir(name string, fctx *fuse.Context) (_ *nodefs.Inode, err e
Node: newDefaultNode(),
head: revDir,
fileTab: make(map[zodb.Oid]*BigFile),
// indexLooked = nil for @revX/
}
revDir.bfdir = bfdir
......@@ -1156,19 +1183,19 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) error {
// no one was loading - we became responsible for loading this block
zbf := f.zbf
blkdata, treepath, blkpathRevMax, err := zbf.LoadBlk(ctx, blk)
blkdata, treepath, pathRevMax, err := zbf.LoadBlk(ctx, blk)
loading.blkdata = blkdata
loading.err = err
close(loading.ready)
// XXX locking
x.blktabVisited.Add(treepath)
bfdir := f.head.bfdir
bfdir.indexMu.Lock() // XXX locking correct?
bfdir.indexLooked.Add(f, treepath) // XXX needed only for head/
bfdir.indexMu.Unlock()
// XXX before loading.ready?
blkrevmax, _ := f.δFtail.LastRevOf(blk, zbf.PJar().At())
if blkpathRevMax < blkrevmax {
blkrevmax = blkpathRevMax
}
blkrevmax = tidmin(blkrevmax, pathRevMax)
/*
// XXX remmapping - only if head.rev == 0
......@@ -1346,6 +1373,7 @@ func main() {
Node: newDefaultNode(),
head: head,
fileTab: make(map[zodb.Oid]*BigFile),
indexLooked: δbtree.NewPathSet(),
}
head.bfdir = bfdir
......
......@@ -61,7 +61,9 @@ type zBlk interface {
//
// If returned data size is less than the block size of containing ZBigFile,
// the block trailing is assumed to be trailing \0.
loadBlkData(ctx context.Context) ([]byte, error)
//
// returns data and revision of ZBlk.
loadBlkData(ctx context.Context) (data []byte, rev zodb.Tid, _ error)
// bindZFile associates ZBlk as being used by zfile to store block #blk.
//
......@@ -155,16 +157,16 @@ func (zb *zBlk0State) PySetState(pystate interface{}) error {
return nil
}
func (zb *ZBlk0) loadBlkData(ctx context.Context) ([]byte, error) {
func (zb *ZBlk0) loadBlkData(ctx context.Context) ([]byte, zodb.Tid, error) {
// XXX err ctx
err := zb.PActivate(ctx)
if err != nil {
return nil, err
return nil, 0, err
}
defer zb.PDeactivate()
return mem.Bytes(zb.blkdata), nil
return mem.Bytes(zb.blkdata), zb.PSerial(), nil
}
// ---- ZBlk1 ---
......@@ -227,12 +229,12 @@ func (zb *zBlk1State) PySetState(pystate interface{}) error {
return nil
}
func (zb *ZBlk1) loadBlkData(ctx context.Context) ([]byte, error) {
func (zb *ZBlk1) loadBlkData(ctx context.Context) ([]byte, zodb.Tid, error) {
// XXX errctx
err := zb.PActivate(ctx)
if err != nil {
return nil, err
return nil, 0, err
}
defer zb.PDeactivate()
......@@ -337,12 +339,12 @@ func (zb *ZBlk1) loadBlkData(ctx context.Context) ([]byte, error) {
err = wg.Wait()
if err != nil {
return nil, err // XXX err ctx
return nil, 0, err // XXX err ctx
}
// empty .chunktab -> ø
if len(chunktab) == 0 {
return nil, nil
return nil, 0, nil
}
// glue all chunks from chunktab
......@@ -372,16 +374,16 @@ func (zb *ZBlk1) loadBlkData(ctx context.Context) ([]byte, error) {
for _, start := range offv[:len(offv)-1] {
chunk := chunktab[start]
if !(start >= stop) { // verify chunks don't overlap
return nil, fmt.Errorf("!(start >= stop)") // XXX
return nil, 0, fmt.Errorf("!(start >= stop)") // XXX
}
if !(start + int32(len(chunk.data)) <= int32(len(blkdata))) { // XXX overflow?
return nil, fmt.Errorf("blkdata overrun") // XXX
return nil, 0, fmt.Errorf("blkdata overrun") // XXX
}
stop = start + int32(len(chunk.data)) // XXX overflow?
copy(blkdata[start:], chunk.data)
}
return blkdata, nil
return blkdata, zb.PSerial(), nil
}
......@@ -443,7 +445,7 @@ func (bf *zBigFileState) PySetState(pystate interface{}) (err error) {
// it also returns:
//
// - BTree path in .blktab for loaded block,
// - maximum revision of nodes in the BTree path.
// - max(_.serial for _ in ZBlk(#blk), all BTree/Bucket that lead to ZBlk)
//
// XXX better load into user-provided buf? mem.Buf?
func (bf *ZBigFile) LoadBlk(ctx context.Context, blk int64) (_ []byte, treePath []zodb.IPersistent, pathRevMax zodb.Tid, err error) {
......@@ -458,10 +460,7 @@ func (bf *ZBigFile) LoadBlk(ctx context.Context, blk int64) (_ []byte, treePath
pathRevMax = 0
xzblk, ok, err := bf.blktab.GetTo(ctx, blk, func(node zodb.IPersistent) {
treePath = append(treePath, node)
rev := node.PSerial()
if rev > pathRevMax {
pathRevMax = rev
}
pathRevMax = tidmax(pathRevMax, node.PSerial())
})
if err != nil {
return nil, nil, 0, err
......@@ -475,10 +474,11 @@ func (bf *ZBigFile) LoadBlk(ctx context.Context, blk int64) (_ []byte, treePath
return nil, nil, 0, fmt.Errorf("expect ZBlk*; got %s", typeOf(xzblk))
}
blkdata, err := zblk.loadBlkData(ctx)
blkdata, blkrev, err := zblk.loadBlkData(ctx)
if err != nil {
return nil, nil, 0, err
}
pathRevMax = tidmax(pathRevMax, blkrev)
l := int64(len(blkdata))
if l > bf.blksize {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment