Commit 8fa9fdaf authored by Kirill Smelkov's avatar Kirill Smelkov

go/zodb/fs1: Index save/load

Build index type on top of fsb.Tree introduced in the previous patch and
add routines to save and load it to/from disk.

We ensure ZODB/py compatibility via generating test FileStorage database
+ its index and checking we can load index from it and also that if we
save an index ZODB/py can load it back. FileStorage index is hard to get
bit-to-bit identical since this index uses python pickles which can
encode the same objects in several different ways.
parent 33d10066
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package fs1
// index for quickly finding oid -> oid's latest data record
import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"math/big"
"os"
"path/filepath"
"strconv"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/zodb/storage/fs1/fsb"
pickle "github.com/kisielk/og-rek"
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xbufio"
"lab.nexedi.com/kirr/go123/xerr"
)
// Index is in-RAM Oid -> Data record position mapping used to associate Oid
// with Data record in latest transaction which changed it.
type Index struct {
// this index covers data file up to < .TopPos
// usually for whole-file index TopPos is position pointing just past
// the last committed transaction.
TopPos int64
*fsb.Tree
}
// IndexNew creates new empty index.
func IndexNew() *Index {
return &Index{TopPos: txnValidFrom, Tree: fsb.TreeNew()}
}
// NOTE Get/Set/... are taken as-is from fsb.Tree
// --- index load/save ---
// on-disk index format
// (changed in 2010 in https://github.com/zopefoundation/ZODB/commit/1bb14faf)
//
// TopPos
// (oid[:6], fsBucket)
// (oid[:6], fsBucket)
// ...
// None
//
//
// fsBucket:
// oid[6:8]oid[6:8]oid[6:8]...pos[2:8]pos[2:8]pos[2:8]...
const (
oidPrefixMask zodb.Oid = (1<<64 - 1) ^ (1<<16 - 1) // 0xffffffffffff0000
posInvalidMask uint64 = (1<<64 - 1) ^ (1<<48 - 1) // 0xffff000000000000
posValidMask uint64 = 1<<48 - 1 // 0x0000ffffffffffff
)
// IndexSaveError is the error type returned by index save routines
type IndexSaveError struct {
Err error // error that occurred during the operation
}
func (e *IndexSaveError) Error() string {
return "index save: " + e.Err.Error()
}
// Save saves index to a writer
func (fsi *Index) Save(w io.Writer) (err error) {
defer func() {
if err == nil {
return
}
if _, ok := err.(*pickle.TypeError); ok {
panic(err) // all our types are expected to be supported by pickle
}
// otherwise it is an error returned by writer, which should already
// have filename & op as context.
err = &IndexSaveError{err}
}()
p := pickle.NewEncoder(w)
err = p.Encode(fsi.TopPos)
if err != nil {
return err
}
var oidb [8]byte
var posb [8]byte
var oidPrefixCur zodb.Oid // current oid[0:6] with [6:8] = 00
oidBuf := []byte{} // current oid[6:8]oid[6:8]...
posBuf := []byte{} // current pos[2:8]pos[2:8]...
var t [2]interface{} // tuple for (oid, fsBucket.toString())
e, _ := fsi.SeekFirst()
if e != nil {
defer e.Close()
for {
oid, pos, errStop := e.Next()
oidPrefix := oid & oidPrefixMask
if oidPrefix != oidPrefixCur || errStop != nil {
// emit (oid[0:6], oid[6:8]oid[6:8]...pos[2:8]pos[2:8]...)
binary.BigEndian.PutUint64(oidb[:], uint64(oidPrefixCur))
t[0] = oidb[0:6]
t[1] = bytes.Join([][]byte{oidBuf, posBuf}, nil)
err = p.Encode(pickle.Tuple(t[:]))
if err != nil {
return err
}
oidPrefixCur = oidPrefix
oidBuf = oidBuf[:0]
posBuf = posBuf[:0]
}
if errStop != nil {
break
}
// check pos does not overflow 6 bytes
if uint64(pos)&posInvalidMask != 0 {
return fmt.Errorf("entry position too large: 0x%x", pos)
}
binary.BigEndian.PutUint64(oidb[:], uint64(oid))
binary.BigEndian.PutUint64(posb[:], uint64(pos))
oidBuf = append(oidBuf, oidb[6:8]...)
posBuf = append(posBuf, posb[2:8]...)
}
}
err = p.Encode(pickle.None{})
return err
}
// SaveFile saves index to a file @ path.
//
// Index data is first saved to a temporary file and when complete the
// temporary is renamed to be at requested path. This way file @ path will be
// updated only with complete index data.
func (fsi *Index) SaveFile(path string) error {
dir, name := filepath.Dir(path), filepath.Base(path)
f, err := ioutil.TempFile(dir, name+".tmp")
if err != nil {
return &IndexSaveError{err}
}
// use buffering for f (ogórek does not buffer itself on encoding)
fb := bufio.NewWriter(f)
err1 := fsi.Save(fb)
err2 := fb.Flush()
err3 := f.Close()
if err1 != nil || err2 != nil || err3 != nil {
os.Remove(f.Name())
err = err1
if err == nil {
err = &IndexSaveError{xerr.First(err2, err3)}
}
return err
}
err = os.Rename(f.Name(), path)
if err != nil {
return &IndexSaveError{err}
}
return nil
}
// IndexLoadError is the error type returned by index load routines
type IndexLoadError struct {
Filename string // present if used IO object was with .Name()
Pos int64
Err error
}
func (e *IndexLoadError) Error() string {
s := "index load: "
if e.Filename != "" && e.Pos != -1 /* not yet got to decoding - .Err is ~ os.PathError */ {
s += e.Filename + ": "
}
if e.Pos != -1 {
s += "pickle @" + strconv.FormatInt(e.Pos, 10) + ": "
}
s += e.Err.Error()
return s
}
// xint64 tries to convert unpickled value to int64
func xint64(xv interface{}) (v int64, ok bool) {
switch v := xv.(type) {
case int64:
return v, true
case *big.Int:
if v.IsInt64() {
return v.Int64(), true
}
}
return 0, false
}
// LoadIndex loads index from a reader
func LoadIndex(r io.Reader) (fsi *Index, err error) {
var picklePos int64
defer func() {
if err != nil {
err = &IndexLoadError{ioname(r), picklePos, err}
}
}()
var ok bool
var xtopPos, xv interface{}
xr := xbufio.NewReader(r)
// by passing bufio.Reader directly we make sure it won't create one internally
p := pickle.NewDecoder(xr.Reader)
picklePos = xr.InputOffset()
xtopPos, err = p.Decode()
if err != nil {
return nil, err
}
topPos, ok := xint64(xtopPos)
if !ok {
return nil, fmt.Errorf("topPos is %T:%v (expected int64)", xtopPos, xtopPos)
}
fsi = IndexNew()
fsi.TopPos = topPos
var oidb [8]byte
loop:
for {
// load/decode next entry
var v pickle.Tuple
picklePos = xr.InputOffset()
xv, err = p.Decode()
if err != nil {
return nil, err
}
switch xv := xv.(type) {
default:
return nil, fmt.Errorf("invalid entry: type %T", xv)
case pickle.None:
break loop
// we accept tuple or list
case pickle.Tuple:
v = xv
case []interface{}:
v = pickle.Tuple(xv)
}
// unpack entry tuple -> oidPrefix, fsBucket
if len(v) != 2 {
return nil, fmt.Errorf("invalid entry: len = %d", len(v))
}
// decode oidPrefix
xoidPrefixStr := v[0]
oidPrefixStr, ok := xoidPrefixStr.(string)
if !ok {
return nil, fmt.Errorf("invalid oidPrefix: type %T", xoidPrefixStr)
}
if l := len(oidPrefixStr); l != 6 {
return nil, fmt.Errorf("invalid oidPrefix: len = %d", l)
}
copy(oidb[:], oidPrefixStr)
oidPrefix := zodb.Oid(binary.BigEndian.Uint64(oidb[:]))
// check fsBucket
xkvStr := v[1]
kvStr, ok := xkvStr.(string)
if !ok {
return nil, fmt.Errorf("invalid fsBucket: type %T", xkvStr)
}
if l := len(kvStr); l%8 != 0 {
return nil, fmt.Errorf("invalid fsBucket: len = %d", l)
}
// load btree from fsBucket entries
kvBuf := mem.Bytes(kvStr)
n := len(kvBuf) / 8
oidBuf := kvBuf[:n*2]
posBuf := kvBuf[n*2-2:] // NOTE starting 2 bytes behind
for i := 0; i < n; i++ {
oid := zodb.Oid(binary.BigEndian.Uint16(oidBuf[i*2:]))
oid |= oidPrefix
pos := int64(binary.BigEndian.Uint64(posBuf[i*6:]) & posValidMask)
fsi.Set(oid, pos)
}
}
return fsi, nil
}
// LoadIndexFile loads index from a file @ path.
func LoadIndexFile(path string) (fsi *Index, err error) {
f, err := os.Open(path)
if err != nil {
return nil, &IndexLoadError{path, -1, err}
}
defer func() {
err2 := f.Close()
if err2 != nil && err == nil {
err = &IndexLoadError{path, -1, err}
fsi = nil
}
}()
// NOTE no explicit bufferring needed - ogórek and LoadIndex use bufio.Reader internally
return LoadIndex(f)
}
// ----------------------------------------
// Equal returns whether two indices are the same.
func (a *Index) Equal(b *Index) bool {
if a.TopPos != b.TopPos {
return false
}
return treeEqual(a.Tree, b.Tree)
}
// treeEqual returns whether two fsb.Tree are the same.
func treeEqual(a, b *fsb.Tree) bool {
if a.Len() != b.Len() {
return false
}
ea, _ := a.SeekFirst()
eb, _ := b.SeekFirst()
if ea == nil {
// this means len(a) == 0 -> len(b) == 0 -> eb = nil
return true
}
defer ea.Close()
defer eb.Close()
for {
ka, va, stopa := ea.Next()
kb, vb, stopb := eb.Next()
if stopa != nil || stopb != nil {
if stopa != stopb {
panic("same-length trees iteration did not end at the same time")
}
break
}
if !(ka == kb && va == vb) {
return false
}
}
return true
}
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package fs1
//go:generate ./py/gen-testdata
import (
"fmt"
"io/ioutil"
"log"
"os"
"os/exec"
"sort"
"strings"
"testing"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/zodb/storage/fs1/fsb"
)
type indexEntry struct {
oid zodb.Oid
pos int64
}
type byOid []indexEntry
func (p byOid) Len() int { return len(p) }
func (p byOid) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p byOid) Less(i, j int) bool { return p[i].oid < p[j].oid }
var indexTest1 = [...]indexEntry{
{0x0000000000000000, 111},
{0x0000000000000001, 222},
{0x000000000000ffff, 333},
{0x0000000000001234, 444},
{0x0000000000010002, 555},
{0x0000000000010001, 665},
{0xffffffffffffffff, 777},
{0xfffffffffffffff0, 888},
{0x8000000000000000, 999},
{0xa000000000000000, 0x0000ffffffffffff},
}
func setIndex(fsi *Index, kv []indexEntry) {
for _, entry := range kv {
fsi.Set(entry.oid, entry.pos)
}
}
// XXX unneded after Tree.Dump() was made to work ok
func treeString(t *fsb.Tree) string {
entryv := []string{}
e, _ := t.SeekFirst()
if e != nil {
defer e.Close()
for {
k, v, stop := e.Next()
if stop != nil {
break
}
entryv = append(entryv, fmt.Sprintf("%v: %v", k, v))
}
}
return "{" + strings.Join(entryv, ", ") + "}"
}
func TestIndexLookup(t *testing.T) {
// the lookup is tested in cznic.b itself
// here we only lightly exercise it
fsi := IndexNew()
if fsi.Len() != 0 {
t.Errorf("index created non empty")
}
tt := indexTest1
// set
setIndex(fsi, tt[:])
// get
for _, entry := range tt {
pos, ok := fsi.Get(entry.oid)
if !(pos == entry.pos && ok == true) {
t.Errorf("fsi[%x] -> got (%x, %v) ; want (%x, true)", entry.oid, pos, ok, entry.pos)
}
// try non-existing entries too
oid := entry.oid ^ (1 << 32)
pos, ok = fsi.Get(oid)
if !(pos == 0 && ok == false) {
t.Errorf("fsi[%x] -> got (%x, %v) ; want (0, false)", oid, pos, ok)
}
}
// iter
e, err := fsi.SeekFirst()
if err != nil {
t.Fatal(err)
}
sort.Sort(byOid(tt[:]))
i := 0
for ; ; i++ {
oid, pos, errStop := e.Next()
if errStop != nil {
break
}
entry := indexEntry{oid, pos}
entryOk := tt[i]
if entry != entryOk {
t.Errorf("iter step %d: got %v ; want %v", i, entry, entryOk)
}
}
if i != len(tt) {
t.Errorf("iter ended at step %v ; want %v", i, len(tt))
}
}
func checkIndexEqual(t *testing.T, subject string, fsi1, fsi2 *Index) {
if fsi1.Equal(fsi2) {
return
}
if fsi1.TopPos != fsi2.TopPos {
t.Errorf("%s: topPos mismatch: %v ; want %v", subject, fsi1.TopPos, fsi2.TopPos)
}
if !treeEqual(fsi1.Tree, fsi2.Tree) {
t.Errorf("%s: trees mismatch:\nhave: %v\nwant: %v", subject, fsi1.Tree.Dump(), fsi2.Tree.Dump())
//t.Errorf("index load: trees mismatch:\nhave: %v\nwant: %v", treeString(fsi2.Tree), treeString(fsi.Tree))
}
}
func TestIndexSaveLoad(t *testing.T) {
workdir := xworkdir(t)
fsi := IndexNew()
fsi.TopPos = int64(786)
setIndex(fsi, indexTest1[:])
err := fsi.SaveFile(workdir + "/1.fs.index")
if err != nil {
t.Fatal(err)
}
fsi2, err := LoadIndexFile(workdir + "/1.fs.index")
if err != nil {
t.Fatal(err)
}
checkIndexEqual(t, "index load", fsi2, fsi)
// TODO check with
// {0xb000000000000000, 0x7fffffffffffffff}, // will cause 'entry position too large'
}
var _1fs_index = func() *Index {
idx := IndexNew()
idx.TopPos = _1fs_indexTopPos
setIndex(idx, _1fs_indexEntryv[:])
return idx
}()
// test that we can correctly load index data as saved by zodb/py
func TestIndexLoadFromPy(t *testing.T) {
fsiPy, err := LoadIndexFile("testdata/1.fs.index")
if err != nil {
t.Fatal(err)
}
checkIndexEqual(t, "index load", fsiPy, _1fs_index)
}
// test zodb/py can read index data as saved by us
func TestIndexSaveToPy(t *testing.T) {
needZODBPy(t)
workdir := xworkdir(t)
err := _1fs_index.SaveFile(workdir + "/1.fs.index")
if err != nil {
t.Fatal(err)
}
// now ask python part to compare testdata and saved-by-us index
cmd := exec.Command("./py/indexcmp", "testdata/1.fs.index", workdir+"/1.fs.index")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err = cmd.Run()
if err != nil {
t.Fatalf("zodb/py read/compare index: %v", err)
}
}
func BenchmarkIndexLoad(b *testing.B) {
// FIXME small testdata/1.fs is not representative for benchmarks
for i := 0; i < b.N; i++ {
_, err := LoadIndexFile("testdata/1.fs.index")
if err != nil {
b.Fatal(err)
}
}
}
func BenchmarkIndexSave(b *testing.B) {
// FIXME small testdata/1.fs is not representative for benchmarks
index, err := LoadIndexFile("testdata/1.fs.index")
if err != nil {
b.Fatal(err)
}
workdir := xworkdir(b)
b.ResetTimer()
for i := 0; i < b.N; i++ {
err = index.SaveFile(workdir + "/1.fs.index")
if err != nil {
b.Fatal(err)
}
}
}
func BenchmarkIndexGet(b *testing.B) {
// FIXME small testdata/1.fs is not representative for benchmarks
fsi, err := LoadIndexFile("testdata/1.fs.index")
if err != nil {
b.Fatal(err)
}
oid := zodb.Oid(1)
//oid := zodb.Oid(0x000000000000ea65)
//v, _ := fsi.Get(oid)
//fmt.Println(oid, v)
b.ResetTimer()
for i := 0; i < b.N; i++ {
fsi.Get(oid)
}
}
var haveZODBPy = false
var workRoot string
func TestMain(m *testing.M) {
// check whether we have zodb/py
cmd := exec.Command("python2", "-c", "import ZODB")
err := cmd.Run()
if err == nil {
haveZODBPy = true
}
// setup work root for all tests
workRoot, err = ioutil.TempDir("", "t-index")
if err != nil {
log.Fatal(err)
}
exit := m.Run()
os.RemoveAll(workRoot)
os.Exit(exit)
}
func needZODBPy(t *testing.T) {
if haveZODBPy {
return
}
t.Skipf("skipping: zodb/py is not available")
}
// create temp dir inside workRoot
func xworkdir(t testing.TB) string {
work, err := ioutil.TempDir(workRoot, "")
if err != nil {
t.Fatal(err)
}
return work
}
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# Copyright (C) 2017 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""generate reference fs1 database and index for tests"""
from ZODB.FileStorage import FileStorage
from zodbtools.test.gen_testdata import gen_testdb
from zodbtools.util import escapeqq
import struct
# convert numeric oid to/from str
def p64(num):
return struct.pack('>Q', num)
def unpack64(packed):
return struct.unpack('>Q', packed)[0]
def hex64(packed):
return '0x%016x' % unpack64(packed)
def main():
outfs = "testdata/1.fs"
gen_testdb(outfs)
# dump to go what to expect
stor = FileStorage(outfs, read_only=True)
with open("ztestdata_expect_test.go", "w") as f:
def emit(v):
print >>f, v
emit("// Code generated by %s; DO NOT EDIT." % __file__)
emit("package fs1\n")
# index
emit("const _1fs_indexTopPos = %i" % stor._pos)
emit("var _1fs_indexEntryv = [...]indexEntry{")
for k, v in stor._index.iteritems():
emit("\t{%8i, %8i}," % (unpack64(k), v))
emit("}")
if __name__ == '__main__':
main()
#!/usr/bin/env python2
# Copyright (C) 2017 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""compare two ZODB FileStorage v1 index files"""
from ZODB.fsIndex import fsIndex
import sys
def main():
path1, path2 = sys.argv[1:]
d1 = fsIndex.load(path1)
d2 = fsIndex.load(path2)
topPos1, fsi1 = d1["pos"], d1["index"]
topPos2, fsi2 = d2["pos"], d2["index"]
#print topPos1, topPos2
#print fsi1.items()
#print fsi2.items()
equal = (topPos1 == topPos2 and fsi1.items() == fsi2.items())
sys.exit(int(not equal))
if __name__ == '__main__':
main()
// Code generated by ./py/gen-testdata; DO NOT EDIT.
package fs1
const _1fs_indexTopPos = 13703
var _1fs_indexEntryv = [...]indexEntry{
{ 0, 13080},
{ 1, 11320},
{ 2, 10924},
{ 3, 11914},
{ 4, 8174},
{ 5, 12988},
{ 6, 13645},
{ 7, 7633},
{ 8, 10132},
{ 9, 13354},
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment