Commit 7e9cea77 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent bcb84a06
...@@ -38,7 +38,7 @@ import ( ...@@ -38,7 +38,7 @@ import (
// Cache provides RAM caching layer that can be used over a storage. // Cache provides RAM caching layer that can be used over a storage.
type Cache struct { type Cache struct {
loader interface { loader interface {
StorLoader Loader
URL() string URL() string
} }
...@@ -111,12 +111,6 @@ type revCacheEntry struct { ...@@ -111,12 +111,6 @@ type revCacheEntry struct {
waitBufRef int32 waitBufRef int32
} }
// StorLoader represents loading part of a storage.
// XXX -> zodb.IStorageLoader (or zodb.Loader ?) ?
type StorLoader interface {
Load(ctx context.Context, xid Xid) (buf *mem.Buf, serial Tid, err error)
}
// lock order: Cache.mu > oidCacheEntry // lock order: Cache.mu > oidCacheEntry
// Cache.gcMu > oidCacheEntry // Cache.gcMu > oidCacheEntry
...@@ -124,7 +118,7 @@ type StorLoader interface { ...@@ -124,7 +118,7 @@ type StorLoader interface {
// NewCache creates new cache backed up by loader. // NewCache creates new cache backed up by loader.
// //
// The cache will use not more than ~ sizeMax bytes of RAM for cached data. // The cache will use not more than ~ sizeMax bytes of RAM for cached data.
func NewCache(loader interface { StorLoader; URL() string }, sizeMax int) *Cache { func NewCache(loader interface { Loader; URL() string }, sizeMax int) *Cache {
c := &Cache{ c := &Cache{
loader: loader, loader: loader,
entryMap: make(map[Oid]*oidCacheEntry), entryMap: make(map[Oid]*oidCacheEntry),
......
...@@ -636,7 +636,7 @@ func (c *Checker) assertEq(a, b interface{}) { ...@@ -636,7 +636,7 @@ func (c *Checker) assertEq(a, b interface{}) {
// ---------------------------------------- // ----------------------------------------
// noopStorage is dummy StorLoader which for any oid/xid always returns 1-byte data // noopStorage is dummy Loader which for any oid/xid always returns 1-byte data.
type noopStorage struct{} type noopStorage struct{}
var noopData = []byte{0} var noopData = []byte{0}
...@@ -648,15 +648,15 @@ func (s *noopStorage) Load(_ context.Context, xid Xid) (buf *mem.Buf, serial Tid ...@@ -648,15 +648,15 @@ func (s *noopStorage) Load(_ context.Context, xid Xid) (buf *mem.Buf, serial Tid
return mkbuf(noopData), 1, nil return mkbuf(noopData), 1, nil
} }
// benchLoad serially benchmarks a StorLoader - either storage directly or a cache on top of it // benchLoad serially benchmarks a Loader - either storage directly or a cache on top of it.
// //
// oid accessed are [0, worksize) // oid accessed are [0, worksize)
func benchLoad(b *testing.B, l StorLoader, worksize int) { func benchLoad(b *testing.B, l Loader, worksize int) {
benchLoadN(b, b.N, l, worksize) benchLoadN(b, b.N, l, worksize)
} }
// worker for benchLoad, with n overridding b.N // worker for benchLoad, with n overridding b.N
func benchLoadN(b *testing.B, n int, l StorLoader, worksize int) { func benchLoadN(b *testing.B, n int, l Loader, worksize int) {
ctx := context.Background() ctx := context.Background()
xid := Xid{At: 1, Oid: 0} xid := Xid{At: 1, Oid: 0}
...@@ -723,7 +723,7 @@ func BenchmarkCacheHit(b *testing.B) { ...@@ -723,7 +723,7 @@ func BenchmarkCacheHit(b *testing.B) {
// ---- parallel benchmarks (many requests to 1 cache) ---- // ---- parallel benchmarks (many requests to 1 cache) ----
// benchLoadPar is like benchLoad but issues loads in parallel // benchLoadPar is like benchLoad but issues loads in parallel
func benchLoadPar(b *testing.B, l StorLoader, worksize int) { func benchLoadPar(b *testing.B, l Loader, worksize int) {
ctx := context.Background() ctx := context.Background()
np := runtime.GOMAXPROCS(0) np := runtime.GOMAXPROCS(0)
p := uint64(0) p := uint64(0)
...@@ -776,7 +776,7 @@ func BenchmarkCacheHitPar(b *testing.B) { ...@@ -776,7 +776,7 @@ func BenchmarkCacheHitPar(b *testing.B) {
// XXX this benchmark part will probably go away // XXX this benchmark part will probably go away
// benchLoadProc is like benchLoad but works with PB, not B // benchLoadProc is like benchLoad but works with PB, not B
func benchLoadProc(pb *testing.PB, l StorLoader, worksize int) error { func benchLoadProc(pb *testing.PB, l Loader, worksize int) error {
ctx := context.Background() ctx := context.Background()
xid := Xid{At: 1, Oid: 0} xid := Xid{At: 1, Oid: 0}
......
...@@ -22,6 +22,7 @@ package zodb ...@@ -22,6 +22,7 @@ package zodb
import ( import (
"bytes" "bytes"
"errors"
"fmt" "fmt"
pickle "github.com/kisielk/og-rek" pickle "github.com/kisielk/og-rek"
...@@ -35,8 +36,60 @@ import ( ...@@ -35,8 +36,60 @@ import (
// https://github.com/zopefoundation/ZODB/blob/a89485c1/src/ZODB/serialize.py // https://github.com/zopefoundation/ZODB/blob/a89485c1/src/ZODB/serialize.py
// //
// for format description. // for format description.
//
// PyData can be decoded into PyObject.
type PyData []byte type PyData []byte
//type PyClass struct {
// Module string
// Name string
//}
// XXX + String = Module + "." + Name
// PyObject represents persistent Python object.
//
// PyObject can be decoded from PyData.
type PyObject struct {
//oid Oid
//serial Tid
// XXX + Oid, Serial ? (_p_oid, _p_serial)
pyClass pickle.Class // python class of this object XXX -> ro
State interface{} // object state. python passes this to pyclass.__new__().__setstate__()
}
// PyLoader is like Loader by returns decoded Python objects instead of raw data.
type PyLoader interface {
// XXX returned pyobject, contrary to Loader, can be modified, because
// it is not shared. right?
Load(ctx, xid Xid) (*PyObject, error)
}
// Decode decodes raw ZODB python data into PyObject. XXX -> (pyclass, pystate)
//func (d PyData) Decode() (*PyObject, error) {
func (d PyData) Decode() (pyclass PyClass, pystate interface{}, error) {
p := pickle.NewDecoder(bytes.NewReader([]byte(d)))
xklass, err := p.Decode()
if err != nil {
return nil, fmt.Errorf("pydata: decode: class description: %s", err)
}
klass, err := normPyClass(xklass)
if err != nil {
return nil, fmt.Errorf("pydata: decode: class description: %s", err)
}
state, err := p.Decode()
if err != nil {
return nil, fmt.Errorf("pydata: decode: object state: %s", err)
}
return &PyObject{pyClass: klass, State: state}, nil
}
// ClassName returns fully-qualified python class name used for object type. // ClassName returns fully-qualified python class name used for object type.
// //
// The format is "module.class". // The format is "module.class".
...@@ -49,23 +102,55 @@ func (d PyData) ClassName() string { ...@@ -49,23 +102,55 @@ func (d PyData) ClassName() string {
return "?.?" return "?.?"
} }
klass, err := normPyClass(xklass)
if err != nil {
return "?.?"
}
return klass.Module + "." + klass.Name
}
var errInvalidPyClass = errors.New("invalid py class description")
// normPyClass normalizes py class that has just been decoded from a serialized
// ZODB object or reference.
func normPyClass(xklass interface{}) (pickle.Class, error) {
// class description:
//
// - type(obj), or
// - (xklass, newargs|None) ; xklass = type(obj) | (modname, classname)
if t, ok := xklass.(pickle.Tuple); ok { if t, ok := xklass.(pickle.Tuple); ok {
if len(t) != 2 { // (klass, args) // t = (xklass, newargs|None)
return "?.?" if len(t) != 2 {
return pickle.Class{}, errInvalidPyClass
} }
// XXX newargs is ignored (zodb/py uses it only for persistent classes)
xklass = t[0] xklass = t[0]
if t, ok := xklass.(pickle.Tuple); ok { if t, ok := xklass.(pickle.Tuple); ok {
// py: "old style reference" // t = (modname, classname)
if len(t) != 2 { if len(t) != 2 {
return "?.?" // (modname, classname) return pickle.Class{}, errInvalidPyClass
} }
return fmt.Sprintf("%s.%s", t...) modname, ok1 := t[0].(string)
classname, ok2 := t[1].(string)
if !(ok1 && ok2) {
return pickle.Class{}, errInvalidPyClass
}
return pickle.Class{Module: modname, Name: classname}, nil
} }
} }
if klass, ok := xklass.(pickle.Class); ok { if klass, ok := xklass.(pickle.Class); ok {
return klass.Module + "." + klass.Name // klass = type(obj)
return klass, nil
} }
return "?.?" return pickle.Class{}, errInvalidPyClass
}
// PyClass returns Python class of the object.
func (pyobj *PyObject) PyClass() pickle.Class {
return pyobj.pyClass
} }
...@@ -30,6 +30,8 @@ type _PyDataClassName_TestEntry struct { ...@@ -30,6 +30,8 @@ type _PyDataClassName_TestEntry struct {
className string className string
} }
// XXX + test with zodbpickle.binary (see 12ee41c4 in ZODB)
func TestPyClassName(t *testing.T) { func TestPyClassName(t *testing.T) {
for _, tt := range _PyData_ClassName_Testv { for _, tt := range _PyData_ClassName_Testv {
className := PyData(tt.pydata).ClassName() className := PyData(tt.pydata).ClassName()
...@@ -39,3 +41,7 @@ func TestPyClassName(t *testing.T) { ...@@ -39,3 +41,7 @@ func TestPyClassName(t *testing.T) {
} }
} }
} }
func TestPyDecode(t *testing.T) {
// XXX
}
...@@ -55,6 +55,15 @@ ...@@ -55,6 +55,15 @@
// documentation of IStorage, and other interfaces it embeds, for details. // documentation of IStorage, and other interfaces it embeds, for details.
// //
// //
// Python data
//
// PyData, PyObject, ...
//
//
// Storage drivers
//
// IStorageDriver, RegisterDriver + wks (FileStorage, ZEO and NEO).
//
// -------- // --------
// //
// See also package lab.nexedi.com/kirr/neo/go/zodb/zodbtools and associated // See also package lab.nexedi.com/kirr/neo/go/zodb/zodbtools and associated
...@@ -208,6 +217,13 @@ func (e *OpError) Cause() error { ...@@ -208,6 +217,13 @@ func (e *OpError) Cause() error {
type IStorage interface { type IStorage interface {
IStorageDriver IStorageDriver
//Loader
Prefetcher
//Iterator
}
// XXX
type Prefetcher interface {
// Prefetch prefetches object addressed by xid. // Prefetch prefetches object addressed by xid.
// //
// If data is not yet in cache loading for it is started in the background. // If data is not yet in cache loading for it is started in the background.
...@@ -232,6 +248,12 @@ type IStorageDriver interface { ...@@ -232,6 +248,12 @@ type IStorageDriver interface {
// If no transactions have been committed yet, LastTid returns 0. // If no transactions have been committed yet, LastTid returns 0.
LastTid(ctx context.Context) (Tid, error) LastTid(ctx context.Context) (Tid, error)
Loader
Iterator
}
// Loader exposes functionality to load objects.
type Loader interface {
// Load loads object data addressed by xid from database. // Load loads object data addressed by xid from database.
// //
// Returned are: // Returned are:
...@@ -239,6 +261,8 @@ type IStorageDriver interface { ...@@ -239,6 +261,8 @@ type IStorageDriver interface {
// - if there is data to load: buf is non-empty, serial indicates // - if there is data to load: buf is non-empty, serial indicates
// transaction which matched xid criteria and err=nil. // transaction which matched xid criteria and err=nil.
// //
// caller must not modify buf memory.
//
// otherwise buf=nil, serial=0 and err is *OpError with err.Err // otherwise buf=nil, serial=0 and err is *OpError with err.Err
// describing the error cause: // describing the error cause:
// //
...@@ -273,7 +297,10 @@ type IStorageDriver interface { ...@@ -273,7 +297,10 @@ type IStorageDriver interface {
// cache without serial_next returned from Load. For this reason in ZODB/go // cache without serial_next returned from Load. For this reason in ZODB/go
// Load specification comes without specifying serial_next return. // Load specification comes without specifying serial_next return.
Load(ctx context.Context, xid Xid) (buf *mem.Buf, serial Tid, err error) Load(ctx context.Context, xid Xid) (buf *mem.Buf, serial Tid, err error)
}
// Committer exposes functionality to commit transactions.
type Committer interface {
// TODO: write mode // TODO: write mode
// Store(ctx, oid Oid, serial Tid, data []byte, txn ITransaction) error // Store(ctx, oid Oid, serial Tid, data []byte, txn ITransaction) error
...@@ -283,12 +310,19 @@ type IStorageDriver interface { ...@@ -283,12 +310,19 @@ type IStorageDriver interface {
// TpcVote(txn) // TpcVote(txn)
// TpcFinish(txn, callback) // TpcFinish(txn, callback)
// TpcAbort(txn) // TpcAbort(txn)
}
// Notifier allows to be notified of database changes made by other clients.
type Notifier interface {
// TODO: invalidation channel (notify about changes made to DB not by us from outside) // TODO: invalidation channel (notify about changes made to DB not by us from outside)
}
// TODO: History(ctx, oid, size=1) // TODO: History(ctx, oid, size=1)
// Iterator provides functionality to iterate through storage transactions sequentially.
type Iterator interface {
// Iterate creates iterator to iterate storage in [tidMin, tidMax] range. // Iterate creates iterator to iterate storage in [tidMin, tidMax] range.
// //
// Iterate does not return any error. If there was error when setting // Iterate does not return any error. If there was error when setting
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment