Commit 7540653c authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 70a8eb7d
......@@ -60,7 +60,6 @@ import (
// ZBlk is the interface that every ZBlk* block implements.
type ZBlk interface {
zodb.IPersistent
_ZBlkInΔFtail
// LoadBlkData loads from database and returns data block stored by this ZBlk.
//
......@@ -79,7 +78,6 @@ var _ ZBlk = (*ZBlk1)(nil)
// ZBlk0 mimics ZBlk0 from python.
type ZBlk0 struct {
zodb.Persistent
zblkInΔFtail
// NOTE py source uses bytes(buf) but on python2 it still results in str
blkdata string
......@@ -157,7 +155,6 @@ func (zd *zDataState) PySetState(pystate interface{}) error {
// ZBlk1 mimics ZBlk1 from python.
type ZBlk1 struct {
zodb.Persistent
zblkInΔFtail
chunktab *btree.IOBTree // {} offset -> ZData(chunk)
}
......
// Code generated by gen-set ZBigFile *ZBigFile; DO NOT EDIT.
// Copyright (C) 2015-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package zdata
// SetZBigFile is a set of *ZBigFile, keyed by pointer identity.
type SetZBigFile map[*ZBigFile]struct{}
// Add inserts v into the set.
// It is a no-op if v is already present.
func (s SetZBigFile) Add(v *ZBigFile) {
	s[v] = struct{}{}
}
// Del removes v from the set.
// It is a no-op if v was not in the set.
func (s SetZBigFile) Del(v *ZBigFile) {
	delete(s, v)
}
// Has reports whether v is an element of the set.
func (s SetZBigFile) Has(v *ZBigFile) bool {
	_, present := s[v]
	return present
}
// Update adds every element of t to s, mutating s in place.
func (s SetZBigFile) Update(t SetZBigFile) {
	for elem := range t {
		s[elem] = struct{}{}
	}
}
// Elements returns all elements of the set as a slice.
// The order of elements is unspecified (map iteration order).
func (s SetZBigFile) Elements() []*ZBigFile {
	ev := make([]*ZBigFile, 0, len(s))
	for e := range s {
		ev = append(ev, e)
	}
	return ev
}
// Union returns s ∪ t as a new set; neither receiver nor argument is modified.
func (s SetZBigFile) Union(t SetZBigFile) SetZBigFile {
	// capacity hint: the union holds at least max(len(s), len(t)) elements
	hint := len(s)
	if len(t) > hint {
		hint = len(t)
	}
	u := make(SetZBigFile, hint)
	for _, src := range []SetZBigFile{s, t} {
		for v := range src {
			u[v] = struct{}{}
		}
	}
	return u
}
// Intersection returns s ∩ t as a new set — the elements present in both s and t.
func (s SetZBigFile) Intersection(t SetZBigFile) SetZBigFile {
	out := SetZBigFile{}
	for v := range s {
		if _, inT := t[v]; inT {
			out[v] = struct{}{}
		}
	}
	return out
}
// Difference returns s\t as a new set — the elements of s that are not in t.
func (s SetZBigFile) Difference(t SetZBigFile) SetZBigFile {
	out := SetZBigFile{}
	for v := range s {
		if _, inT := t[v]; !inT {
			out[v] = struct{}{}
		}
	}
	return out
}
// SymmetricDifference returns s Δ t as a new set — the elements that are
// in exactly one of s and t.
func (s SetZBigFile) SymmetricDifference(t SetZBigFile) SetZBigFile {
	out := SetZBigFile{}
	for v := range s {
		if _, inT := t[v]; !inT {
			out[v] = struct{}{}
		}
	}
	for v := range t {
		if _, inS := s[v]; !inS {
			out[v] = struct{}{}
		}
	}
	return out
}
// Equal reports whether a and b contain exactly the same elements.
func (a SetZBigFile) Equal(b SetZBigFile) bool {
	if len(a) != len(b) {
		return false
	}
	// same size, so a ⊆ b implies a == b
	for v := range a {
		if !b.Has(v) {
			return false
		}
	}
	return true
}
......@@ -19,14 +19,9 @@
package zdata
// XXX kill
//go:generate ../set/gen-set zdata ZBigFile *ZBigFile zset_bigfile.go
import (
"context"
"fmt"
"runtime"
"sync"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/zodb"
......@@ -80,9 +75,10 @@ type SetOid = set.SetOid
type ΔFtail struct {
// ΔFtail merges ΔBtail with history of ZBlk
δBtail *xbtree.ΔBtail
// fileIdx map[zodb.Oid]SetZBigFile // tree-root -> {} ZBigFile as of @head
fileIdx map[zodb.Oid]SetOid // tree-root -> {} ZBigFile<oid> as of @head
trackSetZFile SetOid // set of tracked ZBigFiles as of @head
trackSetZBlk map[zodb.Oid]*zblkTrack // zblk -> {} root -> {}blk as of @head
// XXX kill
///*
// XXX don't need vδF - everything is reconstructed at runtime from .δBtail.vδT
......@@ -93,16 +89,20 @@ type ΔFtail struct {
// tracked ZBlk that are not yet taken into account in current vδF.
// grows on new track requests; flushes on queries and update.
// trackNew map[*ZBigFile]map[zodb.Oid]*zblkInΔFtail // {} file -> {} oid -> zblk
trackNew map[zodb.Oid]map[zodb.Oid]*zblkInΔFtail // {} root -> {} oid -> zblk
trackNew map[zodb.Oid]map[zodb.Oid]*zblkTrack // {} foid -> {} zoid -> zblk
//*/
}
// zblkTrack keeps information in which root/blocks ZBlk is present as of @head.
type zblkTrack struct {
// inroot map[zodb.Oid]SetI64 // {} root -> {}blk XXX later switch to this
infile map[zodb.Oid]SetI64 // {} foid -> {}blk
}
// ΔF represents a change in files space.
type ΔF struct {
Rev zodb.Tid
// ByFile map[*ZBigFile]*ΔFile // file -> δfile
ByFile map[zodb.Oid]*ΔFile // file<oid> -> δfile
ByFile map[zodb.Oid]*ΔFile // foid -> δfile
}
// ΔFile represents a change to one file.
......@@ -112,25 +112,6 @@ type ΔFile struct {
Size bool // whether file size changed XXX -> ΔSize?
}
// zblkInΔFtail is ΔFtail-related volatile data embedded into ZBlk*.
//
// The data is preserved even when ZBlk comes to ghost state, but is lost if
// ZBlk is garbage collected. The data is transient - it is _not_ included into
// persistent state.
type zblkInΔFtail struct {
mu sync.Mutex // to support multiple concurrent loaders
// XXX change vvv to intree_parent? {} Bucket -> set(#blk)
// (this is uniform with building in-RAM reverse child->parents relation for
// tree nodes and for tree_root->file)
// with which files/blocks this ZBlk is associated with as of @head state
// infile map[*ZBigFile]SetI64 // {} file -> set(#blk)
inroot map[zodb.Oid]SetI64 // {} root -> set(#blk)
}
type _ZBlkInΔFtail interface { inΔFtail() *zblkInΔFtail }
func (z *zblkInΔFtail) inΔFtail() *zblkInΔFtail { return z }
// NewΔFtail creates new ΔFtail object.
//
......@@ -141,11 +122,11 @@ func (z *zblkInΔFtail) inΔFtail() *zblkInΔFtail { return z }
// ZODB when needed.
func NewΔFtail(at0 zodb.Tid, db *zodb.DB) *ΔFtail {
return &ΔFtail{
δBtail: xbtree.NewΔBtail(at0, db),
// fileIdx: make(map[zodb.Oid]SetZBigFile),
fileIdx: make(map[zodb.Oid]SetOid),
// trackNew: make(map[*ZBigFile]map[zodb.Oid]*zblkInΔFtail),
trackNew: make(map[zodb.Oid]map[zodb.Oid]*zblkInΔFtail),
δBtail: xbtree.NewΔBtail(at0, db),
fileIdx: map[zodb.Oid]SetOid{},
trackSetZFile: SetOid{},
trackSetZBlk: map[zodb.Oid]*zblkTrack{},
trackNew: map[zodb.Oid]map[zodb.Oid]*zblkTrack{},
}
}
......@@ -167,7 +148,9 @@ func (δFtail *ΔFtail) Tail() zodb.Tid { return δFtail.δBtail.Tail() }
//
// A root can be associated with several files (each provided on different Track call).
func (δFtail *ΔFtail) Track(file *ZBigFile, blk int64, path []btree.LONode, zblk ZBlk) {
fileOid := file.POid()
// XXX locking
foid := file.POid()
if blk == -1 {
// XXX blk = ∞ from beginning ?
blk = xbtree.KeyMax
......@@ -176,43 +159,44 @@ func (δFtail *ΔFtail) Track(file *ZBigFile, blk int64, path []btree.LONode, zb
if err != nil {
panic(err) // XXX -> error? errctx
}
root := path[0].(*btree.LOBTree)
rootOid := root.POid()
files, ok := δFtail.fileIdx[rootOid]
files, ok := δFtail.fileIdx[root.POid()]
if !ok {
files = SetOid{}
δFtail.fileIdx[rootOid] = files
δFtail.fileIdx[root.POid()] = files
}
files.Add(fileOid)
files.Add(foid)
δFtail.trackSetZFile.Add(foid)
// associate zblk with file, if it was not hole
if zblk != nil {
z := zblk.inΔFtail()
z.mu.Lock()
// blocks, ok := z.infile[file]
blocks, ok := z.inroot[rootOid]
zoid := zblk.POid()
zt, ok := δFtail.trackSetZBlk[zoid]
if !ok {
zt = &zblkTrack{}
δFtail.trackSetZBlk[zoid] = zt
}
blocks, ok := zt.infile[foid]
if !ok {
blocks = make(SetI64, 1)
// if z.infile == nil {
// z.infile = make(map[*ZBigFile]SetI64)
if z.inroot == nil {
z.inroot = make(map[zodb.Oid]SetI64)
if zt.infile == nil {
zt.infile = make(map[zodb.Oid]SetI64)
}
// z.infile[file] = blocks
z.inroot[rootOid] = blocks
zt.infile[foid] = blocks
}
blocks.Add(blk)
z.mu.Unlock()
// XXX locking
if !ok {
// zblk was not associated with this file
zt := δFtail.trackNew[fileOid]
if zt == nil {
zt = make(map[zodb.Oid]*zblkInΔFtail, 1)
δFtail.trackNew[fileOid] = zt
ft := δFtail.trackNew[foid]
if ft == nil {
ft = make(map[zodb.Oid]*zblkTrack, 1)
δFtail.trackNew[foid] = ft
}
zt[zblk.POid()] = z
ft[zoid] = zt
}
}
......@@ -241,6 +225,7 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit, zhead *xzodb.ZConn) (_ ΔF
// XXX δFtail.update() first?
// XXX verify zhead.At() == δFtail.Head()
// XXX locking
δB, err := δFtail.δBtail.Update(δZ)
if err != nil {
......@@ -277,42 +262,29 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit, zhead *xzodb.ZConn) (_ ΔF
// take zblk changes into account
for _, oid := range δZ.Changev {
// XXX cache lock/unlock
obj := zhead.Cache().Get(oid)
if obj == nil {
//fmt.Printf("%s: not in cache\n", oid)
continue // nothing to do - see invariant
if δFtail.trackSetZFile.Has(oid) {
// TODO check that .blksize and .blktab (it is only
// persistent reference) do not change.
return ΔF{}, fmt.Errorf("ZBigFile<%s> changed @%s", oid, δZ.Tid)
}
//fmt.Printf("%s: in cache (%s)\n", oid, typeOf(obj))
switch obj := obj.(type) {
case ZBlk: // ZBlk*
// z.infile locking: since we write-locked head.zheadMu
// - no other fuse reads are running, and thus no one
// is mutating z.infile. XXX recheck
z := obj.inΔFtail()
for file, blocks := range z.infile {
δfile, ok := δF.ByFile[file]
if !ok {
δfile = &ΔFile{Rev: δF.Rev, Blocks: make(SetI64)}
δF.ByFile[file] = δfile
}
zt, ok := δFtail.trackSetZBlk[oid]
if !ok {
continue // not tracked
}
δfile.Blocks.Update(blocks)
for foid, blocks := range zt.infile {
δfile, ok := δF.ByFile[foid]
if !ok {
δfile = &ΔFile{Rev: δF.Rev, Blocks: make(SetI64)}
δF.ByFile[foid] = δfile
}
// XXX update z.infile according to btree changes
case *ZBigFile:
// TODO check that .blksize and .blktab (it is only
// persistent reference) do not change.
return ΔF{}, fmt.Errorf("ZBigFile<%s> changed @%s", oid, δZ.Tid)
δfile.Blocks.Update(blocks)
}
// make sure obj won't be garbage-collected until we finish handling it.
runtime.KeepAlive(obj)
// XXX update zt.infile according to btree changes
}
δFtail.vδF = append(δFtail.vδF, δF)
......@@ -329,18 +301,17 @@ func (δFtail *ΔFtail) update(file *ZBigFile) {
panic("TODO")
}
fileOid := file.POid()
// let's see if we need to rebuild .vδF due to not-yet processed track requests
foid := file.POid()
// XXX locking
// XXX dumb
zt, dirty := δFtail.trackNew[fileOid]
zt, dirty := δFtail.trackNew[foid]
if !dirty {
return
}
delete(δFtail.trackNew, fileOid)
delete(δFtail.trackNew, foid)
// XXX unlock here
for i, δZ := range δFtail.δBtail.ΔZtail().Data() {
......@@ -354,14 +325,14 @@ func (δFtail *ΔFtail) update(file *ZBigFile) {
}
// XXX locking
// XXX -> func δF.δfile(file) ?
δfile, ok := δF.ByFile[fileOid]
// XXX -> func δF.δfile(foid) ?
δfile, ok := δF.ByFile[foid]
if !ok {
δfile = &ΔFile{Rev: δF.Rev, Blocks: make(SetI64)}
δF.ByFile[fileOid] = δfile
δF.ByFile[foid] = δfile
}
δfile.Blocks.Update(z.infile[fileOid])
δfile.Blocks.Update(z.infile[foid])
}
}
}
......@@ -392,8 +363,6 @@ func (δFtail *ΔFtail) SliceByFileRev(file *ZBigFile, lo, hi zodb.Tid) /*readon
// FIXME rework to just query .δBtail.SliceByRootRev(file.blktab, lo, hi) +
// merge δZBlk history with that.
foid := file.POid()
// XXX locking?
δFtail.update(file)
......@@ -419,6 +388,7 @@ func (δFtail *ΔFtail) SliceByFileRev(file *ZBigFile, lo, hi zodb.Tid) /*readon
vδF = vδF[i:j+1]
// filter found changed to have only file-related bits
foid := file.POid()
var vδfile []*ΔFile
for _, δF := range vδF {
δfile, ok := δF.ByFile[foid]
......
......@@ -257,21 +257,19 @@ package main
// 2) head/bigfile/* of all bigfiles represent state as of zhead.At .
// 3) for head/bigfile/* the following invariant is maintained:
//
// #blk ∈ OS file cache => ZBlk(#blk) + all BTree/Bucket that lead to it ∈ zhead live cache(%)
// (ZBlk* in ghost state)
// #blk ∈ OS file cache => all BTree/Bucket/ZBlk that lead to blk are tracked(%)
//
// => all BTree/Bucket that lead to blk are tracked (XXX)
// The invariant helps on invalidation: when δFtail (see below) sees a
// changed oid, it is guaranteed that if the change affects block that was
// ever provided to OS, δFtail will detect that this block has changed. XXX review
// And if oid relates to a file block but is not in δFtail's tracking set -
// we know that block is not cached and will trigger ZODB load on a future
// file read.
//
// The invariant helps on invalidation: if we see a changed oid, and
// zhead.cache.lookup(oid) = ø -> we know we don't have to invalidate OS
// cache for any part of any file (even if oid relates to a file block - that
// block is not cached and will trigger ZODB load on file read).
//
// XXX explain why tracked
//
// Currently we maintain this invariant by simply never evicting ZBlk/LOBTree/LOBucket
// objects from ZODB Connection cache. In the future we may want to try to
// synchronize to kernel freeing its pagecache pages.
// Currently we maintain this invariant by adding ZBlk/LOBTree/LOBucket
// objects to δFtail on every access, and never shrinking that tracking set.
// In the future we may want to try to synchronize to kernel freeing its
// pagecache pages.
//
// 4) when we receive an invalidation message from ZODB - we process it and
// propagate invalidations to OS file cache of head/bigfile/*:
......@@ -301,6 +299,8 @@ package main
// Eager invalidation would require full scan - Trees _and_
// Buckets, which makes it prohibitively expensive - see (+).
//
// FIXME all ^^^ is outdated -> XXX δFtail
//
// 4.4) for all file/blk to invalidate we do:
//
// - try to retrieve head/bigfile/file[blk] from OS file cache(*);
......@@ -718,7 +718,7 @@ type blkPinState struct {
err error
}
// -------- 3) Cache invariant --------
// -------- ZODB cache control --------
// zodbCacheControl implements zodb.LiveCacheControl to tune ZODB to never evict
// LOBTree/LOBucket from live cache. We want to keep LOBTree/LOBucket always alive
......@@ -726,34 +726,28 @@ type blkPinState struct {
//
// For the data itself - we put it to kernel pagecache and always deactivate
// from ZODB right after that.
//
// See "3) for */head/data the following invariant is maintained..."
type zodbCacheControl struct {}
func (_ *zodbCacheControl) PCacheClassify(obj zodb.IPersistent) zodb.PCachePolicy {
switch obj.(type) {
// ZBlk* should be in cache but without data
// don't let ZBlk*/ZData to pollute the cache
case *ZBlk0:
return zodb.PCachePinObject | zodb.PCacheDropState
return zodb.PCacheDropObject | zodb.PCacheDropState
case *ZBlk1:
return zodb.PCachePinObject | zodb.PCacheDropState
// ZBigFile btree index should be in cache with data
case *btree.LOBTree:
return zodb.PCachePinObject | zodb.PCacheKeepState
case *btree.LOBucket:
return zodb.PCachePinObject | zodb.PCacheKeepState
// don't let ZData to pollute the cache
return zodb.PCacheDropObject | zodb.PCacheDropState
case *ZData:
return zodb.PCacheDropObject | zodb.PCacheDropState
// for performance reason we also keep ZBigFile in cache.
// keep ZBigFile and its btree index in cache to speedup file data access.
//
// ZBigFile is top-level object that is used on every block load, and
// it would be a waste to evict ZBigFile from cache.
case *ZBigFile:
return zodb.PCachePinObject | zodb.PCacheKeepState
case *btree.LOBTree:
return zodb.PCachePinObject | zodb.PCacheKeepState
case *btree.LOBucket:
return zodb.PCachePinObject | zodb.PCacheKeepState
}
return 0
......@@ -882,7 +876,7 @@ retry:
if log.V(2) {
// debug dump δF
log.Infof("\n\nS: handleδZ: δF (#%d):\n", len(δF.ByFile))
for zfileOid, δfile := range δF.ByFile {
for foid, δfile := range δF.ByFile {
blkv := δfile.Blocks.Elements()
sort.Slice(blkv, func(i, j int) bool {
return blkv[i] < blkv[j]
......@@ -891,19 +885,19 @@ retry:
if δfile.Size {
size = "S"
}
log.Infof("S: \t- %s\t%s %v\n", zfileOid, size, blkv)
log.Infof("S: \t- %s\t%s %v\n", foid, size, blkv)
}
log.Infof("\n\n")
}
wg := xsync.NewWorkGroup(ctx)
for zfileOid, δfile := range δF.ByFile {
for foid, δfile := range δF.ByFile {
// // XXX needed?
// // XXX even though δBtail is complete, not all ZBlk are present here
// file.δtail.Append(δF.Rev, δfile.Blocks.Elements())
// zfile was requested to be tracked -> it must be present in fileTab
file := bfdir.fileTab[zfileOid]
// file was requested to be tracked -> it must be present in fileTab
file := bfdir.fileTab[foid]
for blk := range δfile.Blocks {
blk := blk
wg.Go(func(ctx context.Context) error {
......@@ -921,11 +915,11 @@ retry:
//
// do it after completing data invalidations.
wg = xsync.NewWorkGroup(ctx)
for zfile, δfile := range δF.ByFile {
for foid, δfile := range δF.ByFile {
if !δfile.Size {
continue
}
file := bfdir.fileTab[zfile.POid()] // must be present
file := bfdir.fileTab[foid] // must be present
wg.Go(func(ctx context.Context) error {
return file.invalidateAttr() // NOTE does not accept ctx
})
......@@ -950,14 +944,17 @@ retry:
// 2. restat invalidated ZBigFile
// NOTE no lock needed since .blksize and .size are constant during lifetime of one txn.
// XXX -> parallel
for zfile := range δF.ByFile {
for foid := range δF.ByFile {
file := bfdir.fileTab[foid] // must be present
zfile := file.zfile
size, sizePath, err := zfile.Size(ctx)
if err != nil {
return err
}
file := bfdir.fileTab[zfile.POid()] // must be present
file.size = size
// see "3) for */head/data the following invariant is maintained..."
bfdir.δFtail.Track(zfile, -1, sizePath, nil)
// XXX we can miss a change to file if δblk is not yet tracked
......@@ -1501,6 +1498,7 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, treepath []btr
// update δFtail index XXX -> move upper into readBlk ?
// (δFtail is just for δZ -> δF invalidation handling and is needed without isolation protocol)
// XXX ^^^ no - also need to query to send pins
// see "3) for */head/data the following invariant is maintained..."
bfdir := f.head.bfdir
δFtail := bfdir.δFtail
bfdir.δFmu.Lock() // XXX locking correct? XXX -> better push down?
......@@ -2204,6 +2202,7 @@ func (head *Head) bigopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err er
// only head/ needs δFtail, f.δtail and watches.
if head.rev == 0 {
// see "3) for */head/data the following invariant is maintained..."
head.bfdir.δFmu.Lock() // XXX locking ok?
head.bfdir.δFtail.Track(f.zfile, -1, sizePath, nil)
head.bfdir.δFmu.Unlock()
......@@ -2391,8 +2390,8 @@ func _main() (err error) {
zhead, err := xzodb.ZOpen(ctx, zdb, &zodb.ConnOptions{
At: at0,
// we need zhead.cache to be maintained across several transactions.
// see "3) for head/bigfile/* the following invariant is maintained ..."
// preserve zhead.cache across several transactions.
// see "ZODB cache control"
NoPool: true,
})
if err != nil {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment