Commit aa6b3e1c authored by Kirill Smelkov's avatar Kirill Smelkov

X Goodbye xio.Name()

It was not a good idea that objects should always have name - some don't
do e.g. bytes.Reader (analog of StringIO.StringIO in python).

Don't try to automatically unwrap IO wrappers - just check in fs1
directly - if object has Name - it will be included in decode/check
errors as prefix; if not - not included.

LimitedWriter was not used anywhere so also killed.
parent 0dce582b
...@@ -107,7 +107,7 @@ type Msg interface { ...@@ -107,7 +107,7 @@ type Msg interface {
} }
// ErrDecodeOverflow is the error returned by neoMsgDecode when decoding hits buffer overflow // ErrDecodeOverflow is the error returned by neoMsgDecode when decoding hits buffer overflow
var ErrDecodeOverflow = errors.New("decode: bufer overflow") var ErrDecodeOverflow = errors.New("decode: buffer overflow")
// ---- messages ---- // ---- messages ----
......
...@@ -61,11 +61,6 @@ func NewSeqReaderAtSize(r io.ReaderAt, size int) *SeqReaderAt { ...@@ -61,11 +61,6 @@ func NewSeqReaderAtSize(r io.ReaderAt, size int) *SeqReaderAt {
return sb return sb
} }
// XXX not yet sure it is good idea
func (sb *SeqReaderAt) IOUnwrap() interface{} {
return sb.r
}
// // XXX temp // // XXX temp
// func init() { // func init() {
// log.SetFlags(0) // log.SetFlags(0)
......
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package xcommon is a staging place for common utilities not yet sorted out // Package xcommon is a staging place for common utilities not yet sorted out
// to their proper homes // to their proper homes.
package xcommon package xcommon
...@@ -5,10 +5,7 @@ package xio ...@@ -5,10 +5,7 @@ package xio
import ( import (
"context" "context"
"fmt"
"io" "io"
"net"
"os"
"lab.nexedi.com/kirr/neo/go/xcommon/log" "lab.nexedi.com/kirr/neo/go/xcommon/log"
"lab.nexedi.com/kirr/neo/go/xcommon/xcontext" "lab.nexedi.com/kirr/neo/go/xcommon/xcontext"
...@@ -32,61 +29,6 @@ func (r *CountReader) Read(p []byte) (int, error) { ...@@ -32,61 +29,6 @@ func (r *CountReader) Read(p []byte) (int, error) {
// TODO func to get position (requiring io.Seeker to just Seek(0, SeekCurrent) is too much) // TODO func to get position (requiring io.Seeker to just Seek(0, SeekCurrent) is too much)
// XXX previously used InputOffset(), but generally e.g. for io.Writer "input" is not appropriate // XXX previously used InputOffset(), but generally e.g. for io.Writer "input" is not appropriate
// LimitedWriter is like io.LimitedReader but for writes
type LimitedWriter struct {
io.Writer
N int64
}
func (l *LimitedWriter) Write(p []byte) (n int, err error) {
if l.N <= 0 {
return 0, io.EOF
}
if int64(len(p)) > l.N {
p = p[0:l.N]
}
n, err = l.Writer.Write(p)
l.N -= int64(n)
return
}
func LimitWriter(w io.Writer, n int64) io.Writer { return &LimitedWriter{w, n} }
// XXX not sure it is a good ide
// XXX another option is IOName() - but that requires leaf packages to import xio
type Wrapper interface {
IOUnwrap() interface{}
}
// Name returns a "filename" associated with io.Reader, io.Writer, net.Conn, ...
func Name(f interface {}) string {
switch f := f.(type) {
case *os.File:
// XXX better via interface { Name() string } ?
// but Name() is too broad compared to FileName()
return f.Name()
case net.Conn:
// XXX not including LocalAddr is ok?
return f.RemoteAddr().String()
case *CountReader: return Name(f.Reader)
case *io.LimitedReader: return Name(f.R)
case *LimitedWriter: return Name(f.Writer)
case *io.PipeReader: return "pipe"
case *io.PipeWriter: return "pipe"
// XXX SectionReader MultiReader TeeReader
// XXX bufio.Reader bufio.Writer bufio.Scanner
case Wrapper: return Name(f.IOUnwrap())
// if name cannot be determined - let's provide full info
default:
return fmt.Sprintf("%#v", f)
}
}
// CloseWhenDone arranges for c to be closed either when ctx is cancelled or // CloseWhenDone arranges for c to be closed either when ctx is cancelled or
// surrounding function returns. // surrounding function returns.
......
...@@ -72,7 +72,6 @@ import ( ...@@ -72,7 +72,6 @@ import (
"os" "os"
"sync" "sync"
"lab.nexedi.com/kirr/neo/go/xcommon/xbufio"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/go123/mem" "lab.nexedi.com/kirr/go123/mem"
...@@ -387,7 +386,7 @@ func (fs *FileStorage) findTxnRecord(r io.ReaderAt, tid zodb.Tid) (TxnHeader, er ...@@ -387,7 +386,7 @@ func (fs *FileStorage) findTxnRecord(r io.ReaderAt, tid zodb.Tid) (TxnHeader, er
// Iterate creates zodb-level iterator for tidMin..tidMax range // Iterate creates zodb-level iterator for tidMin..tidMax range
func (fs *FileStorage) Iterate(_ context.Context, tidMin, tidMax zodb.Tid) zodb.ITxnIterator { func (fs *FileStorage) Iterate(_ context.Context, tidMin, tidMax zodb.Tid) zodb.ITxnIterator {
// when iterating use IO optimized for sequential access // when iterating use IO optimized for sequential access
fsSeq := xbufio.NewSeqReaderAt(fs.file) fsSeq := seqReadAt(fs.file)
ziter := &zIter{iter: Iter{R: fsSeq}} ziter := &zIter{iter: Iter{R: fsSeq}}
iter := &ziter.iter iter := &ziter.iter
...@@ -533,8 +532,7 @@ func (fs *FileStorage) Close() error { ...@@ -533,8 +532,7 @@ func (fs *FileStorage) Close() error {
func (fs *FileStorage) computeIndex(ctx context.Context) (index *Index, err error) { func (fs *FileStorage) computeIndex(ctx context.Context) (index *Index, err error) {
// XXX lock? // XXX lock?
fsSeq := xbufio.NewSeqReaderAt(fs.file) return BuildIndex(ctx, seqReadAt(fs.file), nil/*no progress; XXX somehow log it? */)
return BuildIndex(ctx, fsSeq, nil/*no progress; XXX somehow log it? */)
} }
// loadIndex loads on-disk index to RAM and verifies it against data lightly // loadIndex loads on-disk index to RAM and verifies it against data lightly
...@@ -553,8 +551,7 @@ func (fs *FileStorage) loadIndex(ctx context.Context) (err error) { ...@@ -553,8 +551,7 @@ func (fs *FileStorage) loadIndex(ctx context.Context) (err error) {
} }
// quickly verify index sanity for last 100 transactions // quickly verify index sanity for last 100 transactions
fsSeq := xbufio.NewSeqReaderAt(fs.file) _, err = index.Verify(ctx, seqReadAt(fs.file), 100, nil/*no progress*/)
_, err = index.Verify(ctx, fsSeq, 100, nil/*no progress*/)
if err != nil { if err != nil {
return err return err
} }
......
...@@ -26,8 +26,6 @@ import ( ...@@ -26,8 +26,6 @@ import (
"io" "io"
"os" "os"
"lab.nexedi.com/kirr/neo/go/xcommon/xbufio"
"lab.nexedi.com/kirr/neo/go/xcommon/xio"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/go123/mem" "lab.nexedi.com/kirr/go123/mem"
...@@ -91,7 +89,7 @@ const ( ...@@ -91,7 +89,7 @@ const (
// RecordError represents error associated with operation on a record in // RecordError represents error associated with operation on a record in
// FileStorage data file. // FileStorage data file.
type RecordError struct { type RecordError struct {
Path string // path of the data file Path string // path of the data file; present if used IO object was with .Name()
Record string // record kind - "file header", "transaction record", "data record", ... Record string // record kind - "file header", "transaction record", "data record", ...
Pos int64 // position of record Pos int64 // position of record
Subj string // subject context for the error - e.g. "read" or "check" Subj string // subject context for the error - e.g. "read" or "check"
...@@ -104,40 +102,45 @@ func (e *RecordError) Cause() error { ...@@ -104,40 +102,45 @@ func (e *RecordError) Cause() error {
func (e *RecordError) Error() string { func (e *RecordError) Error() string {
// XXX omit path: when .Err already contains it (e.g. when it is os.PathError)? // XXX omit path: when .Err already contains it (e.g. when it is os.PathError)?
return fmt.Sprintf("%s: %s @%d: %s: %s", e.Path, e.Record, e.Pos, e.Subj, e.Err) prefix := e.Path
if prefix != "" {
prefix += ": "
}
return fmt.Sprintf("%s%s @%d: %s: %s", prefix, e.Record, e.Pos, e.Subj, e.Err)
} }
// err creates RecordError for transaction located at txnh.Pos
func (txnh *TxnHeader) err(r io.ReaderAt, subj string, err error) error { // err creates RecordError for transaction located at txnh.Pos in f
return &RecordError{xio.Name(r), "transaction record", txnh.Pos, subj, err} func (txnh *TxnHeader) err(f interface{}, subj string, err error) error {
return &RecordError{ioname(f), "transaction record", txnh.Pos, subj, err}
} }
// err creates RecordError for data record located at dh.Pos // err creates RecordError for data record located at dh.Pos in f
func (dh *DataHeader) err(r io.ReaderAt, subj string, err error) error { func (dh *DataHeader) err(f interface{}, subj string, err error) error {
return &RecordError{xio.Name(r), "data record", dh.Pos, subj, err} return &RecordError{ioname(f), "data record", dh.Pos, subj, err}
} }
// ierr is an interface for something which can create errors. // ierr is an interface for something which can create errors.
// it is used by TxnHeader and DataHeader to create appropriate errors with their context. // it is used by TxnHeader and DataHeader to create appropriate errors with their context.
type ierr interface { type ierr interface {
err(r io.ReaderAt, subj string, err error) error err(f interface{}, subj string, err error) error
} }
// errf is syntactic shortcut for err and fmt.Errorf // errf is syntactic shortcut for err and fmt.Errorf
func errf(r io.ReaderAt, e ierr, subj, format string, a ...interface{}) error { func errf(f interface{}, e ierr, subj, format string, a ...interface{}) error {
return e.err(r, subj, fmt.Errorf(format, a...)) return e.err(f, subj, fmt.Errorf(format, a...))
} }
// checkErr is syntactic shortcut for errf("check", ...) // checkErr is syntactic shortcut for errf("check", ...)
func checkErr(r io.ReaderAt, e ierr, format string, a ...interface{}) error { func checkErr(f interface{}, e ierr, format string, a ...interface{}) error {
return errf(r, e, "check", format, a...) return errf(f, e, "check", format, a...)
} }
// bug panics with errf("bug", ...) // bug panics with errf("bug", ...)
func bug(r io.ReaderAt, e ierr, format string, a ...interface{}) { func bug(f interface{}, e ierr, format string, a ...interface{}) {
panic(errf(r, e, "bug", format, a...)) panic(errf(f, e, "bug", format, a...))
} }
...@@ -151,7 +154,7 @@ func (fh *FileHeader) Load(r io.ReaderAt) error { ...@@ -151,7 +154,7 @@ func (fh *FileHeader) Load(r io.ReaderAt) error {
return err return err
} }
if string(fh.Magic[:]) != Magic { if string(fh.Magic[:]) != Magic {
return fmt.Errorf("%s: invalid fs1 magic %q", xio.Name(r), fh.Magic) return fmt.Errorf("%sinvalid fs1 magic %q", ioprefix(r), fh.Magic)
} }
return nil return nil
...@@ -527,7 +530,7 @@ func (dh *DataHeader) LoadPrevRev(r io.ReaderAt) error { ...@@ -527,7 +530,7 @@ func (dh *DataHeader) LoadPrevRev(r io.ReaderAt) error {
if err != nil { if err != nil {
// data record @...: -> (prev rev): data record @...: ... // data record @...: -> (prev rev): data record @...: ...
// XXX dup wrt DataHeader.err // XXX dup wrt DataHeader.err
err = &RecordError{xio.Name(r), "data record", posCur, "-> (prev rev)", err} err = &RecordError{ioname(r), "data record", posCur, "-> (prev rev)", err}
} }
return err return err
} }
...@@ -598,7 +601,7 @@ func (dh *DataHeader) LoadBack(r io.ReaderAt) error { ...@@ -598,7 +601,7 @@ func (dh *DataHeader) LoadBack(r io.ReaderAt) error {
if err != nil { if err != nil {
// data record @...: -> (prev rev): data record @...: ... // data record @...: -> (prev rev): data record @...: ...
// XXX dup wrt DataHeader.err // XXX dup wrt DataHeader.err
err = &RecordError{xio.Name(r), "data record", posCur, "-> (back)", err} err = &RecordError{ioname(r), "data record", posCur, "-> (back)", err}
} }
return err return err
...@@ -630,7 +633,7 @@ func (dh *DataHeader) loadNext(r io.ReaderAt, txnh *TxnHeader) error { ...@@ -630,7 +633,7 @@ func (dh *DataHeader) loadNext(r io.ReaderAt, txnh *TxnHeader) error {
if nextPos + DataHeaderSize > txnTailPos { if nextPos + DataHeaderSize > txnTailPos {
// XXX dup wrt DataHeader.err // XXX dup wrt DataHeader.err
return &RecordError{xio.Name(r), "data record", nextPos, "check", return &RecordError{ioname(r), "data record", nextPos, "check",
fmt.Errorf("data record header [..., %d] overlaps txn boundary [..., %d)", fmt.Errorf("data record header [..., %d] overlaps txn boundary [..., %d)",
nextPos + DataHeaderSize, txnTailPos)} nextPos + DataHeaderSize, txnTailPos)}
} }
...@@ -764,7 +767,7 @@ func IterateFile(path string, dir IterDir) (iter *Iter, file *os.File, err error ...@@ -764,7 +767,7 @@ func IterateFile(path string, dir IterDir) (iter *Iter, file *os.File, err error
}() }()
// use IO optimized for sequential access when iterating // use IO optimized for sequential access when iterating
fSeq := xbufio.NewSeqReaderAt(f) fSeq := seqReadAt(f)
switch dir { switch dir {
case IterForward: case IterForward:
......
...@@ -29,7 +29,6 @@ import ( ...@@ -29,7 +29,6 @@ import (
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/zodb/storage/fs1" "lab.nexedi.com/kirr/neo/go/zodb/storage/fs1"
"lab.nexedi.com/kirr/neo/go/xcommon/xio"
"lab.nexedi.com/kirr/go123/prog" "lab.nexedi.com/kirr/go123/prog"
"lab.nexedi.com/kirr/go123/xbytes" "lab.nexedi.com/kirr/go123/xbytes"
...@@ -361,8 +360,12 @@ func (d *DumperFsTail) DumpTxn(buf *xfmt.Buffer, it *fs1.Iter) error { ...@@ -361,8 +360,12 @@ func (d *DumperFsTail) DumpTxn(buf *xfmt.Buffer, it *fs1.Iter) error {
if err == io.EOF { if err == io.EOF {
err = io.ErrUnexpectedEOF err = io.ErrUnexpectedEOF
} }
// XXX dup wrt fs1.TxnHeader.err // XXX dup wrt fs1.TxnHeader.err && ioname()
return &fs1.RecordError{xio.Name(it.R), "transaction record", txnh.Pos, "read data payload", err} ioname := ""
if rn, ok := it.R.(interface{ Name() string }); ok {
ioname = rn.Name()
}
return &fs1.RecordError{ioname, "transaction record", txnh.Pos, "read data payload", err}
} }
// print information about read txn record // print information about read txn record
......
...@@ -42,7 +42,6 @@ import ( ...@@ -42,7 +42,6 @@ import (
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/xcommon/xbufio" "lab.nexedi.com/kirr/neo/go/xcommon/xbufio"
"lab.nexedi.com/kirr/neo/go/xcommon/xio"
) )
// Index is in-RAM Oid -> Data record position mapping used to associate Oid // Index is in-RAM Oid -> Data record position mapping used to associate Oid
...@@ -205,7 +204,7 @@ func (fsi *Index) SaveFile(path string) error { ...@@ -205,7 +204,7 @@ func (fsi *Index) SaveFile(path string) error {
// IndexLoadError is the error type returned by index load routines // IndexLoadError is the error type returned by index load routines
type IndexLoadError struct { type IndexLoadError struct {
Filename string Filename string // present if used IO object was with .Name()
Pos int64 Pos int64
Err error Err error
} }
...@@ -241,7 +240,7 @@ func LoadIndex(r io.Reader) (fsi *Index, err error) { ...@@ -241,7 +240,7 @@ func LoadIndex(r io.Reader) (fsi *Index, err error) {
var picklePos int64 var picklePos int64
defer func() { defer func() {
if err != nil { if err != nil {
err = &IndexLoadError{xio.Name(r), picklePos, err} err = &IndexLoadError{ioname(r), picklePos, err}
} }
}() }()
...@@ -429,7 +428,7 @@ type IndexUpdateProgress struct { ...@@ -429,7 +428,7 @@ type IndexUpdateProgress struct {
// - topPos (if it is != -1), or // - topPos (if it is != -1), or
// - r's position at which read got EOF (if topPos=-1). // - r's position at which read got EOF (if topPos=-1).
func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64, progress func(*IndexUpdateProgress)) (err error) { func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64, progress func(*IndexUpdateProgress)) (err error) {
defer xerr.Contextf(&err, "%s: reindex %v..%v", xio.Name(r), index.TopPos, topPos) defer xerr.Contextf(&err, "%sreindex %v..%v", ioprefix(r), index.TopPos, topPos)
if topPos >= 0 && index.TopPos > topPos { if topPos >= 0 && index.TopPos > topPos {
return fmt.Errorf("backward update requested") return fmt.Errorf("backward update requested")
...@@ -543,7 +542,7 @@ func BuildIndexForFile(ctx context.Context, path string, progress func(*IndexUpd ...@@ -543,7 +542,7 @@ func BuildIndexForFile(ctx context.Context, path string, progress func(*IndexUpd
}() }()
// use IO optimized for sequential access when building index // use IO optimized for sequential access when building index
fSeq := xbufio.NewSeqReaderAt(f) fSeq := seqReadAt(f)
return BuildIndex(ctx, fSeq, progress) return BuildIndex(ctx, fSeq, progress)
} }
...@@ -553,16 +552,20 @@ func BuildIndexForFile(ctx context.Context, path string, progress func(*IndexUpd ...@@ -553,16 +552,20 @@ func BuildIndexForFile(ctx context.Context, path string, progress func(*IndexUpd
// IndexCorruptError is the error type returned by index verification routines // IndexCorruptError is the error type returned by index verification routines
// when index was found to not match original FileStorage data. // when index was found to not match original FileStorage data.
type IndexCorruptError struct { type IndexCorruptError struct {
DataFileName string DataFileName string // present if data IO object was with .Name()
Detail string Detail string
} }
func (e *IndexCorruptError) Error() string { func (e *IndexCorruptError) Error() string {
return fmt.Sprintf("%s: verify index: %s", e.DataFileName, e.Detail) prefix := e.DataFileName
if prefix != "" {
prefix += ": "
}
return fmt.Sprintf("%sverify index: %s", prefix, e.Detail)
} }
func indexCorrupt(r io.ReaderAt, format string, argv ...interface{}) *IndexCorruptError { func indexCorrupt(f interface{}, format string, argv ...interface{}) *IndexCorruptError {
return &IndexCorruptError{DataFileName: xio.Name(r), Detail: fmt.Sprintf(format, argv...)} return &IndexCorruptError{DataFileName: ioname(f), Detail: fmt.Sprintf(format, argv...)}
} }
// IndexVerifyProgress is data sent by Index.Verify to notify about progress // IndexVerifyProgress is data sent by Index.Verify to notify about progress
...@@ -595,7 +598,7 @@ func (index *Index) Verify(ctx context.Context, r io.ReaderAt, ntxn int, progres ...@@ -595,7 +598,7 @@ func (index *Index) Verify(ctx context.Context, r io.ReaderAt, ntxn int, progres
return // leave it as is return // leave it as is
} }
xerr.Contextf(&err, "%s: verify index @%v~{%v}", xio.Name(r), index.TopPos, ntxn) xerr.Contextf(&err, "%sverify index @%v~{%v}", ioprefix(r), index.TopPos, ntxn)
}() }()
oidChecked = map[zodb.Oid]struct{}{} // Set<zodb.Oid> oidChecked = map[zodb.Oid]struct{}{} // Set<zodb.Oid>
...@@ -709,7 +712,7 @@ func (index *Index) VerifyForFile(ctx context.Context, path string, ntxn int, pr ...@@ -709,7 +712,7 @@ func (index *Index) VerifyForFile(ctx context.Context, path string, ntxn int, pr
} }
// use IO optimized for sequential access when verifying index // use IO optimized for sequential access when verifying index
fSeq := xbufio.NewSeqReaderAt(f) fSeq := seqReadAt(f)
return index.Verify(ctx, fSeq, ntxn, progress) return index.Verify(ctx, fSeq, ntxn, progress)
} }
...@@ -21,6 +21,9 @@ package fs1 ...@@ -21,6 +21,9 @@ package fs1
import ( import (
"io" "io"
"os"
"lab.nexedi.com/kirr/neo/go/xcommon/xbufio"
) )
// noEOF returns err, but changes io.EOF -> io.ErrUnexpectedEOF // noEOF returns err, but changes io.EOF -> io.ErrUnexpectedEOF
...@@ -38,3 +41,52 @@ func okEOF(err error) error { ...@@ -38,3 +41,52 @@ func okEOF(err error) error {
} }
return err return err
} }
// record reading routines work on abstract file-like interfaces.
// when they report e.g. decoding error, if reader has name, e.g. as os.File
// does, this name is included into error as prefix.
// named represents something having a name - ex. os.File
type named interface {
Name() string
}
// ioname returns f's name if it has one.
// if not - "" is returned.
func ioname(f interface{}) string {
if fn, ok := f.(named); ok {
return fn.Name()
}
return ""
}
// ioprefix returns "<name>: " for f if it has name.
// if not - "" is returned.
func ioprefix(f interface{}) string {
n := ioname(f)
if n != "" {
return n + ": "
}
return ""
}
// seqReadAt wraps os.File with xbufio.SeqReaderAt for its .ReadAt method.
// in particular f's name is preserved.
func seqReadAt(f *os.File) *seqFileReaderAt {
fseq := xbufio.NewSeqReaderAt(f)
return &seqFileReaderAt{fseq, f}
}
type seqFileReaderAt struct {
*xbufio.SeqReaderAt
// keep os.File still exposed for .Name()
// XXX better xbufio.SeqReaderAt expose underlying reader?
*os.File
}
func (f *seqFileReaderAt) ReadAt(p []byte, off int64) (int, error) {
// route ReadAt to wrapper
return f.SeqReaderAt.ReadAt(p, off)
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment