Commit 0abecd63 authored by Kirill Smelkov's avatar Kirill Smelkov

X Split IStorage -> IStorage, IStorageDriver

- IStorageDriver implements only raw access to a particular storage
- Prefetching logic + other common bits are implemented byt zodb package
  infrastructure (e.g. Prefetch() and URL()) and access to them is
  provided by IStorage.

It currently regresses plain load speed:

name                           old time/object  new time/object  delta
dataset:wczblk1-8
deco/fs1/zhash.py                  15.4µs ± 4%      15.2µs ± 2%      ~     (p=0.159 n=4+5)
deco/fs1/zhash.py-P16               116µs ±17%       121µs ±21%      ~     (p=0.407 n=16+16)
deco/fs1/zhash.go                  1.50µs ± 0%      5.30µs ± 0%  +253.33%  (p=0.016 n=5+4)
deco/fs1/zhash.go+prefetch128      4.08µs ± 5%      4.20µs ± 2%      ~     (p=0.143 n=5+5)
deco/fs1/zhash.go-P16             4.77µs ±179%     33.85µs ±54%  +610.14%  (p=0.000 n=15+16)
dataset:prod1-1024
deco/fs1/zhash.py                  12.2µs ± 1%      12.4µs ± 4%      ~     (p=0.540 n=4+5)
deco/fs1/zhash.py-P16               102µs ±12%       101µs ±16%      ~     (p=0.802 n=16+16)
deco/fs1/zhash.go                  1.10µs ± 0%      2.66µs ± 2%  +141.82%  (p=0.008 n=5+5)
deco/fs1/zhash.go+prefetch128      2.70µs ± 0%      2.67µs ± 3%      ~     (p=1.000 n=4+4)
deco/fs1/zhash.go-P16             3.20µs ±138%     17.25µs ±42%  +439.06%  (p=0.000 n=16+16)

will try to investigate what is going on inside.
parent 6ae2604b
...@@ -68,19 +68,9 @@ type Client struct { ...@@ -68,19 +68,9 @@ type Client struct {
// protected by .node.StateMu // protected by .node.StateMu
operational bool // XXX <- somehow move to NodeApp? operational bool // XXX <- somehow move to NodeApp?
opReady chan struct{} // reinitialized each time state becomes non-operational opReady chan struct{} // reinitialized each time state becomes non-operational
url *url.URL // original URL we were opened with
} }
var _ zodb.IStorage = (*Client)(nil) var _ zodb.IStorageDriver = (*Client)(nil)
func (c *Client) StorageName() string {
return fmt.Sprintf("neo(%s)", c.node.ClusterName)
}
func (c *Client) URL() string {
return c.url.String()
}
// NewClient creates new client node. // NewClient creates new client node.
// //
...@@ -507,7 +497,7 @@ func (c *Client) Iterate(tidMin, tidMax zodb.Tid) zodb.ITxnIterator { ...@@ -507,7 +497,7 @@ func (c *Client) Iterate(tidMin, tidMax zodb.Tid) zodb.ITxnIterator {
} }
func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (zodb.IStorage, error) { func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (zodb.IStorageDriver, error) {
// neo://name@master1,master2,...,masterN?options // neo://name@master1,master2,...,masterN?options
if u.User == nil { if u.User == nil {
...@@ -527,10 +517,9 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (zo ...@@ -527,10 +517,9 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (zo
// as ctx for open can be done after open finishes - not covering // as ctx for open can be done after open finishes - not covering
// whole storage working lifetime. // whole storage working lifetime.
c := NewClient(u.User.Username(), u.Host, net) c := NewClient(u.User.Username(), u.Host, net)
c.url = u // FIXME move this inside NewClient
return c, nil return c, nil
} }
func init() { func init() {
zodb.RegisterStorage("neo", openClientByURL) zodb.RegisterDriver("neo", openClientByURL)
} }
...@@ -58,7 +58,7 @@ type Storage struct { ...@@ -58,7 +58,7 @@ type Storage struct {
// 1 inbox/ (commit queues) // 1 inbox/ (commit queues)
// 2 ? (data.fs) // 2 ? (data.fs)
// 3 packed/ (deltified objects) // 3 packed/ (deltified objects)
zstor zodb.IStorage // underlying ZODB storage XXX -> directly work with fs1 & friends zstor zodb.IStorageDriver // underlying ZODB storage XXX -> directly work with fs1 & friends
//nodeCome chan nodeCome // node connected //nodeCome chan nodeCome // node connected
} }
...@@ -67,7 +67,7 @@ type Storage struct { ...@@ -67,7 +67,7 @@ type Storage struct {
// //
// The storage uses zstor as underlying backend for storing data. // The storage uses zstor as underlying backend for storing data.
// Use Run to actually start running the node. // Use Run to actually start running the node.
func NewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker, zstor zodb.IStorage) *Storage { func NewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker, zstor zodb.IStorageDriver) *Storage {
stor := &Storage{ stor := &Storage{
node: neo.NewNodeApp(net, neo.STORAGE, clusterName, masterAddr, serveAddr), node: neo.NewNodeApp(net, neo.STORAGE, clusterName, masterAddr, serveAddr),
zstor: zstor, zstor: zstor,
......
...@@ -41,7 +41,6 @@ import ( ...@@ -41,7 +41,6 @@ import (
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
_ "lab.nexedi.com/kirr/neo/go/zodb/wks" _ "lab.nexedi.com/kirr/neo/go/zodb/wks"
"lab.nexedi.com/kirr/neo/go/zodb/storage"
"github.com/pkg/profile" "github.com/pkg/profile"
) )
...@@ -124,39 +123,19 @@ func zhash(ctx context.Context, url string, h hasher, useprefetch bool, bench, c ...@@ -124,39 +123,19 @@ func zhash(ctx context.Context, url string, h hasher, useprefetch bool, bench, c
err = xerr.First(err, err2) err = xerr.First(err, err2)
}() }()
// XXX always open storage with cache by zodb.OpenStorage
var cache *storage.Cache
if useprefetch {
cache = storage.NewCache(stor, 16*1024*1024)
}
prefetch := func(ctx context.Context, xid zodb.Xid) {
if cache != nil {
//fmt.Printf("prefetch %v\n", xid)
cache.Prefetch(ctx, xid)
}
}
load := func(ctx context.Context, xid zodb.Xid) (*zodb.Buf, zodb.Tid, error) {
if cache != nil {
return cache.Load(ctx, xid)
} else {
return stor.Load(ctx, xid)
}
}
const nprefetch = 128 // XXX -> 512 ? const nprefetch = 128 // XXX -> 512 ?
// prefetchBlk prefetches block of nprefetch objects starting from xid // prefetchBlk prefetches block of nprefetch objects starting from xid
//var tprevLoadBlkStart time.Time //var tprevLoadBlkStart time.Time
prefetchBlk := func(ctx context.Context, xid zodb.Xid) { prefetchBlk := func(ctx context.Context, xid zodb.Xid) {
if cache == nil { if !useprefetch {
return return
} }
//t1 := time.Now() //t1 := time.Now()
for i := 0; i < nprefetch; i++ { for i := 0; i < nprefetch; i++ {
prefetch(ctx, xid) //fmt.Printf("prefetch %v\n", xid)
stor.Prefetch(ctx, xid)
xid.Oid++ xid.Oid++
} }
//t2 := time.Now() //t2 := time.Now()
...@@ -193,7 +172,7 @@ loop: ...@@ -193,7 +172,7 @@ loop:
if xid.Oid % nprefetch == 0 { if xid.Oid % nprefetch == 0 {
prefetchBlk(ctx, xid) prefetchBlk(ctx, xid)
} }
buf, _, err := load(ctx, xid) buf, _, err := stor.Load(ctx, xid)
switch err.(type) { switch err.(type) {
case nil: case nil:
// ok // ok
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
package storage package zodb
// cache management // cache management
//go:generate gotrace gen . //go:generate gotrace gen .
...@@ -29,15 +29,13 @@ import ( ...@@ -29,15 +29,13 @@ import (
"sync" "sync"
"unsafe" "unsafe"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/xcommon/xcontainer/list" "lab.nexedi.com/kirr/neo/go/xcommon/xcontainer/list"
) )
// XXX managing LRU under 1 big gcMu might be bad for scalability. // XXX managing LRU under 1 big gcMu might be bad for scalability.
// TODO maintain nhit / nmiss + way to read cache stats // TODO maintain nhit / nmiss + way to read cache stats
// Cache adds RAM caching layer over a storage. // Cache provides RAM caching layer that can be used over a storage.
type Cache struct { type Cache struct {
loader StorLoader loader StorLoader
...@@ -45,9 +43,9 @@ type Cache struct { ...@@ -45,9 +43,9 @@ type Cache struct {
// cache is fully synchronized with storage for transactions with tid <= head. // cache is fully synchronized with storage for transactions with tid <= head.
// XXX clarify ^^^ (it means if revCacheEntry.head=∞ it is Cache.head) // XXX clarify ^^^ (it means if revCacheEntry.head=∞ it is Cache.head)
head zodb.Tid head Tid
entryMap map[zodb.Oid]*oidCacheEntry // oid -> oid's cache entries entryMap map[Oid]*oidCacheEntry // oid -> oid's cache entries
// garbage collection: // garbage collection:
gcCh chan struct{} // signals gc to run gcCh chan struct{} // signals gc to run
...@@ -90,11 +88,11 @@ type revCacheEntry struct { ...@@ -90,11 +88,11 @@ type revCacheEntry struct {
// //
// .head can be > cache.head and still finite - that represents a // .head can be > cache.head and still finite - that represents a
// case when load with tid > cache.head was called. // case when load with tid > cache.head was called.
head zodb.Tid head Tid
// loading result: object (buf, serial) or error // loading result: object (buf, serial) or error
buf *zodb.Buf buf *Buf
serial zodb.Tid serial Tid
err error err error
ready chan struct{} // closed when loading finished ready chan struct{} // closed when loading finished
...@@ -104,7 +102,7 @@ type revCacheEntry struct { ...@@ -104,7 +102,7 @@ type revCacheEntry struct {
// StorLoader represents loading part of a storage. // StorLoader represents loading part of a storage.
// XXX -> zodb.IStorageLoader (or zodb.Loader ?) ? // XXX -> zodb.IStorageLoader (or zodb.Loader ?) ?
type StorLoader interface { type StorLoader interface {
Load(ctx context.Context, xid zodb.Xid) (buf *zodb.Buf, serial zodb.Tid, err error) Load(ctx context.Context, xid Xid) (buf *Buf, serial Tid, err error)
} }
// lock order: Cache.mu > oidCacheEntry // lock order: Cache.mu > oidCacheEntry
...@@ -117,7 +115,7 @@ type StorLoader interface { ...@@ -117,7 +115,7 @@ type StorLoader interface {
func NewCache(loader StorLoader, sizeMax int) *Cache { func NewCache(loader StorLoader, sizeMax int) *Cache {
c := &Cache{ c := &Cache{
loader: loader, loader: loader,
entryMap: make(map[zodb.Oid]*oidCacheEntry), entryMap: make(map[Oid]*oidCacheEntry),
gcCh: make(chan struct{}, 1), // 1 is important - see gcsignal gcCh: make(chan struct{}, 1), // 1 is important - see gcsignal
sizeMax: sizeMax, sizeMax: sizeMax,
} }
...@@ -145,7 +143,7 @@ func (c *Cache) SetSizeMax(sizeMax int) { ...@@ -145,7 +143,7 @@ func (c *Cache) SetSizeMax(sizeMax int) {
// Load loads data from database via cache. // Load loads data from database via cache.
// //
// If data is already in cache - cached content is returned. // If data is already in cache - cached content is returned.
func (c *Cache) Load(ctx context.Context, xid zodb.Xid) (buf *zodb.Buf, serial zodb.Tid, err error) { func (c *Cache) Load(ctx context.Context, xid Xid) (buf *Buf, serial Tid, err error) {
rce, rceNew := c.lookupRCE(xid) rce, rceNew := c.lookupRCE(xid)
// rce is already in cache - use it // rce is already in cache - use it
...@@ -175,7 +173,7 @@ func (c *Cache) Load(ctx context.Context, xid zodb.Xid) (buf *zodb.Buf, serial z ...@@ -175,7 +173,7 @@ func (c *Cache) Load(ctx context.Context, xid zodb.Xid) (buf *zodb.Buf, serial z
// If data is not yet in cache loading for it is started in the background. // If data is not yet in cache loading for it is started in the background.
// Prefetch is not blocking operation and does not wait for loading, if any was // Prefetch is not blocking operation and does not wait for loading, if any was
// started, to complete. // started, to complete.
func (c *Cache) Prefetch(ctx context.Context, xid zodb.Xid) { func (c *Cache) Prefetch(ctx context.Context, xid Xid) {
rce, rceNew := c.lookupRCE(xid) rce, rceNew := c.lookupRCE(xid)
// !rceNew -> no need to adjust LRU - it will be adjusted by further actual data Load // !rceNew -> no need to adjust LRU - it will be adjusted by further actual data Load
...@@ -194,7 +192,7 @@ func (c *Cache) Prefetch(ctx context.Context, xid zodb.Xid) { ...@@ -194,7 +192,7 @@ func (c *Cache) Prefetch(ctx context.Context, xid zodb.Xid) {
// //
// rceNew indicates whether rce is new and so loading on it has not been // rceNew indicates whether rce is new and so loading on it has not been
// initiated yet. If so the caller should proceed to loading rce via loadRCE. // initiated yet. If so the caller should proceed to loading rce via loadRCE.
func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) { func (c *Cache) lookupRCE(xid Xid) (rce *revCacheEntry, rceNew bool) {
// oid -> oce (oidCacheEntry) ; create new empty oce if not yet there // oid -> oce (oidCacheEntry) ; create new empty oce if not yet there
// exit with oce locked and cache.syncedTo read consistently // exit with oce locked and cache.syncedTo read consistently
c.mu.RLock() c.mu.RLock()
...@@ -223,7 +221,7 @@ func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) { ...@@ -223,7 +221,7 @@ func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) {
l := len(oce.rcev) l := len(oce.rcev)
i := sort.Search(l, func(i int) bool { i := sort.Search(l, func(i int) bool {
head_i := oce.rcev[i].head head_i := oce.rcev[i].head
if head_i == zodb.TidMax { if head_i == TidMax {
head_i = cacheHead head_i = cacheHead
} }
return xid.At <= head_i return xid.At <= head_i
...@@ -236,7 +234,7 @@ func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) { ...@@ -236,7 +234,7 @@ func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) {
if rce.head == cacheHead { if rce.head == cacheHead {
// FIXME better do this when the entry becomes loaded ? // FIXME better do this when the entry becomes loaded ?
// XXX vs concurrent invalidations? // XXX vs concurrent invalidations?
rce.head = zodb.TidMax rce.head = TidMax
} }
rceNew = true rceNew = true
...@@ -270,9 +268,9 @@ func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) { ...@@ -270,9 +268,9 @@ func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) {
// //
// rce must be new just created by lookupRCE() with returned rceNew=true. // rce must be new just created by lookupRCE() with returned rceNew=true.
// loading completion is signalled by closing rce.ready. // loading completion is signalled by closing rce.ready.
func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry, oid zodb.Oid) { func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry, oid Oid) {
oce := rce.parent oce := rce.parent
buf, serial, err := c.loader.Load(ctx, zodb.Xid{At: rce.head, Oid: oid}) buf, serial, err := c.loader.Load(ctx, Xid{At: rce.head, Oid: oid})
// normalize buf/serial if it was error // normalize buf/serial if it was error
if err != nil { if err != nil {
...@@ -380,7 +378,7 @@ func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry, oid zodb.Oid) { ...@@ -380,7 +378,7 @@ func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry, oid zodb.Oid) {
// must be called with .parent locked // must be called with .parent locked
// //
// XXX move oid from args to revCacheEntry? // XXX move oid from args to revCacheEntry?
func tryMerge(prev, next, cur *revCacheEntry, oid zodb.Oid) bool { func tryMerge(prev, next, cur *revCacheEntry, oid Oid) bool {
// can merge if consistent if // can merge if consistent if
// (if merging) // (if merging)
...@@ -505,15 +503,15 @@ func isErrNoData(err error) bool { ...@@ -505,15 +503,15 @@ func isErrNoData(err error) bool {
default: default:
return false return false
case *zodb.ErrOidMissing: case *ErrOidMissing:
case *zodb.ErrXidMissing: case *ErrXidMissing:
} }
return true return true
} }
// newRevEntry creates new revCacheEntry with .head and inserts it into .rcev @i. // newRevEntry creates new revCacheEntry with .head and inserts it into .rcev @i.
// (if i == len(oce.rcev) - entry is appended) // (if i == len(oce.rcev) - entry is appended)
func (oce *oidCacheEntry) newRevEntry(i int, head zodb.Tid) *revCacheEntry { func (oce *oidCacheEntry) newRevEntry(i int, head Tid) *revCacheEntry {
rce := &revCacheEntry{ rce := &revCacheEntry{
parent: oce, parent: oce,
serial: 0, serial: 0,
...@@ -576,11 +574,11 @@ func (rce *revCacheEntry) loaded() bool { ...@@ -576,11 +574,11 @@ func (rce *revCacheEntry) loaded() bool {
// //
// ( ErrXidMissing contains xid for which it is missing. In cache we keep such // ( ErrXidMissing contains xid for which it is missing. In cache we keep such
// xid with max .head but users need to get ErrXidMissing with their own query ) // xid with max .head but users need to get ErrXidMissing with their own query )
func (rce *revCacheEntry) userErr(xid zodb.Xid) error { func (rce *revCacheEntry) userErr(xid Xid) error {
switch e := rce.err.(type) { switch e := rce.err.(type) {
case *zodb.ErrXidMissing: case *ErrXidMissing:
if e.Xid != xid { if e.Xid != xid {
return &zodb.ErrXidMissing{xid} return &ErrXidMissing{xid}
} }
} }
...@@ -603,14 +601,14 @@ func (h *lruHead) rceFromInLRU() (rce *revCacheEntry) { ...@@ -603,14 +601,14 @@ func (h *lruHead) rceFromInLRU() (rce *revCacheEntry) {
} }
// errDB returns error about database being inconsistent // errDB returns error about database being inconsistent
func errDB(oid zodb.Oid, format string, argv ...interface{}) error { func errDB(oid Oid, format string, argv ...interface{}) error {
// XXX -> separate type? // XXX -> separate type?
return fmt.Errorf("cache: database inconsistency: oid: %v: " + format, return fmt.Errorf("cache: database inconsistency: oid: %v: " + format,
append([]interface{}{oid}, argv...)...) append([]interface{}{oid}, argv...)...)
} }
// errDB marks rce with database inconsistency error // errDB marks rce with database inconsistency error
func (rce *revCacheEntry) errDB(oid zodb.Oid, format string, argv ...interface{}) { func (rce *revCacheEntry) errDB(oid Oid, format string, argv ...interface{}) {
rce.err = errDB(oid, format, argv...) rce.err = errDB(oid, format, argv...)
rce.buf.XRelease() rce.buf.XRelease()
rce.buf = nil rce.buf = nil
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
package storage package zodb
import ( import (
"bytes" "bytes"
...@@ -29,7 +29,6 @@ import ( ...@@ -29,7 +29,6 @@ import (
"testing" "testing"
"github.com/kylelemons/godebug/pretty" "github.com/kylelemons/godebug/pretty"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/go123/tracing" "lab.nexedi.com/kirr/go123/tracing"
"lab.nexedi.com/kirr/neo/go/xcommon/xtesting" "lab.nexedi.com/kirr/neo/go/xcommon/xtesting"
...@@ -38,19 +37,19 @@ import ( ...@@ -38,19 +37,19 @@ import (
// tStorage implements read-only storage for cache testing // tStorage implements read-only storage for cache testing
type tStorage struct { type tStorage struct {
// oid -> [](.serial↑, .data) // oid -> [](.serial↑, .data)
dataMap map[zodb.Oid][]tOidData dataMap map[Oid][]tOidData
} }
// data for oid for 1 revision // data for oid for 1 revision
type tOidData struct { type tOidData struct {
serial zodb.Tid serial Tid
data []byte data []byte
err error // e.g. io error err error // e.g. io error
} }
// create new buffer with specified content copied there. // create new buffer with specified content copied there.
func mkbuf(data []byte) *zodb.Buf { func mkbuf(data []byte) *Buf {
buf := zodb.BufAlloc(len(data)) buf := BufAlloc(len(data))
copy(buf.Data, data) copy(buf.Data, data)
return buf return buf
} }
...@@ -58,7 +57,7 @@ func mkbuf(data []byte) *zodb.Buf { ...@@ -58,7 +57,7 @@ func mkbuf(data []byte) *zodb.Buf {
// check whether buffers hold same data or both are nil. // check whether buffers hold same data or both are nil.
// //
// NOTE we ignore refcnt here // NOTE we ignore refcnt here
func bufSame(buf1, buf2 *zodb.Buf) bool { func bufSame(buf1, buf2 *Buf) bool {
if buf1 == nil { if buf1 == nil {
return (buf2 == nil) return (buf2 == nil)
} }
...@@ -66,13 +65,13 @@ func bufSame(buf1, buf2 *zodb.Buf) bool { ...@@ -66,13 +65,13 @@ func bufSame(buf1, buf2 *zodb.Buf) bool {
return reflect.DeepEqual(buf1.Data, buf2.Data) return reflect.DeepEqual(buf1.Data, buf2.Data)
} }
func (stor *tStorage) Load(_ context.Context, xid zodb.Xid) (buf *zodb.Buf, serial zodb.Tid, err error) { func (stor *tStorage) Load(_ context.Context, xid Xid) (buf *Buf, serial Tid, err error) {
//fmt.Printf("> load(%v)\n", xid) //fmt.Printf("> load(%v)\n", xid)
//defer func() { fmt.Printf("< %v, %v, %v\n", buf.XData(), serial, err) }() //defer func() { fmt.Printf("< %v, %v, %v\n", buf.XData(), serial, err) }()
datav := stor.dataMap[xid.Oid] datav := stor.dataMap[xid.Oid]
if datav == nil { if datav == nil {
return nil, 0, &zodb.ErrOidMissing{xid.Oid} return nil, 0, &ErrOidMissing{xid.Oid}
} }
// find max entry with .serial <= xid.At // find max entry with .serial <= xid.At
...@@ -85,7 +84,7 @@ func (stor *tStorage) Load(_ context.Context, xid zodb.Xid) (buf *zodb.Buf, seri ...@@ -85,7 +84,7 @@ func (stor *tStorage) Load(_ context.Context, xid zodb.Xid) (buf *zodb.Buf, seri
//fmt.Printf("i: %d n: %d\n", i, n) //fmt.Printf("i: %d n: %d\n", i, n)
if i == -1 { if i == -1 {
// xid.At < all .serial - no such transaction // xid.At < all .serial - no such transaction
return nil, 0, &zodb.ErrXidMissing{xid} return nil, 0, &ErrXidMissing{xid}
} }
s, e := datav[i].serial, datav[i].err s, e := datav[i].serial, datav[i].err
...@@ -98,8 +97,8 @@ func (stor *tStorage) Load(_ context.Context, xid zodb.Xid) (buf *zodb.Buf, seri ...@@ -98,8 +97,8 @@ func (stor *tStorage) Load(_ context.Context, xid zodb.Xid) (buf *zodb.Buf, seri
var ioerr = errors.New("input/output error") var ioerr = errors.New("input/output error")
func xidat(oid zodb.Oid, tid zodb.Tid) zodb.Xid { func xidat(oid Oid, tid Tid) Xid {
return zodb.Xid{Oid: oid, At: tid} return Xid{Oid: oid, At: tid}
} }
// tracer which collects tracing events from all needed-for-tests sources // tracer which collects tracing events from all needed-for-tests sources
...@@ -133,7 +132,7 @@ func TestCache(t *testing.T) { ...@@ -133,7 +132,7 @@ func TestCache(t *testing.T) {
big := []byte("0123456789") big := []byte("0123456789")
tstor := &tStorage{ tstor := &tStorage{
dataMap: map[zodb.Oid][]tOidData{ dataMap: map[Oid][]tOidData{
1: { 1: {
{4, hello, nil}, {4, hello, nil},
{7, nil, ioerr}, {7, nil, ioerr},
...@@ -150,7 +149,7 @@ func TestCache(t *testing.T) { ...@@ -150,7 +149,7 @@ func TestCache(t *testing.T) {
c := NewCache(tstor, 100 /* > Σ all data */) c := NewCache(tstor, 100 /* > Σ all data */)
ctx := context.Background() ctx := context.Background()
checkLoad := func(xid zodb.Xid, buf *zodb.Buf, serial zodb.Tid, err error) { checkLoad := func(xid Xid, buf *Buf, serial Tid, err error) {
t.Helper() t.Helper()
bad := &bytes.Buffer{} bad := &bytes.Buffer{}
b, s, e := c.Load(ctx, xid) b, s, e := c.Load(ctx, xid)
...@@ -169,7 +168,7 @@ func TestCache(t *testing.T) { ...@@ -169,7 +168,7 @@ func TestCache(t *testing.T) {
} }
} }
checkRCE := func(rce *revCacheEntry, head, serial zodb.Tid, buf *zodb.Buf, err error) { checkRCE := func(rce *revCacheEntry, head, serial Tid, buf *Buf, err error) {
t.Helper() t.Helper()
bad := &bytes.Buffer{} bad := &bytes.Buffer{}
if rce.head != head { if rce.head != head {
...@@ -190,7 +189,7 @@ func TestCache(t *testing.T) { ...@@ -190,7 +189,7 @@ func TestCache(t *testing.T) {
} }
} }
checkOCE := func(oid zodb.Oid, rcev ...*revCacheEntry) { checkOCE := func(oid Oid, rcev ...*revCacheEntry) {
t.Helper() t.Helper()
oce := c.entryMap[oid] oce := c.entryMap[oid]
oceRcev := oce.rcev oceRcev := oce.rcev
...@@ -235,26 +234,26 @@ func TestCache(t *testing.T) { ...@@ -235,26 +234,26 @@ func TestCache(t *testing.T) {
checkMRU(0) checkMRU(0)
// load @2 -> new rce entry // load @2 -> new rce entry
checkLoad(xidat(1,2), nil, 0, &zodb.ErrXidMissing{xidat(1,2)}) checkLoad(xidat(1,2), nil, 0, &ErrXidMissing{xidat(1,2)})
oce1 := c.entryMap[1] oce1 := c.entryMap[1]
ok1(len(oce1.rcev) == 1) ok1(len(oce1.rcev) == 1)
rce1_h2 := oce1.rcev[0] rce1_h2 := oce1.rcev[0]
checkRCE(rce1_h2, 2, 0, nil, &zodb.ErrXidMissing{xidat(1,2)}) checkRCE(rce1_h2, 2, 0, nil, &ErrXidMissing{xidat(1,2)})
checkMRU(0, rce1_h2) checkMRU(0, rce1_h2)
// load @3 -> 2] merged with 3] // load @3 -> 2] merged with 3]
checkLoad(xidat(1,3), nil, 0, &zodb.ErrXidMissing{xidat(1,3)}) checkLoad(xidat(1,3), nil, 0, &ErrXidMissing{xidat(1,3)})
ok1(len(oce1.rcev) == 1) ok1(len(oce1.rcev) == 1)
rce1_h3 := oce1.rcev[0] rce1_h3 := oce1.rcev[0]
ok1(rce1_h3 != rce1_h2) // rce1_h2 was merged into rce1_h3 ok1(rce1_h3 != rce1_h2) // rce1_h2 was merged into rce1_h3
checkRCE(rce1_h3, 3, 0, nil, &zodb.ErrXidMissing{xidat(1,3)}) checkRCE(rce1_h3, 3, 0, nil, &ErrXidMissing{xidat(1,3)})
checkMRU(0, rce1_h3) checkMRU(0, rce1_h3)
// load @1 -> 1] merged with 3] // load @1 -> 1] merged with 3]
checkLoad(xidat(1,1), nil, 0, &zodb.ErrXidMissing{xidat(1,1)}) checkLoad(xidat(1,1), nil, 0, &ErrXidMissing{xidat(1,1)})
ok1(len(oce1.rcev) == 1) ok1(len(oce1.rcev) == 1)
ok1(oce1.rcev[0] == rce1_h3) ok1(oce1.rcev[0] == rce1_h3)
checkRCE(rce1_h3, 3, 0, nil, &zodb.ErrXidMissing{xidat(1,3)}) checkRCE(rce1_h3, 3, 0, nil, &ErrXidMissing{xidat(1,3)})
checkMRU(0, rce1_h3) checkMRU(0, rce1_h3)
// load @5 -> new rce entry with data // load @5 -> new rce entry with data
...@@ -407,7 +406,7 @@ func TestCache(t *testing.T) { ...@@ -407,7 +406,7 @@ func TestCache(t *testing.T) {
// ---- verify rce lookup for must be cached entries ---- // ---- verify rce lookup for must be cached entries ----
// (this excersizes lookupRCE) // (this excersizes lookupRCE)
checkLookup := func(xid zodb.Xid, expect *revCacheEntry) { checkLookup := func(xid Xid, expect *revCacheEntry) {
t.Helper() t.Helper()
bad := &bytes.Buffer{} bad := &bytes.Buffer{}
rce, rceNew := c.lookupRCE(xid) rce, rceNew := c.lookupRCE(xid)
......
...@@ -32,19 +32,19 @@ type OpenOptions struct { ...@@ -32,19 +32,19 @@ type OpenOptions struct {
ReadOnly bool // whether to open storage as read-only ReadOnly bool // whether to open storage as read-only
} }
// StorageOpener is a function to open a storage // DriverOpener is a function to open a storage driver
type StorageOpener func (ctx context.Context, u *url.URL, opt *OpenOptions) (IStorage, error) type DriverOpener func (ctx context.Context, u *url.URL, opt *OpenOptions) (IStorageDriver, error)
// {} scheme -> StorageOpener // {} scheme -> DriverOpener
var storageRegistry = map[string]StorageOpener{} var driverRegistry = map[string]DriverOpener{}
// RegisterStorage registers opener to be used for URLs with scheme // RegisterDriver registers opener to be used for URLs with scheme
func RegisterStorage(scheme string, opener StorageOpener) { func RegisterDriver(scheme string, opener DriverOpener) {
if _, already := storageRegistry[scheme]; already { if _, already := driverRegistry[scheme]; already {
panic(fmt.Errorf("ZODB URL scheme %q was already registered", scheme)) panic(fmt.Errorf("ZODB URL scheme %q was already registered", scheme))
} }
storageRegistry[scheme] = opener driverRegistry[scheme] = opener
} }
// OpenStorage opens ZODB storage by URL. // OpenStorage opens ZODB storage by URL.
...@@ -54,8 +54,6 @@ func RegisterStorage(scheme string, opener StorageOpener) { ...@@ -54,8 +54,6 @@ func RegisterStorage(scheme string, opener StorageOpener) {
// get support for well-known storages. // get support for well-known storages.
// //
// Storage authors should register their storages with RegisterStorage. // Storage authors should register their storages with RegisterStorage.
//
// TODO automatically wrap opened storage with Cache.
func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (IStorage, error) { func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (IStorage, error) {
// no scheme -> file:// // no scheme -> file://
if !strings.Contains(storageURL, "://") { if !strings.Contains(storageURL, "://") {
...@@ -70,10 +68,48 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto ...@@ -70,10 +68,48 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto
// XXX commonly handle some options from url -> opt? // XXX commonly handle some options from url -> opt?
// (e.g. ?readonly=1 -> opt.ReadOnly=true + remove ?readonly=1 from URL) // (e.g. ?readonly=1 -> opt.ReadOnly=true + remove ?readonly=1 from URL)
opener, ok := storageRegistry[u.Scheme] opener, ok := driverRegistry[u.Scheme]
if !ok { if !ok {
return nil, fmt.Errorf("zodb: URL scheme \"%s://\" not supported", u.Scheme) return nil, fmt.Errorf("zodb: URL scheme \"%s://\" not supported", u.Scheme)
} }
return opener(ctx, u, opt) storDriver, err := opener(ctx, u, opt)
if err != nil {
return nil, err
}
return &storage{
IStorageDriver: storDriver,
// small cache so that prefetch can work for loading
l1cache: NewCache(storDriver, 16 << 20), // XXX 16MB hardcoded
}, nil
}
// storage represents storage opened via OpenStorage.
//
// it provides a small cache on top of raw storage driver to implement prefetch
// and other storage-independed higher-level functionality.
type storage struct {
IStorageDriver
l1cache *Cache
url string // URL this storage was opened via
}
// loading always goes through cache - this way prefetching can work
func (s *storage) Load(ctx context.Context, xid Xid) (*Buf, Tid, error) {
return s.l1cache.Load(ctx, xid)
}
func (s *storage) Prefetch(ctx context.Context, xid Xid) {
s.l1cache.Prefetch(ctx, xid)
}
func (s *storage) URL() string {
return s.url
} }
...@@ -69,7 +69,6 @@ import ( ...@@ -69,7 +69,6 @@ import (
"fmt" "fmt"
"io" "io"
"log" "log"
"net/url"
"os" "os"
"sync" "sync"
...@@ -91,21 +90,10 @@ type FileStorage struct { ...@@ -91,21 +90,10 @@ type FileStorage struct {
// (both with .Len=0 & .Tid=0 if database is empty) // (both with .Len=0 & .Tid=0 if database is empty)
txnhMin TxnHeader txnhMin TxnHeader
txnhMax TxnHeader txnhMax TxnHeader
url *url.URL // original URL we were opened with
}
// IStorage
var _ zodb.IStorage = (*FileStorage)(nil)
func (fs *FileStorage) StorageName() string {
return "FileStorage v1"
}
func (fs *FileStorage) URL() string {
return fs.url.String()
} }
// IStorageDriver
var _ zodb.IStorageDriver = (*FileStorage)(nil)
func (fs *FileStorage) LastTid(_ context.Context) (zodb.Tid, error) { func (fs *FileStorage) LastTid(_ context.Context) (zodb.Tid, error) {
// XXX must be under lock // XXX must be under lock
......
...@@ -28,7 +28,7 @@ import ( ...@@ -28,7 +28,7 @@ import (
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
) )
func openByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (zodb.IStorage, error) { func openByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (zodb.IStorageDriver, error) {
// TODO handle query // TODO handle query
// XXX u.Path is not always raw path - recheck and fix // XXX u.Path is not always raw path - recheck and fix
path := u.Host + u.Path path := u.Host + u.Path
...@@ -40,12 +40,9 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (zodb.ISt ...@@ -40,12 +40,9 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (zodb.ISt
} }
fs, err := Open(ctx, path) fs, err := Open(ctx, path)
if fs != nil {
fs.url = u // FIXME move this inside Open
}
return fs, err return fs, err
} }
func init() { func init() {
zodb.RegisterStorage("file", openByURL) zodb.RegisterDriver("file", openByURL)
} }
...@@ -139,11 +139,23 @@ func (e *ErrXidMissing) Error() string { ...@@ -139,11 +139,23 @@ func (e *ErrXidMissing) Error() string {
return fmt.Sprintf("%v: no matching data record found", e.Xid) return fmt.Sprintf("%v: no matching data record found", e.Xid)
} }
// IStorage is the interface provided when a ZODB storage is opened // IStorage is the interface provided by opened ZODB storage
type IStorage interface { type IStorage interface {
// URL returns URL of this storage IStorageDriver
// URL returns URL of how the storage was opened
URL() string URL() string
// Prefetch prefetches object addressed by xid.
//
// If data is not yet in cache loading for it is started in the background.
// Prefetch is not blocking operation and does not wait for loading, if any was
// started, to complete.
Prefetch(ctx context.Context, xid Xid)
}
// IStorageDriver is the raw interface provided by ZODB storage drivers
type IStorageDriver interface {
// Close closes storage // Close closes storage
Close() error Close() error
...@@ -182,19 +194,17 @@ type IStorage interface { ...@@ -182,19 +194,17 @@ type IStorage interface {
// XXX zodb.loadBefore() returns (data, serial, serial_next) -> add serial_next? // XXX zodb.loadBefore() returns (data, serial, serial_next) -> add serial_next?
Load(ctx context.Context, xid Xid) (buf *Buf, serial Tid, err error) Load(ctx context.Context, xid Xid) (buf *Buf, serial Tid, err error)
// Prefetch(ctx, xid Xid) (no error)
// TODO: write mode // TODO: write mode
// Store(oid Oid, serial Tid, data []byte, txn ITransaction) error // Store(oid Oid, serial Tid, data []byte, txn ITransaction) error
// KeepCurrent(oid Oid, serial Tid, txn ITransaction) // StoreKeepCurrent(oid Oid, serial Tid, txn ITransaction)
// TpcBegin(txn) // TpcBegin(txn)
// TpcVote(txn) // TpcVote(txn)
// TpcFinish(txn, callback) // TpcFinish(txn, callback)
// TpcAbort(txn) // TpcAbort(txn)
// TODO: invalidation channel (notify about changes made to DB not by us) // TODO: invalidation channel (notify about changes made to DB not by us from outside)
// TODO: History(ctx, oid, size=1) // TODO: History(ctx, oid, size=1)
......
// Code generated by lab.nexedi.com/kirr/go123/tracing/cmd/gotrace; DO NOT EDIT. // Code generated by lab.nexedi.com/kirr/go123/tracing/cmd/gotrace; DO NOT EDIT.
package storage package zodb
// code generated for tracepoints // code generated for tracepoints
import ( import (
...@@ -63,4 +63,4 @@ func traceCacheGCStart_Attach(pg *tracing.ProbeGroup, probe func(c *Cache)) *tra ...@@ -63,4 +63,4 @@ func traceCacheGCStart_Attach(pg *tracing.ProbeGroup, probe func(c *Cache)) *tra
} }
// trace export signature // trace export signature
func _trace_exporthash_46a80e8af5056736069c296a95ad4c94388ab850() {} func _trace_exporthash_6392b85af001a55df3eb1603d80173377a81fc9a() {}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment