Commit 7233b4c0 authored by Kirill Smelkov

zodb/go: In-RAM client cache

The cache is needed so that we can provide IStorage.Prefetch
functionality in a general way, wrapped on top of a storage driver: when
an object is loaded, the loading itself consists of two steps:

1. start loading the object into the cache,
2. wait for the loading to complete.

This way Prefetch is naturally only step "1" - start loading the object
into the cache without waiting for the loading to complete. Go's
goroutines help here: we can spawn every such loading into its own
goroutine instead of explicitly programming the loading as a state
machine.

Since this cache is mainly needed for Prefetch to work, not to actually
cache data (though it does serve as a cache for repeated access too),
the goal when writing it was to add minimal overhead for the
"data-not-yet-in-cache" case. We are not completely there yet, but the
latency is acceptable - depending on the workload the cache layer adds ~

	0.5 - 1 - 3µs

to loading times.
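
For illustration, a caller that knows in advance which objects it will
need could overlap their loading roughly as follows (a sketch only: ctx,
at and oidv are assumed to be provided by the caller; Prefetch and Load
are the cache methods added here):

	for _, oid := range oidv {
		cache.Prefetch(ctx, zodb.Xid{At: at, Oid: oid}) // start loading in background
	}
	for _, oid := range oidv {
		buf, serial, err := cache.Load(ctx, zodb.Xid{At: at, Oid: oid}) // wait + use result
		...
	}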
parent dfd4fb73
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package zodb
// cache management
import (
"context"
"fmt"
"sort"
"sync"
"unsafe"
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xcontainer/list"
)
// XXX managing LRU under 1 big gcMu might be bad for scalability.
// TODO maintain nhit / nmiss + way to read cache stats
// TODO optimize cache more so that miss overhead becomes negligible
// Cache provides RAM caching layer that can be used over a storage.
type Cache struct {
loader interface {
StorLoader
URL() string
}
mu sync.RWMutex
// cache is fully synchronized with storage for transactions with tid <= head.
// XXX clarify ^^^ (it means if revCacheEntry.head=∞ it is Cache.head)
head Tid
entryMap map[Oid]*oidCacheEntry // oid -> oid's cache entries
gcMu sync.Mutex
lru lruHead // revCacheEntries in LRU order
size int // cached data size in bytes
sizeMax int // cache is allowed to occupy not more than this
}
// oidCacheEntry maintains cached revisions for 1 oid
type oidCacheEntry struct {
oid Oid
sync.Mutex
// cached revisions in ascending order
// [i].serial <= [i].head < [i+1].serial <= [i+1].head
//
// NOTE ^^^ .serial = 0 while loading is in progress
// NOTE ^^^ .serial = 0 if .err != nil
rcev []*revCacheEntry
}
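// For illustration (numbers assumed, not taken from real data): if the database
// holds revisions of an oid at serials 4 and 10, then after load(@6) and
// load(@12) with cache.head=12, the oid's rcev could look like
//
//	[{serial: 4, head: 6}, {serial: 10, head: ∞}]
//
// which satisfies the ordering invariant above (∞ marks an entry whose head
// follows cache.head - see revCacheEntry below).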
// revCacheEntry is information about 1 cached oid revision
type revCacheEntry struct {
parent *oidCacheEntry // oidCacheEntry holding us
inLRU lruHead // in Cache.lru; protected by Cache.gcMu
// we know that load(oid, .head) will give this .serial:oid.
//
// this is only what we currently know - not necessarily covering the
// whole correct range - e.g. if oid revisions in the db are 1 and 5 and we
// query the db with load(@3), on return we'll get serial=1 and
// remember .head as 3. But for load(@4) we have to query the
// database again.
//
// if .head=∞ here, that actually means head is cache.head
// ( this way we do not need to bump .head to next tid in many
// unchanged cache entries when a transaction invalidation comes )
//
// .head can be > cache.head and still finite - that represents a
// case when load with tid > cache.head was called.
head Tid
// loading result: object (buf, serial) or error
buf *mem.Buf
serial Tid
err error
// done when loading finished
// (like closed-when-ready `chan struct{}` but does not allocate on
// make and is faster)
ready sync.WaitGroup
// protected by .parent's lock:
accounted bool // whether rce size was accounted in cache size
// how many waiters for buf there are while rce is being loaded.
// after data for this RCE is loaded loadRCE will do .buf.XIncref() .waitBufRef times.
// = -1 after loading is complete.
waitBufRef int32
}
// StorLoader represents loading part of a storage.
// XXX -> zodb.IStorageLoader (or zodb.Loader ?) ?
type StorLoader interface {
Load(ctx context.Context, xid Xid) (buf *mem.Buf, serial Tid, err error)
}
// lock order: Cache.mu > oidCacheEntry
// Cache.gcMu > oidCacheEntry
// NewCache creates new cache backed up by loader.
//
// The cache will use not more than ~ sizeMax bytes of RAM for cached data.
func NewCache(loader interface { StorLoader; URL() string }, sizeMax int) *Cache {
c := &Cache{
loader: loader,
entryMap: make(map[Oid]*oidCacheEntry),
sizeMax: sizeMax,
}
c.lru.Init()
return c
}
// SetSizeMax adjusts how much RAM cache can use for cached data.
func (c *Cache) SetSizeMax(sizeMax int) {
c.gcMu.Lock()
c.sizeMax = sizeMax
if c.size > c.sizeMax {
c.gc()
}
c.gcMu.Unlock()
}
// Load loads data from database via cache.
//
// If data is already in cache - cached content is returned.
func (c *Cache) Load(ctx context.Context, xid Xid) (buf *mem.Buf, serial Tid, err error) {
rce, rceNew := c.lookupRCE(xid, +1)
// rce is already in cache - use it
if !rceNew {
rce.ready.Wait()
c.gcMu.Lock()
rce.inLRU.MoveBefore(&c.lru.Head)
c.gcMu.Unlock()
// rce is not in cache - this goroutine becomes responsible for loading it
} else {
c.loadRCE(ctx, rce)
}
if rce.err != nil {
return nil, 0, &OpError{URL: c.loader.URL(), Op: "load", Args: xid, Err: rce.err}
}
return rce.buf, rce.serial, nil
}
// Prefetch arranges for data to be eventually present in cache.
//
// If data is not yet in cache loading for it is started in the background.
// Prefetch is a non-blocking operation and does not wait for loading, if any
// was started, to complete.
//
// Prefetch does not return any error.
func (c *Cache) Prefetch(ctx context.Context, xid Xid) {
rce, rceNew := c.lookupRCE(xid, +0)
// !rceNew -> no need to adjust LRU - it will be adjusted by further actual data Load.
// More: we must not expose not-yet-loaded RCEs to Cache.lru because
// their rce.waitBufRef was not yet synced to rce.buf.
// See loadRCE for details.
// spawn loading in the background if rce was not yet loaded
if rceNew {
go c.loadRCE(ctx, rce)
}
}
// lookupRCE returns revCacheEntry corresponding to xid.
//
// rceNew indicates whether rce is new and so loading on it has not been
// initiated yet. If so the caller should proceed to loading rce via loadRCE.
//
// wantBufRef indicates how much caller wants returned rce.buf to be incref'ed.
//
// This increment will be done only after rce is loaded either by lookupRCE
// here - if it finds the rce to be already loaded, or by future loadRCE - if
// rce is not loaded yet - to which the increment will be scheduled.
//
// Either way - by lookupRCE or by loadRCE - the increment will be done
// consistently while under rce.parent lock - this way making sure concurrent gc
// won't release rce.buf while it does not yet hold all its wanted references.
func (c *Cache) lookupRCE(xid Xid, wantBufRef int) (rce *revCacheEntry, rceNew bool) {
// oid -> oce (oidCacheEntry) ; create new empty oce if not yet there
// exit with oce locked and cache.head read consistently
c.mu.RLock()
oce := c.entryMap[xid.Oid]
cacheHead := c.head
if oce != nil {
oce.Lock()
c.mu.RUnlock()
} else {
// relock cache in write mode to create oce
c.mu.RUnlock()
c.mu.Lock()
oce = c.entryMap[xid.Oid]
if oce == nil {
oce = oceAlloc(xid.Oid)
c.entryMap[xid.Oid] = oce
}
cacheHead = c.head // reload c.head because we relocked the cache
oce.Lock()
c.mu.Unlock()
}
// oce, at -> rce (revCacheEntry)
l := len(oce.rcev)
i := sort.Search(l, func(i int) bool {
head_i := oce.rcev[i].head
if head_i == TidMax {
head_i = cacheHead
}
return xid.At <= head_i
})
switch {
// not found - at > max(rcev.head) - insert new max entry
case i == l:
rce = oce.newRevEntry(i, xid.At)
if rce.head == cacheHead {
// FIXME better do this when the entry becomes loaded ?
// XXX vs concurrent invalidations?
rce.head = TidMax
}
rceNew = true
// found:
// at <= rcev[i].head
// at > rcev[i-1].head
// exact match - we already have entry for this at
case xid.At == oce.rcev[i].head:
rce = oce.rcev[i]
// non-exact match:
// - same entry if q(at) ∈ [serial, head]
// - we can also reuse this entry if q(at) <= head and err="nodata"
case oce.rcev[i].loaded() && (
(oce.rcev[i].err == nil && oce.rcev[i].serial <= xid.At) ||
(isErrNoData(oce.rcev[i].err) && xid.At <= oce.rcev[i].head)):
rce = oce.rcev[i]
// otherwise - insert new entry
default:
rce = oce.newRevEntry(i, xid.At)
rceNew = true
}
// wantBufRef -> either incref loaded buf, or schedule this incref to
// loadRCE to be done after loading is complete.
for ; wantBufRef > 0; wantBufRef-- {
if rce.loaded() {
rce.buf.XIncref()
} else {
rce.waitBufRef++
}
}
oce.Unlock()
return rce, rceNew
}
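// For example (illustrative numbers, not from real data): with cache.head=10
// and an oid whose only cached entry is {serial: 4, head: ∞}, lookupRCE(at=7)
// finds i=0, sees the entry loaded with serial(4) <= 7 <= head(10) and reuses
// it, while lookupRCE(at=3) creates a new entry with head=3 because 3 is below
// the serial of rcev[0].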
// loadRCE performs data loading from database into rce.
//
// rce must be new - just created by lookupRCE() with rceNew=true returned.
// Loading completion is signalled by marking rce.ready done.
func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry) {
oce := rce.parent
buf, serial, err := c.loader.Load(ctx, Xid{At: rce.head, Oid: oce.oid})
// normalize buf/serial if it was error
if err != nil {
e := err.(*OpError) // XXX better driver return *OpError explicitly
// only remember the problem cause - the full OpError will be
// reconstructed in Load with the xid actually requested there.
// XXX check .Op == "load" ?
err = e.Err
// TODO err == canceled? -> don't remember
buf.XRelease()
buf = nil
serial = 0
}
rce.serial = serial
rce.buf = buf
rce.err = err
// verify db gives serial <= head
if rce.serial > rce.head {
rce.markAsDBError("load(@%v) -> %v", rce.head, serial)
}
δsize := rce.buf.Len()
oce.Lock()
// sync .waitBufRef -> .buf
//
// this is needed so that we always put into Cache.lru an RCE with proper
// refcount = 1 for cache + n·Load waiters. If we do not account for
// n·Load waiters here - under .parent's lock, gc might run before Load
// resumes, see .buf.refcnt = 1 and return .buf to freelist -> oops.
for ; rce.waitBufRef > 0; rce.waitBufRef-- {
rce.buf.XIncref()
}
rce.waitBufRef = -1 // mark as loaded
i := oce.find(rce)
if i == -1 {
// rce was already dropped by merge / evicted
// (XXX recheck about evicted)
oce.Unlock()
rce.ready.Done()
return
}
// merge rce with adjacent entries in parent
// ( e.g. load(@3) and load(@4) results in the same data loaded if
// there are only revisions with serials 1 and 5 )
//
// if rce & rceNext cover the same range -> drop rce
//
// if we drop rce - do not update c.lru as:
// 1. new rce is not on lru list,
// 2. rceNext (which becomes rce) might not be there on lru list.
//
// if rceNext is not yet there on lru list its loadRCE is in progress
// and will update lru and cache size for it itself.
rceOrig := rce
rceDropped := false
if i+1 < len(oce.rcev) {
rceNext := oce.rcev[i+1]
if rceNext.loaded() && tryMerge(rce, rceNext, rce) {
// not δsize -= len(rce.buf.Data)
// tryMerge can change rce.buf if consistency is broken
δsize = 0
rce.buf.XRelease()
rce = rceNext
rceDropped = true
}
}
// if rcePrev & rce cover the same range -> drop rcePrev
// (if we drop rcePrev we'll later remove it from c.lru when under c.gcMu)
var rcePrevDropped *revCacheEntry
if i > 0 {
rcePrev := oce.rcev[i-1]
if rcePrev.loaded() && tryMerge(rcePrev, rce, rce) {
rcePrevDropped = rcePrev
if rcePrev.accounted {
δsize -= rcePrev.buf.Len()
}
rcePrev.buf.XRelease()
}
}
if !rceDropped {
rce.accounted = true
}
oce.Unlock()
// now, after .waitBufRef was synced to .buf, notify waiters that the
// original rce in question was loaded. Do so outside .parent lock.
rceOrig.ready.Done()
// update lru & cache size
c.gcMu.Lock()
if rcePrevDropped != nil {
rcePrevDropped.inLRU.Delete()
}
if !rceDropped {
rce.inLRU.MoveBefore(&c.lru.Head)
}
c.size += δsize
if c.size > c.sizeMax {
c.gc()
}
c.gcMu.Unlock()
}
// tryMerge tries to merge rce prev into next
//
// both prev and next must be already loaded.
// prev and next must come adjacent to each other in parent.rcev with
// prev.head < next.head .
//
// cur must be either prev or next and indicates which rce is current
// and so may be adjusted with a consistency-check error.
//
// return: true if merging done and thus prev was dropped from parent
//
// must be called with .parent locked
func tryMerge(prev, next, cur *revCacheEntry) bool {
// can merge if consistent if
// (if merging)
//
//	Pok  Nok    Ns <= Ph    Ps  = Ns
//	Pe   Nok    Ns <= Ph    Pe != "nodata"    (e.g. it was IO loading error for P)
//	Pok  Ne     ---
//	Ne   Pe     (Pe="nodata") && (Ne="nodata")    -> XXX vs deleteObject?
//	                                              -> let deleted object actually read
//	                                              -> as special non-error value
//
// h - head
// s - serial
// e - error
if next.err == nil && next.serial <= prev.head {
// drop prev
prev.parent.del(prev)
// check consistency
switch {
case prev.err == nil && prev.serial != next.serial:
cur.markAsDBError("load(@%v) -> %v; load(@%v) -> %v",
prev.head, prev.serial, next.head, next.serial)
case prev.err != nil && !isErrNoData(prev.err):
if cur.err == nil {
cur.markAsDBError("load(@%v) -> %v; load(@%v) -> %v",
prev.head, prev.err, next.head, next.serial)
}
}
return true
}
if isErrNoData(prev.err) && isErrNoData(next.err) {
// drop prev
prev.parent.del(prev)
// not checking consistency - error is already there and
// (Pe="nodata") && (Ne="nodata") already indicates prev & next are consistent.
return true
}
return false
}
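// For example (with assumed revision numbers): if the database has revisions of
// an object only at serials 1 and 5, load(@3) and load(@4) both return serial=1.
// For prev={serial: 1, head: 3} and next={serial: 1, head: 4} we have
// next.serial(1) <= prev.head(3), so prev is dropped and next alone keeps
// answering queries for the whole [1, 4] range; prev.serial == next.serial, so
// no inconsistency is flagged.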
// ---- garbage collection ----
// gc performs garbage-collection.
//
// must be called with .gcMu locked.
func (c *Cache) gc() {
//fmt.Printf("\n> gc\n")
//defer fmt.Printf("< gc\n")
for {
if c.size <= c.sizeMax {
return
}
// kill 1 least-used rce
h := c.lru.Next()
if h == &c.lru {
panic("cache: gc: empty .lru but .size > .sizeMax")
}
rce := h.rceFromInLRU()
oce := rce.parent
oceFree := false // whether to GC whole rce.parent OCE cache entry
oce.Lock()
i := oce.find(rce)
if i != -1 { // rce could be already deleted by e.g. merge
oce.deli(i)
if len(oce.rcev) == 0 {
oceFree = true
}
c.size -= rce.buf.Len()
//fmt.Printf("gc: free %d bytes\n", rce.buf.Len()))
// not-yet-loaded rce must not be on Cache.lru
if !rce.loaded() {
panic("cache: gc: found !loaded rce on lru")
}
rce.buf.XRelease()
}
oce.Unlock()
h.Delete()
if oceFree {
c.mu.Lock()
oce.Lock()
// recheck once again that oce is still not used
// (it could have been looked up again in the meantime, while we were not holding its lock)
if len(oce.rcev) == 0 {
delete(c.entryMap, oce.oid)
} else {
oceFree = false
}
oce.Unlock()
c.mu.Unlock()
if oceFree {
oce.release()
}
}
}
}
// freelist(OCE)
var ocePool = sync.Pool{New: func() interface{} { return &oidCacheEntry{} }}
// oceAlloc allocates oidCacheEntry from freelist.
func oceAlloc(oid Oid) *oidCacheEntry {
oce := ocePool.Get().(*oidCacheEntry)
oce.oid = oid
return oce
}
// release puts oce back into freelist.
//
// oce must be empty and the caller must not use oce after the call to release.
func (oce *oidCacheEntry) release() {
if len(oce.rcev) != 0 {
panic("oce.release: .rcev != []")
}
oce.oid = 0 // just in case
ocePool.Put(oce)
}
// ----------------------------------------
// isErrNoData returns whether an error is due to "there is no such data in
// database", not e.g. some IO loading error
func isErrNoData(err error) bool {
switch err.(type) {
default:
return false
case *NoObjectError:
case *NoDataError:
}
return true
}
// newRevEntry creates new revCacheEntry with .head and inserts it into .rcev @i.
// (if i == len(oce.rcev) - entry is appended)
//
// oce must be locked.
func (oce *oidCacheEntry) newRevEntry(i int, head Tid) *revCacheEntry {
rce := &revCacheEntry{
parent: oce,
head: head,
}
rce.ready.Add(1)
rce.inLRU.Init() // initially not on Cache.lru list
oce.rcev = append(oce.rcev, nil)
copy(oce.rcev[i+1:], oce.rcev[i:])
oce.rcev[i] = rce
return rce
}
// find finds rce in .rcev and returns its index; -1 if not found.
//
// oce must be locked.
func (oce *oidCacheEntry) find(rce *revCacheEntry) int {
for i, r := range oce.rcev {
if r == rce {
return i
}
}
return -1
}
// deli deletes .rcev[i]
//
// oce must be locked.
func (oce *oidCacheEntry) deli(i int) {
n := len(oce.rcev) - 1
copy(oce.rcev[i:], oce.rcev[i+1:])
// release ptr to revCacheEntry so it won't confusingly stay live when
// its turn to be deleted comes.
oce.rcev[n] = nil
oce.rcev = oce.rcev[:n]
}
// del deletes rce from .rcev.
// it panics if rce is not there.
//
// oce must be locked.
func (oce *oidCacheEntry) del(rce *revCacheEntry) {
i := oce.find(rce)
if i == -1 {
panic("rce not found")
}
oce.deli(i)
}
// loaded reports whether rce was already loaded
//
// must be called with rce.parent locked.
func (rce *revCacheEntry) loaded() bool {
return (rce.waitBufRef == -1)
}
// list head that knows it is in revCacheEntry.inLRU
type lruHead struct {
list.Head
}
// XXX vvv strictly speaking -unsafe.Offsetof(h.Head)
func (h *lruHead) Next() *lruHead { return (*lruHead)(unsafe.Pointer(h.Head.Next())) }
func (h *lruHead) Prev() *lruHead { return (*lruHead)(unsafe.Pointer(h.Head.Prev())) }
// rceFromInLRU returns the revCacheEntry that contains h as its .inLRU field.
func (h *lruHead) rceFromInLRU() (rce *revCacheEntry) {
urce := unsafe.Pointer(uintptr(unsafe.Pointer(h)) - unsafe.Offsetof(rce.inLRU))
return (*revCacheEntry)(urce)
}
// errDB returns error about database being inconsistent
func errDB(oid Oid, format string, argv ...interface{}) error {
// XXX -> separate type?
return fmt.Errorf("cache: database inconsistency: oid: %v: "+format,
append([]interface{}{oid}, argv...)...)
}
// markAsDBError marks rce with database inconsistency error.
//
// Caller must be the only one to access rce.
// In practice this means rce was just loaded, but not yet signalled as ready
// to waiters, nor yet made visible to GC (via adding to the Cache.lru list).
func (rce *revCacheEntry) markAsDBError(format string, argv ...interface{}) {
rce.err = errDB(rce.parent.oid, format, argv...)
rce.buf.XRelease()
rce.buf = nil
rce.serial = 0
}
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package zodb
import (
"bytes"
"context"
"errors"
"fmt"
"reflect"
"runtime"
"sort"
"sync/atomic"
"testing"
"lab.nexedi.com/kirr/go123/mem"
"github.com/kylelemons/godebug/pretty"
)
// tStorage implements read-only storage for cache testing
type tStorage struct {
// oid -> [](.serial↑, .data)
dataMap map[Oid][]tOidData
}
// data for oid for 1 revision
type tOidData struct {
serial Tid
data []byte
err error // e.g. io error
}
// mkbuf creates a new buffer with the specified content copied into it.
func mkbuf(data []byte) *mem.Buf {
buf := mem.BufAlloc(len(data))
copy(buf.Data, data)
return buf
}
// bufSame checks whether two buffers hold the same data or are both nil.
//
// NOTE we ignore refcnt here
func bufSame(buf1, buf2 *mem.Buf) bool {
if buf1 == nil {
return (buf2 == nil)
}
return reflect.DeepEqual(buf1.Data, buf2.Data)
}
func (stor *tStorage) URL() string {
return "test"
}
func (stor *tStorage) Load(_ context.Context, xid Xid) (buf *mem.Buf, serial Tid, err error) {
//fmt.Printf("> load(%v)\n", xid)
//defer func() { fmt.Printf("< %v, %v, %v\n", buf.XData(), serial, err) }()
buf, serial, err = stor.load(xid)
if err != nil {
err = &OpError{URL: stor.URL(), Op: "load", Args: xid, Err: err}
}
return buf, serial, err
}
func (stor *tStorage) load(xid Xid) (buf *mem.Buf, serial Tid, err error) {
datav := stor.dataMap[xid.Oid]
if datav == nil {
return nil, 0, &NoObjectError{xid.Oid}
}
// find max entry with .serial <= xid.At
n := len(datav)
i := n - 1 - sort.Search(n, func(i int) bool {
v := datav[n - 1 - i].serial <= xid.At
//fmt.Printf("@%d -> %v (@%d; %v)\n", i, v, n - 1 -i, xid.At)
return v
})
//fmt.Printf("i: %d n: %d\n", i, n)
if i == -1 {
// xid.At < all .serial - no such transaction
return nil, 0, &NoDataError{Oid: xid.Oid, DeletedAt: 0}
}
s, e := datav[i].serial, datav[i].err
b := mkbuf(datav[i].data)
if e != nil {
b, s = nil, 0 // obey protocol of returning nil, 0 with error
}
return b, s, e
}
var ioerr = errors.New("input/output error")
func xidat(oid Oid, tid Tid) Xid {
return Xid{Oid: oid, At: tid}
}
func nodata(oid Oid, deletedAt Tid) *NoDataError {
return &NoDataError{Oid: oid, DeletedAt: deletedAt}
}
func TestCache(t *testing.T) {
// XXX hack; place=ok?
pretty.CompareConfig.PrintStringers = true
debug := pretty.DefaultConfig
debug.IncludeUnexported = true
__ := Checker{t}
ok1 := func(v bool) { t.Helper(); __.ok1(v) }
hello := []byte("hello")
world := []byte("world!!")
zz := []byte("zz")
www := []byte("www")
big := []byte("0123456789")
tstor := &tStorage{
dataMap: map[Oid][]tOidData{
1: {
{4, hello, nil},
{7, nil, ioerr},
{10, world, nil},
{16, zz, nil},
{20, www, nil},
{77, big, nil},
},
},
}
b := mkbuf
c := NewCache(tstor, 100 /* > Σ all data */)
ctx := context.Background()
checkLoad := func(xid Xid, buf *mem.Buf, serial Tid, errCause error) {
t.Helper()
bad := &bytes.Buffer{}
b, s, e := c.Load(ctx, xid)
if !bufSame(buf, b) {
fmt.Fprintf(bad, "buf:\n%s\n", pretty.Compare(buf, b))
}
if serial != s {
fmt.Fprintf(bad, "serial:\n%s\n", pretty.Compare(serial, s))
}
var err error
if errCause != nil {
err = &OpError{URL: "test", Op: "load", Args: xid, Err: errCause}
}
if !reflect.DeepEqual(err, e) {
fmt.Fprintf(bad, "err:\n%s\n", pretty.Compare(err, e))
}
if bad.Len() != 0 {
t.Fatalf("load(%v):\n%s", xid, bad.Bytes())
}
}
checkOIDV := func(oidvOk ...Oid) {
t.Helper()
var oidv []Oid
for oid := range c.entryMap {
oidv = append(oidv, oid)
}
sort.Slice(oidv, func(i, j int) bool {
return oidv[i] < oidv[j]
})
if !reflect.DeepEqual(oidv, oidvOk) {
t.Fatalf("oidv: %s", pretty.Compare(oidvOk, oidv))
}
}
checkRCE := func(rce *revCacheEntry, head, serial Tid, buf *mem.Buf, err error) {
t.Helper()
bad := &bytes.Buffer{}
if rce.head != head {
fmt.Fprintf(bad, "head:\n%s\n", pretty.Compare(head, rce.head))
}
if rce.serial != serial {
fmt.Fprintf(bad, "serial:\n%s\n", pretty.Compare(serial, rce.serial))
}
if !bufSame(rce.buf, buf) {
fmt.Fprintf(bad, "buf:\n%s\n", pretty.Compare(buf, rce.buf))
}
if !reflect.DeepEqual(rce.err, err) {
fmt.Fprintf(bad, "err:\n%s\n", pretty.Compare(err, rce.err))
}
if bad.Len() != 0 {
t.Fatalf("rce:\n%s", bad.Bytes()) // XXX add oid?
}
}
checkOCE := func(oid Oid, rcev ...*revCacheEntry) {
t.Helper()
oce, ok := c.entryMap[oid]
if !ok {
t.Fatalf("oce(%v): not present in cache", oid)
}
oceRcev := oce.rcev
if len(oceRcev) == 0 {
oceRcev = nil // nil != []{}
}
if !reflect.DeepEqual(oceRcev, rcev) {
t.Fatalf("oce(%v):\n%s\n", oid, pretty.Compare(rcev, oceRcev))
}
}
checkMRU := func(sizeOk int, mruvOk ...*revCacheEntry) {
t.Helper()
size := 0
var mruv []*revCacheEntry
for hp, h := &c.lru, c.lru.Prev(); h != &c.lru; hp, h = h, h.Prev() {
//xv := []interface{}{&c.lru, h.rceFromInLRU()}
//debug.Print(xv) // &c.lru, h.rceFromInLRU())
if h.Next() != hp {
t.Fatalf("LRU list .next/.prev broken for\nh:\n%s\n\nhp:\n%s\n",
debug.Sprint(h), debug.Sprint(hp))
}
rce := h.rceFromInLRU()
size += rce.buf.Len()
mruv = append(mruv, rce)
}
if !reflect.DeepEqual(mruv, mruvOk) {
t.Fatalf("MRU:\n%s\n", pretty.Compare(mruv, mruvOk))
}
if size != sizeOk {
t.Fatalf("cache: size(all-rce-in-lru): %d ; want: %d", size, sizeOk)
}
if size != c.size {
t.Fatalf("cache: size(all-rce-in-lru): %d ; c.size: %d", size, c.size)
}
}
// ---- verify cache behaviour for entries that must be loaded/merged ----
// (this exercises mostly loadRCE/tryMerge)
checkOIDV()
checkMRU(0)
// load @2 -> new rce entry
checkLoad(xidat(1,2), nil, 0, nodata(1,0))
checkOIDV(1)
oce1 := c.entryMap[1]
ok1(len(oce1.rcev) == 1)
rce1_h2 := oce1.rcev[0]
checkRCE(rce1_h2, 2, 0, nil, nodata(1,0))
checkMRU(0, rce1_h2)
// load @3 -> 2] merged with 3]
checkLoad(xidat(1,3), nil, 0, nodata(1,0))
checkOIDV(1)
ok1(len(oce1.rcev) == 1)
rce1_h3 := oce1.rcev[0]
ok1(rce1_h3 != rce1_h2) // rce1_h2 was merged into rce1_h3
checkRCE(rce1_h3, 3, 0, nil, nodata(1,0))
checkMRU(0, rce1_h3)
// load @1 -> 1] merged with 3]
checkLoad(xidat(1,1), nil, 0, nodata(1,0))
checkOIDV(1)
ok1(len(oce1.rcev) == 1)
ok1(oce1.rcev[0] == rce1_h3)
checkRCE(rce1_h3, 3, 0, nil, nodata(1,0))
checkMRU(0, rce1_h3)
// load @5 -> new rce entry with data
checkLoad(xidat(1,5), b(hello), 4, nil)
checkOIDV(1)
ok1(len(oce1.rcev) == 2)
rce1_h5 := oce1.rcev[1]
checkRCE(rce1_h5, 5, 4, b(hello), nil)
checkOCE(1, rce1_h3, rce1_h5)
checkMRU(5, rce1_h5, rce1_h3)
// load @4 -> 4] merged with 5]
checkLoad(xidat(1,4), b(hello), 4, nil)
checkOIDV(1)
checkOCE(1, rce1_h3, rce1_h5)
checkMRU(5, rce1_h5, rce1_h3)
// load @6 -> 5] merged with 6]
checkLoad(xidat(1,6), b(hello), 4, nil)
checkOIDV(1)
ok1(len(oce1.rcev) == 2)
rce1_h6 := oce1.rcev[1]
ok1(rce1_h6 != rce1_h5)
checkRCE(rce1_h6, 6, 4, b(hello), nil)
checkOCE(1, rce1_h3, rce1_h6)
checkMRU(5, rce1_h6, rce1_h3)
// load @7 -> ioerr + new rce
checkLoad(xidat(1,7), nil, 0, ioerr)
checkOIDV(1)
ok1(len(oce1.rcev) == 3)
rce1_h7 := oce1.rcev[2]
checkRCE(rce1_h7, 7, 0, nil, ioerr)
checkOCE(1, rce1_h3, rce1_h6, rce1_h7)
checkMRU(5, rce1_h7, rce1_h6, rce1_h3)
// load @9 -> ioerr + new rce (IO errors are not merged)
checkLoad(xidat(1,9), nil, 0, ioerr)
checkOIDV(1)
ok1(len(oce1.rcev) == 4)
rce1_h9 := oce1.rcev[3]
checkRCE(rce1_h9, 9, 0, nil, ioerr)
checkOCE(1, rce1_h3, rce1_h6, rce1_h7, rce1_h9)
checkMRU(5, rce1_h9, rce1_h7, rce1_h6, rce1_h3)
// load @10 -> new data rce, not merged with ioerr at 9]
checkLoad(xidat(1,10), b(world), 10, nil)
checkOIDV(1)
ok1(len(oce1.rcev) == 5)
rce1_h10 := oce1.rcev[4]
checkRCE(rce1_h10, 10, 10, b(world), nil)
checkOCE(1, rce1_h3, rce1_h6, rce1_h7, rce1_h9, rce1_h10)
checkMRU(12, rce1_h10, rce1_h9, rce1_h7, rce1_h6, rce1_h3)
// load @11 -> 10] merged with 11]
checkLoad(xidat(1,11), b(world), 10, nil)
checkOIDV(1)
ok1(len(oce1.rcev) == 5)
rce1_h11 := oce1.rcev[4]
ok1(rce1_h11 != rce1_h10)
checkRCE(rce1_h11, 11, 10, b(world), nil)
checkOCE(1, rce1_h3, rce1_h6, rce1_h7, rce1_h9, rce1_h11)
checkMRU(12, rce1_h11, rce1_h9, rce1_h7, rce1_h6, rce1_h3)
// simulate case where 13] (α) and 15] (β) were loaded in parallel, both are ready
// but 13] (α) takes oce lock first before 15] and so 11] is not yet merged
// with 15] -> 11] and 13] should be merged into 15].
// (manually add rce1_h15 so it is not merged with 11])
rce1_h15, new15 := c.lookupRCE(xidat(1,15), +0)
ok1(new15)
rce1_h15.serial = 10
rce1_h15.buf = mkbuf(world)
// here: first half of loadRCE(15]) before close(15].ready)
checkOIDV(1)
checkOCE(1, rce1_h3, rce1_h6, rce1_h7, rce1_h9, rce1_h11, rce1_h15)
ok1(!rce1_h15.loaded())
checkMRU(12, rce1_h11, rce1_h9, rce1_h7, rce1_h6, rce1_h3) // no 15] yet
// (lookup 13] while 15] is not yet loaded so 15] is not picked
// automatically at lookup phase)
rce1_h13, new13 := c.lookupRCE(xidat(1,13), +0)
ok1(new13)
checkOIDV(1)
checkOCE(1, rce1_h3, rce1_h6, rce1_h7, rce1_h9, rce1_h11, rce1_h13, rce1_h15)
checkMRU(12, rce1_h11, rce1_h9, rce1_h7, rce1_h6, rce1_h3) // no <14 and <16 yet
// (now 15] becomes ready but does not yet take oce lock)
rce1_h15.waitBufRef = -1
rce1_h15.ready.Done()
ok1(rce1_h15.loaded())
checkOIDV(1)
checkOCE(1, rce1_h3, rce1_h6, rce1_h7, rce1_h9, rce1_h11, rce1_h13, rce1_h15)
checkMRU(12, rce1_h11, rce1_h9, rce1_h7, rce1_h6, rce1_h3) // no 13] and 15] yet
// (13] also becomes ready and takes oce lock first, merging 11] and 13] into 15].
// 15] has not yet taken oce lock so c.size is temporarily reduced and
// 15] is not yet on LRU list)
c.loadRCE(ctx, rce1_h13)
checkOIDV(1)
checkRCE(rce1_h13, 13, 10, b(world), nil)
checkRCE(rce1_h15, 15, 10, b(world), nil)
checkRCE(rce1_h11, 11, 10, b(world), nil)
checkOCE(1, rce1_h3, rce1_h6, rce1_h7, rce1_h9, rce1_h15)
checkMRU(5 /*was 12*/, rce1_h9, rce1_h7, rce1_h6, rce1_h3)
// (15] takes oce lock and updates c.size and LRU list)
rce1_h15.ready.Add(1) // so loadRCE could run
c.loadRCE(ctx, rce1_h15)
checkOIDV(1)
checkOCE(1, rce1_h3, rce1_h6, rce1_h7, rce1_h9, rce1_h15)
checkMRU(12, rce1_h15, rce1_h9, rce1_h7, rce1_h6, rce1_h3)
// similar race in between 16] and 17] but now β (17]) takes oce lock first:
rce1_h16, new16 := c.lookupRCE(xidat(1,16), +0)
ok1(new16)
rce1_h17, new17 := c.lookupRCE(xidat(1,17), +0)
ok1(new17)
// (16] loads but not yet takes oce lock)
rce1_h16.serial = 16
rce1_h16.buf = mkbuf(zz)
rce1_h16.waitBufRef = -1
rce1_h16.ready.Done()
ok1(rce1_h16.loaded())
checkOIDV(1)
checkOCE(1, rce1_h3, rce1_h6, rce1_h7, rce1_h9, rce1_h15, rce1_h16, rce1_h17)
checkMRU(12, rce1_h15, rce1_h9, rce1_h7, rce1_h6, rce1_h3) // no 16] and 17] yet
// (17] loads and takes oce lock first - merge 16] with 17])
c.loadRCE(ctx, rce1_h17)
checkOIDV(1)
checkRCE(rce1_h17, 17, 16, b(zz), nil)
checkRCE(rce1_h16, 16, 16, b(zz), nil)
checkOCE(1, rce1_h3, rce1_h6, rce1_h7, rce1_h9, rce1_h15, rce1_h17)
checkMRU(14, rce1_h17, rce1_h15, rce1_h9, rce1_h7, rce1_h6, rce1_h3)
// load @19 -> 17] merged with 19]
checkLoad(xidat(1,19), b(zz), 16, nil)
ok1(len(oce1.rcev) == 6)
rce1_h19 := oce1.rcev[5]
ok1(rce1_h19 != rce1_h17)
checkOIDV(1)
checkRCE(rce1_h19, 19, 16, b(zz), nil)
checkOCE(1, rce1_h3, rce1_h6, rce1_h7, rce1_h9, rce1_h15, rce1_h19)
checkMRU(14, rce1_h19, rce1_h15, rce1_h9, rce1_h7, rce1_h6, rce1_h3)
// load @20 -> new 20]
checkLoad(xidat(1,20), b(www), 20, nil)
ok1(len(oce1.rcev) == 7)
rce1_h20 := oce1.rcev[6]
checkOIDV(1)
checkRCE(rce1_h20, 20, 20, b(www), nil)
checkOCE(1, rce1_h3, rce1_h6, rce1_h7, rce1_h9, rce1_h15, rce1_h19, rce1_h20)
checkMRU(17, rce1_h20, rce1_h19, rce1_h15, rce1_h9, rce1_h7, rce1_h6, rce1_h3)
// load @21 -> 20] merged with 21]
checkLoad(xidat(1,21), b(www), 20, nil)
ok1(len(oce1.rcev) == 7)
rce1_h21 := oce1.rcev[6]
ok1(rce1_h21 != rce1_h20)
checkOIDV(1)
checkRCE(rce1_h21, 21, 20, b(www), nil)
checkOCE(1, rce1_h3, rce1_h6, rce1_h7, rce1_h9, rce1_h15, rce1_h19, rce1_h21)
checkMRU(17, rce1_h21, rce1_h19, rce1_h15, rce1_h9, rce1_h7, rce1_h6, rce1_h3)
// ---- verify rce lookup for entries that must already be cached ----
// (this exercises lookupRCE)
checkLookup := func(xid Xid, expect *revCacheEntry) {
t.Helper()
bad := &bytes.Buffer{}
rce, rceNew := c.lookupRCE(xid, +0)
if rceNew {
fmt.Fprintf(bad, "rce must be already in cache\n")
}
if rce != expect {
fmt.Fprintf(bad, "unexpected rce found:\n%s\n", pretty.Compare(expect, rce))
}
if bad.Len() != 0 {
t.Fatalf("lookupRCE(%v):\n%s", xid, bad.Bytes())
}
}
checkLookup(xidat(1,19), rce1_h19)
checkLookup(xidat(1,18), rce1_h19)
checkLookup(xidat(1,17), rce1_h19)
checkLookup(xidat(1,16), rce1_h19)
checkLookup(xidat(1,15), rce1_h15)
checkLookup(xidat(1,14), rce1_h15)
checkLookup(xidat(1,11), rce1_h15)
checkLookup(xidat(1,10), rce1_h15)
checkLookup(xidat(1,9), rce1_h9)
// 8] must be separate from 7] and 9] because it is IO error there
rce1_h8, new8 := c.lookupRCE(xidat(1,8), +0)
ok1(new8)
c.loadRCE(ctx, rce1_h8)
checkOIDV(1)
checkRCE(rce1_h8, 8, 0, nil, ioerr)
checkOCE(1, rce1_h3, rce1_h6, rce1_h7, rce1_h8, rce1_h9, rce1_h15, rce1_h19, rce1_h21)
checkMRU(17, rce1_h8, rce1_h21, rce1_h19, rce1_h15, rce1_h9, rce1_h7, rce1_h6, rce1_h3)
checkLookup(xidat(1,8), rce1_h8)
checkLookup(xidat(1,7), rce1_h7)
// have data exact and inexact hits
checkLookup(xidat(1,7), rce1_h7)
checkLookup(xidat(1,6), rce1_h6)
checkLookup(xidat(1,5), rce1_h6)
checkLookup(xidat(1,4), rce1_h6)
// nodata exact and inexact hits
checkLookup(xidat(1,3), rce1_h3)
checkLookup(xidat(1,2), rce1_h3)
checkLookup(xidat(1,1), rce1_h3)
checkLookup(xidat(1,0), rce1_h3)
// ---- verify how LRU changes for in-cache loads ----
checkMRU(17, rce1_h8, rce1_h21, rce1_h19, rce1_h15, rce1_h9, rce1_h7, rce1_h6, rce1_h3)
checkLoad(xidat(1,6), b(hello), 4, nil)
checkMRU(17, rce1_h6, rce1_h8, rce1_h21, rce1_h19, rce1_h15, rce1_h9, rce1_h7, rce1_h3)
checkLoad(xidat(1,15), b(world), 10, nil)
checkMRU(17, rce1_h15, rce1_h6, rce1_h8, rce1_h21, rce1_h19, rce1_h9, rce1_h7, rce1_h3)
// ---- verify LRU eviction ----
checkOIDV(1)
checkOCE(1, rce1_h3, rce1_h6, rce1_h7, rce1_h8, rce1_h9, rce1_h15, rce1_h19, rce1_h21)
checkMRU(17, rce1_h15, rce1_h6, rce1_h8, rce1_h21, rce1_h19, rce1_h9, rce1_h7, rce1_h3)
c.SetSizeMax(16) // < c.size by 1 -> should trigger gc
// evicted:
// - 3] (lru.1, nodata, size=0) XXX ok to evict nodata & friends?
// - 7] (lru.2, ioerr, size=0)
// - 9] (lru.3, ioerr, size=0)
// - 19] (lru.4, zz, size=2)
checkOIDV(1)
checkOCE(1, rce1_h6, rce1_h8, rce1_h15, rce1_h21)
checkMRU(15, rce1_h15, rce1_h6, rce1_h8, rce1_h21)
// reload 19] -> 21] should be evicted
c.Load(ctx, xidat(1,19))
// - evicted 21] (lru.1, www, size=3)
// - loaded 19] (zz, size=2)
ok1(len(oce1.rcev) == 4)
rce1_h19_2 := oce1.rcev[3]
ok1(rce1_h19_2 != rce1_h19)
checkOIDV(1)
checkRCE(rce1_h19_2, 19, 16, b(zz), nil)
checkOCE(1, rce1_h6, rce1_h8, rce1_h15, rce1_h19_2)
checkMRU(14, rce1_h19_2, rce1_h15, rce1_h6, rce1_h8)
// load big 77] -> several rce must be evicted
c.Load(ctx, xidat(1,77))
// - evicted 8] (lru.1, ioerr, size=0)
// - evicted 6] (lru.2, hello, size=5)
// - evicted 15] (lru.3, world, size=7)
// - loaded 77] (big, size=10)
ok1(len(oce1.rcev) == 2)
rce1_h77 := oce1.rcev[1]
checkOIDV(1)
checkRCE(rce1_h77, 77, 77, b(big), nil)
checkOCE(1, rce1_h19_2, rce1_h77)
checkMRU(12, rce1_h77, rce1_h19_2)
// sizeMax=0 evicts everything from cache
c.SetSizeMax(0)
checkOIDV()
checkMRU(0)
// and loading still works (because even though rce's are evicted
// they stay live while some user waits on and uses them)
checkLoad(xidat(1,4), b(hello), 4, nil)
checkOIDV()
checkMRU(0)
// ---- Load vs concurrent GC ----
// in the following scenario, if GC runs after Load has completed lookupRCE
// but before Load increfs the returned buf, the GC will actually return
// the buf to the buffer pool and so Load will return a wrong buffer:
//
// ---- 8< ----
//	T1				Tgc
//	Prefetch:
//	    RCELookedUp
//	    RCELoaded
//					# GC - on hold
//	Load
//	    RCELookedUp
//	    -> pause T1
//					# GC - unpause
//					GCStart
//					GCStop
//	    <- unpause T1
//	# load completes
// ---- 8< ----
//
// it is hard to check this via stable tracepoints because, if done that way,
// the test would deadlock once the problem is fixed.
// So test it probabilistically instead.
c.SetSizeMax(0) // we want GC to be constantly running
for i := 0; i < 1e4; i++ {
// issue Prefetch: this should create RCE and spawn loadRCE for it
c.Prefetch(ctx, xidat(1,4))
// issue Load: this should lookup the RCE and wait for it to be loaded.
// if GC runs in parallel to Load there are chances it will
// be running in between Load->lookupRCE and final rce.buf.XIncref()
//
// if something is incorrect with refcounting either
// buf.Incref() in Load or buf.Release() in GC will panic.
checkLoad(xidat(1,4), b(hello), 4, nil)
}
// XXX verify caching vs ctx cancel
// XXX verify db inconsistency checks
// XXX verify loading with before > cache.before
}
type Checker struct {
t *testing.T
}
func (c *Checker) ok1(v bool) {
c.t.Helper()
if !v {
c.t.Fatal("!ok")
}
}
func (c *Checker) assertEq(a, b interface{}) {
c.t.Helper()
if !reflect.DeepEqual(a, b) {
c.t.Fatal("!eq:\n", pretty.Compare(a, b))
}
}
// ----------------------------------------
// noopStorage is a dummy StorLoader which for any oid/xid always returns 1-byte data
type noopStorage struct{}
var noopData = []byte{0}
func (s *noopStorage) URL() string {
return "noop"
}
func (s *noopStorage) Load(_ context.Context, xid Xid) (buf *mem.Buf, serial Tid, err error) {
return mkbuf(noopData), 1, nil
}
// benchLoad serially benchmarks a StorLoader - either storage directly or a cache on top of it
//
// oids accessed are in [0, worksize)
func benchLoad(b *testing.B, l StorLoader, worksize int) {
benchLoadN(b, b.N, l, worksize)
}
// worker for benchLoad, with n overriding b.N
func benchLoadN(b *testing.B, n int, l StorLoader, worksize int) {
ctx := context.Background()
xid := Xid{At: 1, Oid: 0}
for i := 0; i < n; i++ {
buf, _, err := l.Load(ctx, xid)
if err != nil {
b.Fatal(err)
}
buf.XRelease()
xid.Oid++
if xid.Oid >= Oid(worksize) {
xid.Oid = 0
}
}
}
// benchmark storage under cache
func BenchmarkNoopStorage(b *testing.B) { benchLoad(b, &noopStorage{}, b.N /* = ∞ */) }
// cache sizes to benchmark (elements = bytes, since we are using 1-byte elements)
var cachesizev = []int{0, 16, 128, 512, 4096}
// benchEachCache runs benchmark f against caches with various sizes on top of noop storage
func benchEachCache(b *testing.B, f func(b *testing.B, c *Cache)) {
s := &noopStorage{}
for _, size := range cachesizev {
b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) {
c := NewCache(s, size)
f(b, c)
})
}
}
// benchmark cache while N(access) < N(cache-entries)
func BenchmarkCacheStartup(b *testing.B) {
s := &noopStorage{}
c := NewCache(s, b.N)
benchLoad(b, c, b.N)
b.StopTimer()
}
// Serially benchmark cache overhead - the additional time cache adds for loading not-yet-cached entries.
// cache is already started - N(access) > N(cache-entries).
func BenchmarkCacheNoHit(b *testing.B) {
benchEachCache(b, func(b *testing.B, c *Cache) {
benchLoad(b, c, b.N /* = ∞ */)
})
}
// Serially benchmark the time when a load request hits the cache.
// The cache is already started - N(access) > N(cache-entries).
func BenchmarkCacheHit(b *testing.B) {
benchEachCache(b, func(b *testing.B, c *Cache) {
// warmup - load for cache size
benchLoadN(b, c.sizeMax, c, c.sizeMax)
b.ResetTimer()
benchLoad(b, c, c.sizeMax)
})
}
// ---- parallel benchmarks (many requests to 1 cache) ----
// benchLoadPar is like benchLoad but issues loads in parallel
func benchLoadPar(b *testing.B, l StorLoader, worksize int) {
ctx := context.Background()
np := runtime.GOMAXPROCS(0)
p := uint64(0)
b.RunParallel(func(pb *testing.PB) {
oid0 := Oid(atomic.AddUint64(&p, +1)) // all workers start/iterate at different oid
xid := Xid{At: 1, Oid: oid0}
for pb.Next() {
buf, _, err := l.Load(ctx, xid)
if err != nil {
b.Fatal(err)
}
buf.XRelease()
xid.Oid += Oid(np)
if xid.Oid >= Oid(worksize) {
xid.Oid = oid0
}
}
})
}
func BenchmarkNoopStoragePar(b *testing.B) { benchLoadPar(b, &noopStorage{}, b.N /* = ∞ */) }
func BenchmarkCacheStartupPar(b *testing.B) {
s := &noopStorage{}
c := NewCache(s, b.N)
benchLoadPar(b, c, b.N)
b.StopTimer()
}
func BenchmarkCacheNoHitPar(b *testing.B) {
benchEachCache(b, func(b *testing.B, c *Cache) {
benchLoadPar(b, c, b.N /* = ∞ */)
})
}
func BenchmarkCacheHitPar(b *testing.B) {
benchEachCache(b, func(b *testing.B, c *Cache) {
// warmup (serially) - load for cache size
benchLoadN(b, c.sizeMax, c, c.sizeMax)
b.ResetTimer()
benchLoadPar(b, c, c.sizeMax)
})
}
// ---- parallel benchmarks (many caches - each is handled serially, as if each is inside separate process) ----
// XXX gc process is still only 1 shared.
// XXX this benchmark part will probably go away
// benchLoadProc is like benchLoad but works with PB, not B
func benchLoadProc(pb *testing.PB, l StorLoader, worksize int) error {
ctx := context.Background()
xid := Xid{At: 1, Oid: 0}
for pb.Next() {
buf, _, err := l.Load(ctx, xid)
if err != nil {
return err
}
buf.XRelease()
xid.Oid++
if xid.Oid >= Oid(worksize) {
xid.Oid = 0
}
}
return nil
}
func BenchmarkNoopStorageProc(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
s := &noopStorage{}
err := benchLoadProc(pb, s, b.N)
if err != nil {
b.Fatal(err)
}
})
}
func BenchmarkCacheStartupProc(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
s := &noopStorage{}
c := NewCache(s, b.N)
err := benchLoadProc(pb, c, b.N)
if err != nil {
b.Fatal(err)
}
// XXX stop timer
})
}
func benchEachCacheProc(b *testing.B, f func(b *testing.B, pb *testing.PB, c *Cache) error) {
for _, size := range cachesizev {
b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
s := &noopStorage{}
c := NewCache(s, size)
err := f(b, pb, c)
if err != nil {
b.Fatal(err)
}
})
})
}
}
func BenchmarkCacheNoHitProc(b *testing.B) {
benchEachCacheProc(b, func(b *testing.B, pb *testing.PB, c *Cache) error {
return benchLoadProc(pb, c, b.N)
})
}
func BenchmarkCacheHitProc(b *testing.B) {
benchEachCacheProc(b, func(b *testing.B, pb *testing.PB, c *Cache) error {
// XXX no warmup
return benchLoadProc(pb, c, c.sizeMax)
})
}
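// The benchmarks above can be run in the usual way, for example (the exact
// package path is an assumption and depends on where this repository is
// checked out):
//
//	go test -run NONE -bench Cache -benchmem ./zodb/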