Commit a00572f6 authored by Kirill Smelkov

.

parent 5d928bd1
@@ -74,7 +74,7 @@ var _ zodb.IStorageDriver = (*Client)(nil)
// NewClient creates new client node.
//
-// It will connect to master @masterAddr and identify with sepcified cluster name.
+// It will connect to master @masterAddr and identify with specified cluster name.
func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client {
cli := &Client{
node: neo.NewNodeApp(net, neo.CLIENT, clusterName, masterAddr, ""),
@@ -479,7 +479,7 @@ func (c *Client) _Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, e
return buf, resp.Serial, nil
}
-func (c *Client) Iterate(tidMin, tidMax zodb.Tid) zodb.ITxnIterator {
+func (c *Client) Iterate(ctx context.Context, tidMin, tidMax zodb.Tid) zodb.ITxnIterator {
// see notes in ../NOTES:"On iteration"
panic("TODO")
}
@@ -511,7 +511,7 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (zo
func (c *Client) URL() string {
// XXX neos:// depending whether it was tls
// XXX options if such were given to open are discarded
-// (but we need to be able to contruct URL if Client was created via NewClient directly)
+// (but we need to be able to construct URL if Client was created via NewClient directly)
return fmt.Sprintf("neo://%s@%s", c.node.ClusterName, c.node.MasterAddr)
}
@@ -72,7 +72,7 @@ type NodeLink struct {
axdownFlag atomic32 // 1 when AX is marked no longer operational
// axdown chan struct{} // ready when accept is marked as no longer operational
-axdown1 sync.Once // CloseAccept may be called severall times
+axdown1 sync.Once // CloseAccept may be called several times
down chan struct{} // ready when NodeLink is marked as no longer operational
downOnce sync.Once // shutdown may be due to both Close and IO error
@@ -478,7 +478,7 @@ func (c *Conn) downRX(errMsg *Error) {
for {
// we set .rxdownFlag=1 above.
// now if serveRecv is outside `.rxq <- ...` critical section we know it is either:
-// - before it -> it will eventually see .rxdownFlag=1 and wont send pkt ro rxq.
+// - before it -> it will eventually see .rxdownFlag=1 and won't send pkt to rxq.
// - after it -> it already sent pkt to rxq and won't touch
// rxq until next packet (where it will hit "before it").
//
@@ -509,7 +509,7 @@ func (c *Conn) downRX(errMsg *Error) {
// now if recvPkt is outside `... <- .rxq` critical section we know that it is either:
// - before it -> it will eventually see .rxdownFlag=1 and won't try to read rxq.
// - after it -> it already read pkt from rxq and won't touch
-// rxq until next recvPkt (where it will het "before it").
+// rxq until next recvPkt (where it will hit "before it").
if c.rxqRead.Get() == 0 {
break
}
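Both hunks above are two sides of one lock-free handshake: downRX raises .rxdownFlag and then waits out whoever might be inside the rxq critical section. A condensed sketch of the pattern, with names invented here for illustration (not the actual neo types):

```go
package main

import "sync/atomic"

type rx struct {
	downFlag int32    // 1 once RX is marked down
	inSend   int32    // 1 while producer is inside the `rxq <-` critical section
	rxq      chan int // receive queue
}

// send is what serveRecv effectively does per packet.
func (r *rx) send(pkt int) bool {
	atomic.StoreInt32(&r.inSend, 1)
	if atomic.LoadInt32(&r.downFlag) != 0 {
		atomic.StoreInt32(&r.inSend, 0)
		return false // "before it": saw the flag, do not touch rxq
	}
	r.rxq <- pkt // "after it": a blocked send is finished by downRX draining rxq
	atomic.StoreInt32(&r.inSend, 0)
	return true
}

// downRX marks RX down and waits for the producer to leave the critical section.
func (r *rx) downRX() {
	atomic.StoreInt32(&r.downFlag, 1)
	for atomic.LoadInt32(&r.inSend) != 0 {
		select {
		case <-r.rxq: // drain, so a sender blocked on `rxq <-` can complete
		default:
		}
	}
}

func main() {
	r := &rx{rxq: make(chan int)}
	go r.send(1)
	r.downRX()
}
```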
@@ -821,7 +821,7 @@ func (nl *NodeLink) serveRecv() {
// on rxq/acceptq only being put into the runqueue of current proc.
// By default proc runq will execute only when sendRecv blocks
// again next time deep in nl.recvPkt(), but let's force the switch
-// now without additional wating to reduce latency.
+// now without additional waiting to reduce latency.
// XXX bad - puts serveRecv to global runq thus with high p to switch M
//runtime.Gosched()
@@ -1602,7 +1602,7 @@ func (c *Conn) Ask(req Msg, resp Msg) error {
}
// ---- exchange of 1-1 request-reply ----
-// (impedance matcher for current neo/py imlementation)
+// (impedance matcher for current neo/py implementation)
// lightClose closes light connection.
//
@@ -514,7 +514,7 @@ func TestMasterStorage(t *testing.T) {
// C loads every other {<,=}serial:oid - established link is reused
-ziter := zstor.Iterate(0, zodb.TidMax)
+ziter := zstor.Iterate(bg, 0, zodb.TidMax)
// XXX hack: disable tracing early so that C.Load() calls do not deadlock
// TODO refactor cluster creation into func
@@ -49,15 +49,14 @@ type Cache struct {
// cache is fully synchronized with storage for transactions with tid <= head.
// XXX clarify ^^^ (it means if revCacheEntry.head=∞ it is Cache.head)
-head Tid
-entryMap map[Oid]*oidCacheEntry // oid -> oid's cache entries
+head Tid

-gcMu sync.Mutex
-lru lruHead // revCacheEntries in LRU order
-size int // cached data size in bytes
+entryMap map[Oid]*oidCacheEntry // oid -> oid's cache entries

-sizeMax int // cache is allowed to occupy not more than this
+gcMu sync.Mutex
+lru lruHead // revCacheEntries in LRU order
+size int // cached data size in bytes
+sizeMax int // cache is allowed to occupy not more than this
}
// oidCacheEntry maintains cached revisions for 1 oid
@@ -76,8 +75,8 @@ type oidCacheEntry struct {
// revCacheEntry is information about 1 cached oid revision
type revCacheEntry struct {
-parent *oidCacheEntry // oidCacheEntry holding us
-inLRU lruHead // in Cache.lru; protected by Cache.gcMu
+parent *oidCacheEntry // oidCacheEntry holding us
+inLRU lruHead // in Cache.lru; protected by Cache.gcMu
// we know that load(oid, .head) will give this .serial:oid.
//
@@ -107,7 +106,7 @@ type revCacheEntry struct {
// protected by .parent's lock:
-accounted bool // whether rce size was accounted in cache size
+accounted bool // whether rce size was accounted in cache size
// how many waiters for buf is there while rce is being loaded.
// after data for this RCE is loaded loadRCE will do .buf.XIncref() .waitBufRef times.
@@ -163,8 +162,6 @@ func (c *Cache) Load(ctx context.Context, xid Xid) (buf *mem.Buf, serial Tid, er
// rce is not in cache - this goroutine becomes responsible for loading it
} else {
-// XXX use connection poll
-// XXX or it should be cared by loader?
c.loadRCE(ctx, rce)
}
@@ -192,7 +189,6 @@ func (c *Cache) Prefetch(ctx context.Context, xid Xid) {
// spawn loading in the background if rce was not yet loaded
if rceNew {
-// XXX use connection poll
go c.loadRCE(ctx, rce)
}
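Prefetch, per this hunk, only spawns loadRCE in the background and returns; nothing waits on it here. A hedged sketch of the intended pairing with Load, written against this file's own types (in-package, so no imports shown):

```go
// prefetchThenLoad is an illustration only, not code from this commit.
// It assumes this package's Cache, Xid, Oid, Tid and mem.Buf.
func prefetchThenLoad(ctx context.Context, c *Cache, oid Oid, at Tid) (*mem.Buf, Tid, error) {
	xid := Xid{At: at, Oid: oid}
	c.Prefetch(ctx, xid) // returns immediately; loadRCE runs in a goroutine
	// ... other work can overlap with the load ...
	return c.Load(ctx, xid) // joins the in-flight (or finished) RCE instead of re-loading
}
```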
@@ -305,7 +301,7 @@ func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry) {
// normalize buf/serial if it was error
if err != nil {
-e := err.(*OpError) // XXX better driver return *OpError explicitly
+e := err.(*OpError) // XXX better driver return *OpError explicitly
// only remember problem cause - full OpError will be
// reconstructed in Load with actual requested there xid.
@@ -362,7 +358,7 @@ func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry) {
// and will update lru and cache size for it itself.
rceOrig := rce
rceDropped := false
-if i + 1 < len(oce.rcev) {
+if i+1 < len(oce.rcev) {
rceNext := oce.rcev[i+1]
if rceNext.loaded() && tryMerge(rce, rceNext, rce) {
// not δsize -= len(rce.buf.Data)
@@ -544,7 +540,7 @@ func (c *Cache) gc() {
}
// freelist(OCE)
-var ocePool = sync.Pool{New: func() interface{} { return &oidCacheEntry{} } }
+var ocePool = sync.Pool{New: func() interface{} { return &oidCacheEntry{} }}
// oceAlloc allocates oidCacheEntry from freelist.
func oceAlloc(oid Oid) *oidCacheEntry {
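The ocePool change above is pure gofmt (spacing in the composite literal). For readers unfamiliar with the idiom: sync.Pool acts as a GC-friendly freelist, which is what oceAlloc draws from. A generic, self-contained sketch of the same pattern:

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

var bufPool = sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}

func main() {
	b := bufPool.Get().(*bytes.Buffer)
	b.Reset() // pooled values keep old state - reset before reuse
	b.WriteString("hello")
	fmt.Println(b.String())
	bufPool.Put(b) // back to the freelist for the next Get
}
```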
@@ -663,7 +659,7 @@ func (h *lruHead) rceFromInLRU() (rce *revCacheEntry) {
// errDB returns error about database being inconsistent
func errDB(oid Oid, format string, argv ...interface{}) error {
// XXX -> separate type?
return fmt.Errorf("cache: database inconsistency: oid: %v: " + format,
return fmt.Errorf("cache: database inconsistency: oid: %v: "+format,
append([]interface{}{oid}, argv...)...)
}
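The `"...: "+format` fix above is gofmt-only, but the line is worth a second look: errDB prepends oid to the caller's argv so a single fmt.Errorf renders both. A self-contained illustration with Oid narrowed to uint64 and hypothetical values:

```go
package main

import "fmt"

// errDB mirrors the helper above, with Oid stood in by uint64 for the sketch.
func errDB(oid uint64, format string, argv ...interface{}) error {
	return fmt.Errorf("cache: database inconsistency: oid: %v: "+format,
		append([]interface{}{oid}, argv...)...)
}

func main() {
	err := errDB(7, "revision %d is after head %d", 4, 3)
	fmt.Println(err) // cache: database inconsistency: oid: 7: revision 4 is after head 3
}
```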
@@ -45,7 +45,7 @@ type tStorage struct {
type tOidData struct {
serial Tid
data []byte
-err error // e.g. io error
+err error // e.g. io error
}
// create new buffer with specified content copied there.
@@ -208,7 +208,7 @@ func TestCache(t *testing.T) {
}
if bad.Len() != 0 {
t.Fatalf("rce:\n%s", bad.Bytes()) // XXX add oid?
t.Fatalf("rce:\n%s", bad.Bytes()) // XXX add oid?
}
}
@@ -466,7 +466,7 @@ func TestCache(t *testing.T) {
fmt.Fprintf(bad, "unexpected rce found:\n%s\n", pretty.Compare(expect, rce))
}
-if bad.Len() != 0{
+if bad.Len() != 0 {
t.Fatalf("lookupRCE(%v):\n%s", xid, bad.Bytes())
}
}
@@ -637,7 +637,7 @@ func (c *Checker) assertEq(a, b interface{}) {
// ----------------------------------------
// noopStorage is dummy StorLoader which for any oid/xid always returns 1-byte data
-type noopStorage struct {}
+type noopStorage struct{}
var noopData = []byte{0}
func (s *noopStorage) URL() string {
@@ -676,7 +676,7 @@ func benchLoadN(b *testing.B, n int, l StorLoader, worksize int) {
// benchmark storage under cache
-func BenchmarkNoopStorage(b *testing.B) { benchLoad(b, &noopStorage{}, b.N /* = ∞ */) }
+func BenchmarkNoopStorage(b *testing.B) { benchLoad(b, &noopStorage{}, b.N /* = ∞ */) }
// cache sizes to benchmark (elements = bytes (we are using 1-byte element))
var cachesizev = []int{0, 16, 128, 512, 4096}
@@ -728,9 +728,9 @@ func benchLoadPar(b *testing.B, l StorLoader, worksize int) {
np := runtime.GOMAXPROCS(0)
p := uint64(0)
-b.RunParallel(func (pb *testing.PB) {
+b.RunParallel(func(pb *testing.PB) {
oid0 := Oid(atomic.AddUint64(&p, +1)) // all workers start/iterate at different oid
-xid := Xid{At: 1, Oid: oid0 }
+xid := Xid{At: 1, Oid: oid0}
for pb.Next() {
buf, _, err := l.Load(ctx, xid)
if err != nil {
......@@ -746,7 +746,7 @@ func benchLoadPar(b *testing.B, l StorLoader, worksize int) {
})
}
-func BenchmarkNoopStoragePar(b *testing.B) { benchLoadPar(b, &noopStorage{}, b.N /* = ∞ */) }
+func BenchmarkNoopStoragePar(b *testing.B) { benchLoadPar(b, &noopStorage{}, b.N /* = ∞ */) }
func BenchmarkCacheStartupPar(b *testing.B) {
s := &noopStorage{}
@@ -797,7 +797,7 @@ func benchLoadProc(pb *testing.PB, l StorLoader, worksize int) error {
}
func BenchmarkNoopStorageProc(b *testing.B) {
-b.RunParallel(func (pb *testing.PB) {
+b.RunParallel(func(pb *testing.PB) {
s := &noopStorage{}
err := benchLoadProc(pb, s, b.N)
if err != nil {
@@ -807,7 +807,7 @@ func BenchmarkNoopStorageProc(b *testing.B) {
}
func BenchmarkCacheStartupProc(b *testing.B) {
-b.RunParallel(func (pb *testing.PB) {
+b.RunParallel(func(pb *testing.PB) {
s := &noopStorage{}
c := NewCache(s, b.N)
err := benchLoadProc(pb, c, b.N)
@@ -821,7 +821,7 @@ func BenchmarkCacheStartupProc(b *testing.B) {
func benchEachCacheProc(b *testing.B, f func(b *testing.B, pb *testing.PB, c *Cache) error) {
for _, size := range cachesizev {
b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) {
-b.RunParallel(func (pb *testing.PB) {
+b.RunParallel(func(pb *testing.PB) {
s := &noopStorage{}
c := NewCache(s, size)
err := f(b, pb, c)
@@ -52,7 +52,7 @@
// https://github.com/zopefoundation/ZODB/blob/a89485c1/src/ZODB/fsIndex.py
// https://github.com/zopefoundation/ZODB/commit/1bb14faf
//
-// Unless one is doing something FileStorage-specific, it is adviced not to use
+// Unless one is doing something FileStorage-specific, it is advised not to use
// fs1 package directly, and instead link-in lab.nexedi.com/kirr/neo/go/zodb/wks,
// open storage by zodb.OpenStorage and use it by way of zodb.IStorage interface.
//
@@ -131,7 +131,7 @@ func (fs *FileStorage) Load(_ context.Context, xid zodb.Xid) (buf *mem.Buf, seri
// FIXME zodb.TidMax is only 7fff... tid from outside can be ffff...
// -> TODO reject tid out of range
-// FIXME kill Load_XXXWithNextSerial after neo/py cache does not depend on next_serial
+// FIXME kill Load_XXXWithNextSerialXXX after neo/py cache does not depend on next_serial
buf, serial, _, err = fs.Load_XXXWithNextSerialXXX(nil, xid)
return buf, serial, err
}
@@ -385,7 +385,7 @@ func (fs *FileStorage) findTxnRecord(r io.ReaderAt, tid zodb.Tid) (TxnHeader, er
}
// Iterate creates zodb-level iterator for tidMin..tidMax range
-func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.ITxnIterator {
+func (fs *FileStorage) Iterate(_ context.Context, tidMin, tidMax zodb.Tid) zodb.ITxnIterator {
// when iterating use IO optimized for sequential access
fsSeq := xbufio.NewSeqReaderAt(fs.file)
ziter := &zIter{iter: Iter{R: fsSeq}}
@@ -403,7 +403,7 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.ITxnIterator {
URL: fs.URL(),
Op: "iter",
// XXX (?) add TidRange type which prints as
// "tidmin..tidmax" with omiting ends if it is either 0 or ∞
// "tidmin..tidmax" with omitting ends if it is either 0 or ∞
Args: []zodb.Tid{tidMin, tidMax},
Err: err,
}}
@@ -157,7 +157,7 @@ func TestLoad(t *testing.T) {
// iterate tidMin..tidMax and expect db entries in expectv
func testIterate(t *testing.T, fs *FileStorage, tidMin, tidMax zodb.Tid, expectv []dbEntry) {
ctx := context.Background()
-iter := fs.Iterate(tidMin, tidMax)
+iter := fs.Iterate(ctx, tidMin, tidMax)
fsi, ok := iter.(*zIter)
if !ok {
_, _, err := iter.NextTxn(ctx)
@@ -233,7 +233,7 @@ func testIterate(t *testing.T, fs *FileStorage, tidMin, tidMax zodb.Tid, expectv
txe := dbe.Entryv[kdata]
dh := txe.Header
-// assert datai pointes to where we expect - this will allow us
+// assert datai points to where we expect - this will allow us
// not only to check oid/tid/data but also to check whole data header.
if datai != &fsi.datai {
t.Fatal("unexpected datai pointer")
@@ -309,7 +309,7 @@ func BenchmarkIterate(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
-iter := fs.Iterate(zodb.Tid(0), zodb.TidMax)
+iter := fs.Iterate(ctx, zodb.Tid(0), zodb.TidMax)
for {
txni, dataIter, err := iter.NextTxn(ctx)
@@ -21,9 +21,9 @@ package zodb
// formatting and parsing for basic zodb types
import (
"fmt"
"encoding/hex"
"encoding/binary"
"encoding/hex"
"fmt"
"lab.nexedi.com/kirr/go123/xfmt"
"lab.nexedi.com/kirr/go123/xstrings"
@@ -87,7 +87,7 @@ func (xid Xid) XFmtString(b xfmt.Buffer) xfmt.Buffer {
func parseHex64(subj, s string) (uint64, error) {
// XXX -> xfmt ?
// XXX like scanf("%016x") but scanf implicitly skips spaces without giving control to caller and is slower
-var b[8]byte
+var b [8]byte
if len(s) != 16 {
return 0, fmt.Errorf("%s %q invalid", subj, s)
}
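The hunk ends before the actual decoding. A plausible completion, consistent with the encoding/hex and encoding/binary imports sorted above, would be as follows (a sketch, not necessarily the file's real body):

```go
package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
)

// like scanf("%016x"), but strict: exactly 16 hex digits, no space skipping.
func parseHex64(subj, s string) (uint64, error) {
	var b [8]byte
	if len(s) != 16 {
		return 0, fmt.Errorf("%s %q invalid", subj, s)
	}
	if _, err := hex.Decode(b[:], []byte(s)); err != nil {
		return 0, fmt.Errorf("%s %q invalid", subj, s)
	}
	return binary.BigEndian.Uint64(b[:]), nil
}

func main() {
	fmt.Println(parseHex64("tid", "0285cbac258bf266"))
}
```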
@@ -88,8 +88,7 @@ type TxnInfo struct {
Extension []byte
}
-// DataInfo represents information about one object change.
+// DataInfo is information about one object change.
type DataInfo struct {
Oid Oid
Tid Tid // changed by this transaction
@@ -152,7 +151,7 @@ type OpError struct {
URL string // URL of the storage
Op string // operation that failed
Args interface{} // operation arguments, if any
-Err error // actual error that occured during the operation
+Err error // actual error that occurred during the operation
}
func (e *OpError) Error() string {
@@ -169,7 +168,6 @@ func (e *OpError) Cause() error {
}
// IStorage is the interface provided by opened ZODB storage
type IStorage interface {
IStorageDriver
@@ -256,8 +254,11 @@ type IStorageDriver interface {
// Iterate creates iterator to iterate storage in [tidMin, tidMax] range.
//
+// Iterate does not return any error. If there was error when setting
+// iteration up - it will be returned on first NextTxn call.
+//
// TODO allow iteration both ways (forward & backward)
-Iterate(tidMin, tidMax Tid) ITxnIterator // XXX ctx , error ?
+Iterate(ctx context.Context, tidMin, tidMax Tid) ITxnIterator
}
// ITxnIterator is the interface to iterate transactions.
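With ctx now threaded through Iterate, a typical driver loop looks like the one in zodbdump further below. A hedged sketch (in-tree style, imports of context, io and zodb assumed), taking io.EOF as the end marker for both iterators and NextData(ctx) as the data step, as used elsewhere in this tree:

```go
func iterateAll(ctx context.Context, stor zodb.IStorage) error {
	iter := stor.Iterate(ctx, 0, zodb.TidMax)
	for {
		txni, dataIter, err := iter.NextTxn(ctx)
		if err == io.EOF {
			return nil // all transactions visited
		}
		if err != nil {
			return err // iteration-setup errors also surface here, not from Iterate
		}
		_ = txni // transaction metadata
		for {
			datai, err := dataIter.NextData(ctx)
			if err == io.EOF {
				break
			}
			if err != nil {
				return err
			}
			_ = datai // one object change
		}
	}
}
```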
@@ -58,7 +58,6 @@ func Dumpobj(ctx context.Context, w io.Writer, stor zodb.IStorage, xid zodb.Xid,
return err
}
-// XXX hack - TODO rework IStorage.Load to fill-in objInfo directly?
objInfo.Oid = xid.Oid
objInfo.Tid = serial
objInfo.Data = buf.Data
@@ -193,7 +193,7 @@ func (d *dumper) Dump(ctx context.Context, stor zodb.IStorage, tidMin, tidMax zo
var dataIter zodb.IDataIterator
var err error
-iter := stor.Iterate(tidMin, tidMax)
+iter := stor.Iterate(ctx, tidMin, tidMax)
// transactions
for {
@@ -45,7 +45,7 @@ func loadZdumpPy(t *testing.T, path string) string {
t.Fatal(err)
}
-// python qoutes "\v" as "\x0b", go as "\v"; same for "\f", "\a", "\b".
+// python quotes "\v" as "\x0b", go as "\v"; same for "\f", "\a", "\b".
// XXX this is a bit hacky. We could compare quoted strings as decoded,
// but this would need zdump format parser which could contain other
// bugs. Here we want to compare output ideally bit-to-bit but those
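Concretely: for these control characters Go's strconv.Quote picks symbolic escapes while Python's repr emits \xNN, which is why the test normalizes one side before comparing:

```go
package main

import (
	"fmt"
	"strconv"
)

func main() {
	fmt.Println(strconv.Quote("\v\f\a\b")) // Go:     "\v\f\a\b"
	// Python: repr('\v\f\a\b') -> "'\x0b\x0c\x07\x08'"
}
```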
@@ -126,6 +126,7 @@ func infoMain(argv []string) {
if err != nil {
prog.Fatal(err)
}
+// TODO defer stor.Close()
err = Info(ctx, os.Stdout, stor, argv[1:])
if err != nil {