Commit 87b1102c authored by Kirill Smelkov's avatar Kirill Smelkov

X zodb: Move Buf to lab.nexedi.com/kirr/go123/mem

parent 75a71514
......@@ -32,6 +32,7 @@ import (
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
......@@ -384,7 +385,7 @@ func (c *Client) LastOid(ctx context.Context) (zodb.Oid, error) {
panic("TODO")
}
func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *zodb.Buf, serial zodb.Tid, err error) {
func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial zodb.Tid, err error) {
// defer func() ...
buf, serial, err = c._Load(ctx, xid)
......@@ -413,7 +414,7 @@ func init() {
}
}
func (c *Client) _Load(ctx context.Context, xid zodb.Xid) (*zodb.Buf, zodb.Tid, error) {
func (c *Client) _Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, error) {
err := c.withOperational(ctx)
if err != nil {
return nil, 0, err
......@@ -474,7 +475,7 @@ func (c *Client) _Load(ctx context.Context, xid zodb.Xid) (*zodb.Buf, zodb.Tid,
if resp.Compression {
// XXX cleanup mess vvv
buf2 := zodb.BufAlloc(len(buf.Data))
buf2 := mem.BufAlloc(len(buf.Data))
buf2.Data = buf2.Data[:0]
udata, err := decompress(buf.Data, buf2.Data)
buf.Release()
......
......@@ -51,6 +51,7 @@ package neo
import (
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/go123/mem"
"encoding/binary"
"errors"
......@@ -646,7 +647,7 @@ type AnswerObject struct {
NextSerial zodb.Tid // XXX but there it is out of sync
Compression bool
Checksum Checksum
Data *zodb.Buf // TODO encode -> separately (for writev)
Data *mem.Buf // TODO encode -> separately (for writev)
DataSerial zodb.Tid
}
......
......@@ -117,7 +117,7 @@ func typeName(typ types.Type) string {
}
var neo_customCodec *types.Interface // type of neo.customCodec
var zodbBuf types.Type // type of zodb.Buf
var memBuf types.Type // type of mem.Buf
// bytes.Buffer + bell & whistles
type Buffer struct {
......@@ -145,6 +145,10 @@ func (li *localImporter) Import(path string) (*types.Package, error) {
return li.Importer.Import(path)
}
// importer instance - only 1 so that for 2 top-level packages same dependent
// packages are not reimported several times.
var localImporterObj = &localImporter{importer.Default()}
func loadPkg(pkgPath string, sources ...string) *types.Package {
var filev []*ast.File
......@@ -162,7 +166,7 @@ func loadPkg(pkgPath string, sources ...string) *types.Package {
//return
// typecheck
conf := types.Config{Importer: &localImporter{importer.Default()}}
conf := types.Config{Importer: localImporterObj}
pkg, err := conf.Check(pkgPath, fset, filev, typeInfo)
if err != nil {
log.Fatalf("typecheck: %v", err)
......@@ -243,7 +247,7 @@ func main() {
log.SetFlags(0)
// go through proto.go and AST'ify & typecheck it
zodbPkg = loadPkg("lab.nexedi.com/kirr/neo/go/zodb", "../zodb/zodb.go", "../zodb/buffer.go")
zodbPkg = loadPkg("lab.nexedi.com/kirr/neo/go/zodb", "../zodb/zodb.go")
neoPkg = loadPkg("lab.nexedi.com/kirr/neo/go/neo", "proto.go", "packed.go")
// extract neo.customCodec
......@@ -257,12 +261,24 @@ func main() {
log.Fatal("customCodec is not interface (got %v)", cc.Type())
}
// extract zodb.Buf
__ := zodbPkg.Scope().Lookup("Buf")
// extract mem.Buf
memPath := "lab.nexedi.com/kirr/go123/mem"
var memPkg *types.Package
for _, pkg := range zodbPkg.Imports() {
if pkg.Path() == memPath {
memPkg = pkg
break
}
}
if memPkg == nil {
log.Fatalf("cannot find `%s` in zodb imports", memPath)
}
__ := memPkg.Scope().Lookup("Buf")
if __ == nil {
log.Fatal("cannot find `zodb.Buf`")
log.Fatal("cannot find `mem.Buf`")
}
zodbBuf = __.Type()
memBuf = __.Type()
// prologue
f := fileMap["proto.go"]
......@@ -277,6 +293,7 @@ import (
"reflect"
"sort"
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/neo/go/zodb"
)`)
......@@ -462,7 +479,7 @@ type CodeGenerator interface {
genArray1(path string, typ *types.Array)
genSlice1(path string, typ types.Type)
// zodb.Buf
// mem.Buf
genBuf(path string)
// generate code for a custom type which implements its own
......@@ -923,8 +940,8 @@ func (d *decoder) genSlice1(assignto string, typ types.Type) {
d.emit("}")
}
// emit code to size/encode/decode zodb.Buf
// same as slice1 but buffer is allocated via zodb.BufAlloc
// emit code to size/encode/decode mem.Buf
// same as slice1 but buffer is allocated via mem.BufAlloc
func (s *sizer) genBuf(path string) {
s.genSlice1(path + ".XData()", nil /* typ unused */)
}
......@@ -943,7 +960,7 @@ func (d *decoder) genBuf(path string) {
d.overflow.AddExpr("l")
// TODO eventually do not copy but reference original
d.emit("%v= zodb.BufAlloc(int(l))", path)
d.emit("%v= mem.BufAlloc(int(l))", path)
d.emit("copy(%v.Data, data[:l])", path)
d.emit("data = data[l:]")
......@@ -1164,8 +1181,8 @@ func codegenType(path string, typ types.Type, obj types.Object, codegen CodeGene
return
}
// zodb.Buf
if tptr, ok := typ.Underlying().(*types.Pointer); ok && tptr.Elem() == zodbBuf {
// mem.Buf
if tptr, ok := typ.Underlying().(*types.Pointer); ok && tptr.Elem() == memBuf {
codegen.genBuf(path)
return
}
......
......@@ -9,6 +9,7 @@ import (
"reflect"
"sort"
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/neo/go/zodb"
)
......@@ -1966,7 +1967,7 @@ func (p *AnswerObject) neoMsgDecode(data []byte) (int, error) {
goto overflow
}
nread += 8 + l
p.Data = zodb.BufAlloc(int(l))
p.Data = mem.BufAlloc(int(l))
copy(p.Data.Data, data[:l])
data = data[l:]
}
......
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package zodb
// data buffers management
import (
"sync"
"sync/atomic"
"lab.nexedi.com/kirr/go123/xmath"
)
// Buf is reference-counted memory buffer.
//
// To lower pressure on Go garbage-collector allocate buffers with BufAlloc and
// free them with Buf.Release.
//
// Custom allocation functions affect only performance, not correctness -
// everything should work if data buffer is allocated and/or free'ed
// via regular Go/GC-way.
type Buf struct {
Data []byte
// reference counter.
//
// NOTE to allow both Bufs created via BufAlloc and via std new, Buf is
// created with refcnt=0. The real number of references to Buf is thus .refcnt+1
refcnt int32
}
const order0 = 4 // buf sizes start from 2^4 (=16)
var bufPoolv = [19]sync.Pool{} // buf size stop at 2^(4+19-1) (=4M)
func init() {
for i := 0; i < len(bufPoolv); i++ {
i := i
bufPoolv[i].New = func() interface{} {
// NOTE *Buf, not just buf, to avoid allocation when
// making interface{} from it (interface{} wants to always point to heap)
return &Buf{Data: make([]byte, 1<<(order0+uint(i)))}
}
}
}
// BufAlloc allocates buffer of requested size from freelist.
//
// buffer memory is not initialized.
func BufAlloc(size int) *Buf {
return BufAlloc64(int64(size))
}
// BufAlloc64 is same as BufAlloc but accepts int64 for size.
func BufAlloc64(size int64) *Buf {
if size < 0 {
panic("invalid size")
}
// order = min i: 2^i >= size
order := xmath.CeilLog2(uint64(size))
order -= order0
if order < 0 {
order = 0
}
// if too big - allocate straightly from heap
if order >= len(bufPoolv) {
return &Buf{Data: make([]byte, size)}
}
buf := bufPoolv[order].Get().(*Buf)
buf.Data = buf.Data[:size] // leaving cap as is = 2^i
buf.refcnt = 0
return buf
}
// Release marks buf as no longer used by caller.
//
// It decrements buf reference-counter and if it reaches zero returns buf to
// freelist.
//
// The caller must not use buf after call to Release.
//
// XXX naming? -> Free? -> Decref? -> Unref?
func (buf *Buf) Release() {
rc := atomic.AddInt32(&buf.refcnt, -1)
if rc < 0 - 1 {
panic("Buf.Release: refcnt < 0")
}
if rc > 0 - 1 {
return
}
// order = max i: 2^i <= cap
order := xmath.FloorLog2(uint64(cap(buf.Data)))
order -= order0
if order < 0 {
return // too small
}
if order >= len(bufPoolv) {
return // too big
}
bufPoolv[order].Put(buf)
}
// Incref increments buf's reference counter by 1.
//
// buf must already have reference-counter > 0 before Incref call.
func (buf *Buf) Incref() {
rc := atomic.AddInt32(&buf.refcnt, +1)
if rc <= 1 - 1 {
panic("Buf.Incref: refcnt was < 1")
}
}
// XRelease releases buf it is != nil.
func (buf *Buf) XRelease() {
if buf != nil {
buf.Release()
}
}
// XIncref increments buf's reference counter by 1 if buf != nil.
func (buf *Buf) XIncref() {
if buf != nil {
buf.Incref()
}
}
// Len returns buf's len.
//
// it works even if buf=nil similarly to len() on nil []byte slice.
func (buf *Buf) Len() int {
if buf != nil {
return len(buf.Data)
}
return 0
}
// Cap returns buf's cap.
//
// it works even if buf=nil similarly to len() on nil []byte slice.
func (buf *Buf) Cap() int {
if buf != nil {
return cap(buf.Data)
}
return 0
}
// XData return's buf.Data or nil if buf == nil.
func (buf *Buf) XData() []byte {
if buf != nil {
return buf.Data
}
return nil
}
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// As of go19 sync.Pool under race-detector randomly drops items on the floor
// https://github.com/golang/go/blob/ca360c39/src/sync/pool.go#L92
// so it is not possible to verify we will get what we've just put there.
// +build !race
package zodb
import (
"reflect"
"testing"
"unsafe"
)
//go:linkname runtime_procPin runtime.procPin
//go:linkname runtime_procUnpin runtime.procUnpin
func runtime_procPin() int
func runtime_procUnpin()
func sliceDataPtr(b []byte) unsafe.Pointer {
return unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&b)).Data)
}
func TestBufAllocFree(t *testing.T) {
// sync.Pool uses per-P free-lists. We check that after release we will
// allocate released object. This works only if P is not changed underneath.
runtime_procPin()
defer runtime_procUnpin()
for i := uint(0); i < 25; i++ {
size := 1<<i - 1
xcap := 1<<i
buf := BufAlloc(size)
if i < order0 {
xcap = 1 << order0
}
if int(i) >= order0+len(bufPoolv) {
xcap = size
}
if len(buf.Data) != size {
t.Fatalf("%v: len=%v ; want %v", i, len(buf.Data), size)
}
if cap(buf.Data) != xcap {
t.Fatalf("%v: cap=%v ; want %v", i, cap(buf.Data), xcap)
}
checkref := func(rc int32) {
t.Helper()
if buf.refcnt != rc {
t.Fatalf("%v: refcnt=%v ; want %v", i, buf.refcnt, rc)
}
}
checkref(0)
// free and allocate another buf -> it must be it
data := buf.Data
buf.Release()
checkref(-1)
buf2 := BufAlloc(size)
// not from pool - memory won't be reused
if int(i) >= order0+len(bufPoolv) {
if buf2 == buf || sliceDataPtr(buf2.Data) == sliceDataPtr(data) {
t.Fatalf("%v: buffer reused but should not", i)
}
continue
}
// from pool -> it must be the same
if !(buf2 == buf && sliceDataPtr(buf2.Data) == sliceDataPtr(data)) {
t.Fatalf("%v: buffer not reused on free/realloc", i)
}
checkref(0)
// add more ref and release original buf - it must stay alive
buf.Incref()
checkref(1)
buf.Release()
checkref(0)
// another alloc must be different
buf2 = BufAlloc(size)
checkref(0)
if buf2 == buf || sliceDataPtr(buf2.Data) == sliceDataPtr(data) {
t.Fatalf("%v: buffer reused but should not", i)
}
// release buf again -> should go to pool
buf.Release()
checkref(-1)
buf2 = BufAlloc(size)
if !(buf2 == buf && sliceDataPtr(buf2.Data) == sliceDataPtr(data)) {
t.Fatalf("%v: buffer not reused on free/realloc", i)
}
checkref(0)
}
}
// empty .s so `go build` does not use -complete for go:linkname to work
......@@ -29,6 +29,8 @@ import (
"sync"
"unsafe"
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/neo/go/xcommon/xcontainer/list"
)
......@@ -91,7 +93,7 @@ type revCacheEntry struct {
head Tid
// loading result: object (buf, serial) or error
buf *Buf
buf *mem.Buf
serial Tid
err error
......@@ -113,7 +115,7 @@ type revCacheEntry struct {
// StorLoader represents loading part of a storage.
// XXX -> zodb.IStorageLoader (or zodb.Loader ?) ?
type StorLoader interface {
Load(ctx context.Context, xid Xid) (buf *Buf, serial Tid, err error)
Load(ctx context.Context, xid Xid) (buf *mem.Buf, serial Tid, err error)
}
// lock order: Cache.mu > oidCacheEntry
......@@ -146,7 +148,7 @@ func (c *Cache) SetSizeMax(sizeMax int) {
// Load loads data from database via cache.
//
// If data is already in cache - cached content is returned.
func (c *Cache) Load(ctx context.Context, xid Xid) (buf *Buf, serial Tid, err error) {
func (c *Cache) Load(ctx context.Context, xid Xid) (buf *mem.Buf, serial Tid, err error) {
rce, rceNew := c.lookupRCE(xid, +1)
// rce is already in cache - use it
......
......@@ -30,6 +30,8 @@ import (
"sync/atomic"
"testing"
"lab.nexedi.com/kirr/go123/mem"
"github.com/kylelemons/godebug/pretty"
)
......@@ -47,8 +49,8 @@ type tOidData struct {
}
// create new buffer with specified content copied there.
func mkbuf(data []byte) *Buf {
buf := BufAlloc(len(data))
func mkbuf(data []byte) *mem.Buf {
buf := mem.BufAlloc(len(data))
copy(buf.Data, data)
return buf
}
......@@ -56,7 +58,7 @@ func mkbuf(data []byte) *Buf {
// check whether buffers hold same data or both are nil.
//
// NOTE we ignore refcnt here
func bufSame(buf1, buf2 *Buf) bool {
func bufSame(buf1, buf2 *mem.Buf) bool {
if buf1 == nil {
return (buf2 == nil)
}
......@@ -64,7 +66,7 @@ func bufSame(buf1, buf2 *Buf) bool {
return reflect.DeepEqual(buf1.Data, buf2.Data)
}
func (stor *tStorage) Load(_ context.Context, xid Xid) (buf *Buf, serial Tid, err error) {
func (stor *tStorage) Load(_ context.Context, xid Xid) (buf *mem.Buf, serial Tid, err error) {
//fmt.Printf("> load(%v)\n", xid)
//defer func() { fmt.Printf("< %v, %v, %v\n", buf.XData(), serial, err) }()
......@@ -133,7 +135,7 @@ func TestCache(t *testing.T) {
c := NewCache(tstor, 100 /* > Σ all data */)
ctx := context.Background()
checkLoad := func(xid Xid, buf *Buf, serial Tid, err error) {
checkLoad := func(xid Xid, buf *mem.Buf, serial Tid, err error) {
t.Helper()
bad := &bytes.Buffer{}
b, s, e := c.Load(ctx, xid)
......@@ -169,7 +171,7 @@ func TestCache(t *testing.T) {
}
}
checkRCE := func(rce *revCacheEntry, head, serial Tid, buf *Buf, err error) {
checkRCE := func(rce *revCacheEntry, head, serial Tid, buf *mem.Buf, err error) {
t.Helper()
bad := &bytes.Buffer{}
if rce.head != head {
......@@ -618,7 +620,7 @@ func (c *Checker) assertEq(a, b interface{}) {
type noopStorage struct {}
var noopData = []byte{0}
func (s *noopStorage) Load(_ context.Context, xid Xid) (buf *Buf, serial Tid, err error) {
func (s *noopStorage) Load(_ context.Context, xid Xid) (buf *mem.Buf, serial Tid, err error) {
return mkbuf(noopData), 1, nil
}
......
......@@ -25,6 +25,8 @@ import (
"fmt"
"net/url"
"strings"
"lab.nexedi.com/kirr/go123/mem"
)
// OpenOptions describes options for OpenStorage
......@@ -101,7 +103,7 @@ type storage struct {
// loading always goes through cache - this way prefetching can work
func (s *storage) Load(ctx context.Context, xid Xid) (*Buf, Tid, error) {
func (s *storage) Load(ctx context.Context, xid Xid) (*mem.Buf, Tid, error) {
return s.l1cache.Load(ctx, xid)
}
......
......@@ -75,6 +75,7 @@ import (
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/xcommon/xbufio"
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xerr"
)
......@@ -135,7 +136,7 @@ func (dh *DataHeader) Free() {
dhPool.Put(dh)
}
func (fs *FileStorage) Load(_ context.Context, xid zodb.Xid) (buf *zodb.Buf, serial zodb.Tid, err error) {
func (fs *FileStorage) Load(_ context.Context, xid zodb.Xid) (buf *mem.Buf, serial zodb.Tid, err error) {
// lookup in index position of oid data record within latest transaction which changed this oid
dataPos, ok := fs.index.Get(xid.Oid)
if !ok {
......@@ -156,7 +157,7 @@ func (fs *FileStorage) Load(_ context.Context, xid zodb.Xid) (buf *zodb.Buf, ser
return buf, serial, err
}
func (fs *FileStorage) _Load(dh *DataHeader, xid zodb.Xid) (*zodb.Buf, zodb.Tid, error) {
func (fs *FileStorage) _Load(dh *DataHeader, xid zodb.Xid) (*mem.Buf, zodb.Tid, error) {
// search backwards for when we first have data record with tid satisfying xid.At
for {
err := dh.LoadPrevRev(fs.file)
......@@ -210,7 +211,7 @@ type zIter struct {
dhLoading DataHeader
datai zodb.DataInfo // ptr to this will be returned by .NextData
dataBuf *zodb.Buf
dataBuf *mem.Buf
}
type zIterFlags int
......
......@@ -30,6 +30,7 @@ import (
"lab.nexedi.com/kirr/neo/go/xcommon/xbufio"
"lab.nexedi.com/kirr/neo/go/xcommon/xio"
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xbytes"
)
......@@ -658,20 +659,20 @@ func (dh *DataHeader) loadNext(r io.ReaderAt, txnh *TxnHeader) error {
//
// NOTE on success dh state is changed to data header of original data transaction.
// NOTE "deleted" records are indicated via returning buf with .Data=nil without error.
func (dh *DataHeader) LoadData(r io.ReaderAt) (*zodb.Buf, error) {
func (dh *DataHeader) LoadData(r io.ReaderAt) (*mem.Buf, error) {
// scan via backpointers
for dh.DataLen == 0 {
err := dh.LoadBack(r)
if err != nil {
if err == io.EOF {
return &zodb.Buf{Data: nil}, nil // deleted
return &mem.Buf{Data: nil}, nil // deleted
}
return nil, err
}
}
// now read actual data
buf := zodb.BufAlloc64(dh.DataLen)
buf := mem.BufAlloc64(dh.DataLen)
_, err := r.ReadAt(buf.Data, dh.Pos + DataHeaderSize)
if err != nil {
buf.Release()
......
......@@ -28,6 +28,8 @@ package zodb
import (
"context"
"fmt"
"lab.nexedi.com/kirr/go123/mem"
)
// ---- data model ----
......@@ -192,7 +194,7 @@ type IStorageDriver interface {
// suffice.
//
// XXX zodb.loadBefore() returns (data, serial, serial_next) -> add serial_next?
Load(ctx context.Context, xid Xid) (buf *Buf, serial Tid, err error)
Load(ctx context.Context, xid Xid) (buf *mem.Buf, serial Tid, err error)
// TODO: write mode
......
......@@ -81,7 +81,6 @@ var _LF = []byte{'\n'}
// DumpData dumps one data record
// XXX naming -> DumpObj ?
func (d *dumper) DumpData(datai *zodb.DataInfo) error {
buf := &d.buf
buf.Reset()
......
......@@ -36,7 +36,6 @@ import (
type paramFunc func(ctx context.Context, stor zodb.IStorage) (string, error)
var infov = []struct {name string; getParam paramFunc} {
// XXX e.g. stor.LastTid() should return err itself
{"name", func(ctx context.Context, stor zodb.IStorage) (string, error) {
return stor.URL(), nil
}},
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment