Commit da01e01e authored by Kirill Smelkov's avatar Kirill Smelkov

X sqlite: Switch to using github.com/gwenn/gosqlite directly

parent c0502d41
// Copyright (C) 2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package sqlite
// simple connection pool
import (
"errors"
"sync"
"lab.nexedi.com/kirr/go123/xerr"
sqlite3 "github.com/gwenn/gosqlite"
)
// connPool is a pool of sqlite3.Conn
type connPool struct {
factory func() (*sqlite3.Conn, error) // =nil if pool closed
mu sync.Mutex
connv []*sqlite3.Conn // operated as stack
}
// newConnPool creates new connPool that will be using factory to create new
// connections.
func newConnPool(factory func() (*sqlite3.Conn, error)) *connPool {
return &connPool{factory: factory}
}
// Close closes pool and all connections that were in it.
func (p *connPool) Close() error {
p.mu.Lock()
connv := p.connv
p.connv = nil
p.factory = nil
p.mu.Unlock()
var errv xerr.Errorv
for _, conn := range connv {
err := conn.Close()
errv.Appendif(err)
}
return errv.Err()
}
var errClosedPool = errors.New("sqlite: pool: getConn on closed pool")
// getConn returns a connection - either from pool, or newly created if the
// pool was empty.
func (p *connPool) getConn() (conn *sqlite3.Conn, _ error) {
p.mu.Lock()
factory := p.factory
if factory == nil {
p.mu.Unlock()
return nil, errClosedPool
}
if l := len(p.connv); l > 0 {
l--
conn = p.connv[l]
p.connv[l] = nil // just in case
p.connv = p.connv[:l]
}
p.mu.Unlock()
if conn != nil {
return conn, nil
}
// pool was empty - we need to create new connection
return factory()
}
// putConn puts a connection to pool.
//
// Caller must not directly use conn after call to putConn anymore.
func (p *connPool) putConn(conn *sqlite3.Conn) {
p.mu.Lock()
if p.factory != nil { // forgiving putConn after close
p.connv = append(p.connv, conn)
}
p.mu.Unlock()
}
// Copyright (C) 2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
// schema & queries are based on neo/storage/database/sqlite.py
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
......@@ -21,9 +23,11 @@ package sqlite
import (
"context"
"errors"
"fmt"
"net/url"
"reflect"
"strings"
//"reflect"
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xerr"
......@@ -32,8 +36,34 @@ import (
"lab.nexedi.com/kirr/neo/go/neo/storage"
"lab.nexedi.com/kirr/neo/go/zodb"
"database/sql"
_ "github.com/mattn/go-sqlite3"
// NOTE github.com/gwenn/gosqlite is used for the following reasons:
//
// - it is used directly instead of using it via "database/sql" because for a
// typical 5µs query quering through "database/sql", even in the most
// careful, hacky and unsafe way, adds at least 3µs and more.
// see also: https://github.com/golang/go/issues/23879
//
// - "github.com/mattn/go-sqlite3" does not provide a good way to Scan
// queries made directly.
//
// we need to do only simple queries and thus do not use any Cgo->Go
// callback-related functionality from github.com/gwenn/gosqlite. This
// way it should be safe for us to use it even without GODEBUG=cgocheck=0.
//
// --------
//
// NOTE 2: we do not interrupt requests on context cancelation:
//
// - it is relatively expensive to support when using a CGo library - see e.g.
// https://github.com/mattn/go-sqlite3/pull/530
// https://github.com/golang/go/issues/19574#issuecomment-366513872
//
// - on Linux disk file IO, in contrast to e.g. network and pipes,
// cannot be really interrupted.
//
// so we are ok for the cancel to be working on the granualarity of
// whole query.
sqlite3 "github.com/gwenn/gosqlite"
)
// ---- schema ----
......@@ -132,30 +162,115 @@ const tobj = `
type Backend struct {
db *sql.DB
url string
pool *connPool
url string
}
var _ storage.Backend = (*Backend)(nil)
func (b *Backend) query1(ctx context.Context, query string, argv ...interface{}) *sql.Row {
return b.db.QueryRowContext(ctx, query, argv...)
// row1 is like sql.Row to Scan values once and then put stmt and conn back to their pools.
type row1 struct {
pool *connPool
conn *sqlite3.Conn
stmt *sqlite3.Stmt
err error // != nil on an error obtaining the row
}
var errNoRows = errors.New("sqlite: no rows in result set")
func (r *row1) Scan(argv ...interface{}) error {
if r.pool == nil {
panic("sqlite: row1: .Scan called second time")
}
err := r.err
if err == nil {
err = r.stmt.Scan(argv...)
}
if r.stmt != nil {
err2 := r.stmt.Reset() // else it won't be put back to cache
if err == nil {
err = err2
}
err2 = r.stmt.Finalize() // -> conn's stmt cache
if err == nil {
err = err2
}
r.stmt = nil // just in case
}
if r.conn != nil {
r.pool.putConn(r.conn)
r.conn = nil
}
// to catch double .Scan
r.pool = nil
return err
}
// query1 performs 1 select-like query.
//
// the result needs to be .Scan'ned once similarly to how it is done in database/sql.
func (b *Backend) query1(query string, argv ...interface{}) *row1 {
row := &row1{pool: b.pool}
// pool -> conn
conn, err := b.pool.getConn()
if err != nil {
row.err = err
return row
}
row.conn = conn
// conn -> stmt
stmt, err := conn.Prepare(query) // uses conn's stmt cache
if err != nil {
row.err = err
return row
}
row.stmt = stmt
// stmt += argv
err = stmt.Bind(argv...)
if err != nil {
row.err = err
return row
}
// everything prepared - run the query
ok, err := stmt.Next()
if err != nil {
row.err = err
return row
}
if !ok {
row.err = errNoRows
}
return row
}
func (b *Backend) LastTid(ctx context.Context) (zodb.Tid, error) {
var lastTid zodb.Tid
// FIXME nodeID <- my node UUID
myID := proto.UUID(proto.STORAGE, 1)
err := b.query1(ctx,
"SELECT MAX(tid) FROM pt, trans" +
" WHERE nid=? AND rid=partition" /* XXX AND tid<=? (max_tid) */,
myID).Scan(&lastTid)
err := b.query1("SELECT MAX(tid) FROM pt, trans" +
" WHERE nid=? AND rid=partition" /* XXX AND tid<=? (max_tid) */,
myID).Scan(&lastTid)
if err != nil {
// no transaction have been committed
if err == sql.ErrNoRows {
if err == errNoRows {
return 0, nil
}
......@@ -171,13 +286,12 @@ func (b *Backend) LastOid(ctx context.Context) (zodb.Oid, error) {
// FIXME nodeID <- my node UUID
myID := proto.UUID(proto.STORAGE, 1)
err := b.query1(ctx,
"SELECT MAX(oid) FROM pt, obj WHERE nid=? AND rid=partition",
myID).Scan(&lastOid)
err := b.query1("SELECT MAX(oid) FROM pt, obj WHERE nid=? AND rid=partition",
myID).Scan(&lastOid)
if err != nil {
// no objects
if err == sql.ErrNoRows {
if err == errNoRows {
return proto.INVALID_OID, nil
}
......@@ -199,12 +313,14 @@ func (b *Backend) Load(ctx context.Context, xid zodb.Xid) (_ *proto.AnswerObject
//var data sql.RawBytes
var data []byte
// XXX recheck vvv with sqlite3 direct
// hash is variable-length BLOB - Scan refuses to put it into [20]byte
//var hash sql.RawBytes
var hash []byte
// obj.value_tid can be null
var valueTid sql.NullInt64 // XXX ok not to uint64 - max tid is max signed int64
//var valueTid sql.NullInt64 // XXX ok not to uint64 - max tid is max signed int64
var valueTid int64 // XXX ok not to uint64 - max tid is max signed int64
// FIXME pid = getReadablePartition (= oid % Np; error if pid not readable)
pid := 0
......@@ -212,7 +328,9 @@ func (b *Backend) Load(ctx context.Context, xid zodb.Xid) (_ *proto.AnswerObject
// XXX somehow detect errors in sql misuse and log them as 500 without reporting to client?
// XXX such errors start with "unsupported Scan, "
err = b.query1(ctx,
// XXX use conn for severl query1 (see below) without intermediate returns to pool?
err = b.query1(
"SELECT tid, compression, data.hash, value, value_tid" +
" FROM obj LEFT JOIN data ON obj.data_id = data.id" +
" WHERE partition=? AND oid=? AND tid<=?" +
......@@ -221,10 +339,10 @@ func (b *Backend) Load(ctx context.Context, xid zodb.Xid) (_ *proto.AnswerObject
Scan(&obj.Serial, &obj.Compression, &hash, &data, &valueTid)
if err != nil {
if err == sql.ErrNoRows {
if err == errNoRows {
// nothing found - check whether object exists at all
var __ zodb.Oid
err = b.query1(ctx,
err = b.query1(
"SELECT oid FROM obj WHERE partition=? AND oid=? LIMIT 1",
pid, xid.Oid) .Scan(&__)
......@@ -235,7 +353,7 @@ func (b *Backend) Load(ctx context.Context, xid zodb.Xid) (_ *proto.AnswerObject
DeletedAt: 0, // XXX hardcoded
}
case err == sql.ErrNoRows:
case err == errNoRows:
err = &zodb.NoObjectError{Oid: xid.Oid}
}
......@@ -251,8 +369,8 @@ func (b *Backend) Load(ctx context.Context, xid zodb.Xid) (_ *proto.AnswerObject
copy(obj.Checksum[:], hash)
// valueTid -> obj.DataSerial
if valueTid.Valid {
obj.DataSerial = zodb.Tid(valueTid.Int64)
if valueTid != 0 {
obj.DataSerial = zodb.Tid(valueTid)
}
......@@ -262,7 +380,7 @@ func (b *Backend) Load(ctx context.Context, xid zodb.Xid) (_ *proto.AnswerObject
// find out nextSerial
// XXX kill nextSerial support after neo/py cache does not need it
err = b.query1(ctx,
err = b.query1(
"SELECT tid from obj" +
" WHERE partition=? AND oid=? AND tid>?" +
" ORDER BY tid LIMIT 1",
......@@ -270,7 +388,7 @@ func (b *Backend) Load(ctx context.Context, xid zodb.Xid) (_ *proto.AnswerObject
Scan(&obj.NextSerial)
if err != nil {
if err == sql.ErrNoRows {
if err == errNoRows {
obj.NextSerial = proto.INVALID_TID
} else {
return nil, err
......@@ -281,51 +399,59 @@ func (b *Backend) Load(ctx context.Context, xid zodb.Xid) (_ *proto.AnswerObject
}
func (b *Backend) config(ctx context.Context, key string, pvalue interface{}) error {
return b.query1(ctx, "SELECT value FROM config WHERE name=?", key).Scan(pvalue)
func (b *Backend) config(key string, pvalue *string) error {
return b.query1("SELECT value FROM config WHERE name=?", key).Scan(pvalue)
}
func (b *Backend) Close() error {
err := b.pool.Close()
return err // XXX err ctx
}
// ---- open by URL ----
func openURL(ctx context.Context, u *url.URL) (_ storage.Backend, err error) {
// TODO handle query
// XXX u.Path is not always raw path - recheck and fix
path := u.Host + u.Path
db, err := sql.Open("sqlite3", path)
if err != nil {
return nil, err
url := u.String()
dburl := strings.TrimPrefix(url, u.Scheme+"://") // url with stripped sqlite://
connFactory := func() (*sqlite3.Conn, error) {
return sqlite3.Open(dburl)
}
b := &Backend{db: db, url: u.String()}
b := &Backend{pool: newConnPool(connFactory), url: url}
defer func() {
if err != nil {
db.Close()
b.Close()
}
}()
// check we can actually access db
err = db.PingContext(ctx)
conn, err := b.pool.getConn()
if err == nil {
err = conn.Close()
}
if err != nil {
return nil, err // XXX err ctx
return nil, err
}
// check schema and that our limited version can work with the db
// (by making some queries in open we also check whether we can access db at all)
errv := xerr.Errorv{}
checkConfig := func(name string, expect interface{}) {
pvalue := reflect.New(reflect.TypeOf(expect)).Interface()
err := b.config(ctx, name, pvalue)
//pvalue := reflect.New(reflect.TypeOf(expect)).Interface()
value := ""
err := b.config(name, &value)
// XXX prefix "b.path: config: %s:"
switch err {
case sql.ErrNoRows:
case errNoRows:
err = fmt.Errorf("not found")
case nil:
value := reflect.ValueOf(pvalue).Elem().Interface()
if value != expect {
err = fmt.Errorf("got %v; want %v", value, expect)
//value := reflect.ValueOf(pvalue).Elem().Interface()
sexpect := fmt.Sprintf("%v", expect)
if value != sexpect {
err = fmt.Errorf("got %v; want %v", value, sexpect)
}
}
......@@ -335,7 +461,7 @@ func openURL(ctx context.Context, u *url.URL) (_ storage.Backend, err error) {
}
checkConfig("version", schemaVersion)
checkConfig("nid", proto.UUID(proto.STORAGE, 1))
checkConfig("nid", int(proto.UUID(proto.STORAGE, 1)))
checkConfig("partitions", 1)
checkConfig("replicas", 1)
......@@ -358,8 +484,8 @@ func openURL(ctx context.Context, u *url.URL) (_ storage.Backend, err error) {
// not-yet-moved to trans/tobj transactions.
nttrans, ntobj := 0, 0
errv = xerr.Errorv{}
errv.Appendif( b.query1(ctx, "SELECT COUNT(*) FROM ttrans") .Scan(&nttrans) )
errv.Appendif( b.query1(ctx, "SELECT COUNT(*) FROM tobj") .Scan(&ntobj) )
errv.Appendif( b.query1("SELECT COUNT(*) FROM ttrans") .Scan(&nttrans) )
errv.Appendif( b.query1("SELECT COUNT(*) FROM tobj") .Scan(&ntobj) )
err = errv.Err()
if err != nil {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment