Commit 75afd6a1 authored by Kirill Smelkov

X xbufio.SeqReaderAt moved -> go123

parent 4e15d38b
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package xbufio
// buffering for io.ReaderAt optimized for sequential access
import (
"io"
//"log"
)
// SeqReaderAt implements buffering for an io.ReaderAt optimized for sequential access.
//
// Forward, backward, and interleaved forward/backward access patterns are all supported.
//
// NOTE SeqReaderAt is not safe to use from multiple goroutines concurrently.
// Strictly speaking this goes against the io.ReaderAt interface, but
// sequential workloads usually mean sequential processing. It would be
// a pity to add a mutex for nothing.
type SeqReaderAt struct {
// buffer for data at pos. cap(buf) - whole buffer capacity
buf []byte
pos int64
posLastAccess int64 // position of last access request
posLastFwdAfter int64 // position of last forward access request
posLastBackward int64 // position of last backward access request
r io.ReaderAt
// debug: for ioReadAt tracing
//posLastIO int64
}
const defaultSeqBufSize = 8192
// NewSeqReaderAt wraps r with SeqReaderAt with buffer of default size.
func NewSeqReaderAt(r io.ReaderAt) *SeqReaderAt {
return NewSeqReaderAtSize(r, defaultSeqBufSize)
}
// NewSeqReaderAtSize wraps r with SeqReaderAt with buffer of specified size.
func NewSeqReaderAtSize(r io.ReaderAt, size int) *SeqReaderAt {
sb := &SeqReaderAt{r: r, buf: make([]byte, 0, size)} // all positions are zero initially
return sb
}
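// Illustrative usage sketch (not part of the original file): wrap any
// io.ReaderAt - here f is assumed to be e.g. an *os.File opened elsewhere -
// and read through the wrapper; nearby sequential reads are then served from
// the internal buffer:
//
//	sr := xbufio.NewSeqReaderAt(f)  // 8K buffer by default
//	buf := make([]byte, 1024)
//	n, err := sr.ReadAt(buf, 0)     // refills the buffer with one larger IO
//	n, err = sr.ReadAt(buf, 1024)   // satisfied from the buffer, no extra IO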
// // XXX temp
// func init() {
// log.SetFlags(0)
// }
// debug helper for sb.r.ReadAt
func (sb *SeqReaderAt) ioReadAt(p []byte, pos int64) (int, error) {
/*
verb := "read"
if len(p) > cap(sb.buf) {
verb = "READ"
}
log.Printf("%s\t[%v, %v)\t#%v\tIO%+d", verb, pos, pos + len64(p), len(p), pos - sb.posLastIO)
sb.posLastIO = pos
*/
return sb.r.ReadAt(p, pos)
}
func (sb *SeqReaderAt) ReadAt(p []byte, pos int64) (int, error) {
//log.Printf("access\t[%v, %v)\t#%v\t@%+d", pos, pos + len64(p), len(p), pos - sb.posLastAccess)
// read-in last access positions and update them in *sb with current ones for next read
posLastAccess := sb.posLastAccess
posLastFwdAfter := sb.posLastFwdAfter
posLastBackward := sb.posLastBackward
sb.posLastAccess = pos
if pos >= posLastAccess {
sb.posLastFwdAfter = pos + len64(p)
} else {
sb.posLastBackward = pos
}
// if request size > buffer - read data directly
if len(p) > cap(sb.buf) {
// no copying from sb.buf here at all: even if we could copy part of the data
// from sb.buf, the kernel can serve the same data from the pagecache just as
// fast, because the page backing the data in sb.buf is very likely still hot.
return sb.ioReadAt(p, pos)
}
var nhead int // #data read from buffer for p head
var ntail int // #data read from buffer for p tail
// try to satisfy read request via (partly) reading from buffer
// use buffered data: head(p)
if sb.pos <= pos && pos < sb.pos + len64(sb.buf) {
nhead = copy(p, sb.buf[pos - sb.pos:]) // NOTE len(p) can be < len(sb.buf[copyPos:])
// if all was read from buffer - we are done
if nhead == len(p) {
return nhead, nil
}
p = p[nhead:]
pos += int64(nhead)
// empty request (possibly not hitting buffer - do not let it go to real IO path)
// `len(p) != 0` is also needed by the backward (tail) branch below, so this check comes before it
} else if len(p) == 0 {
return 0, nil
// use buffered data: tail(p)
} else if posAfter := pos + len64(p);
sb.pos < posAfter && posAfter <= sb.pos + len64(sb.buf) {
// here we know pos < sb.pos
//
// proof: consider if pos >= sb.pos.
// Then from `pos <= sb.pos + len(sb.buf) - len(p)` above it follows that:
// `pos < sb.pos + len(sb.buf)` (NOTE strictly < because len(p) > 0)
// and we arrive at the condition of the forward (head) branch above, which
// would have been taken instead - contradiction.
ntail = copy(p[sb.pos - pos:], sb.buf) // NOTE ntail == len(p[sb.pos - pos:])
// NOTE no return here: full p read is impossible for backward
// p filling: it would mean `pos = sb.pos` which in turn means
// the condition for forward buf reading must have been triggered.
p = p[:sb.pos - pos]
// pos stays the same
}
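// Concrete example of the two branches above, with cap(sb.buf)=10 and
// sb.buf currently covering [50, 60):
//  - a request for [57, 62) takes the head branch: [57, 60) is copied from
//    the buffer (nhead=3) and [60, 62) is left for the refill below;
//  - a request for [45, 53) takes the tail branch: [50, 53) is copied from
//    the buffer (ntail=3) and [45, 50) is left for the refill below.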
// here we need to refill the buffer. determine range to read by current IO direction.
// NOTE len(p) <= cap(sb.buf)
var xpos int64 // position for new IO request
if pos >= posLastAccess {
// forward
xpos = pos
// if the forward trend continues and the buffered read can be made adjacent
// to the previous forward access - shift the read position down to right after it.
xLastFwdAfter := posLastFwdAfter + int64(nhead) // adjusted for already read from buffer
if xLastFwdAfter <= xpos && xpos + len64(p) <= xLastFwdAfter + cap64(sb.buf) {
xpos = xLastFwdAfter
}
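// e.g. with cap(sb.buf)=10: if the previous forward access ended at 60
// and the current request is [62, 65), xpos is shifted from 62 down to
// 60, so the buffer is refilled as [60, 70) - adjacent to the previous
// access and already covering what is likely to be requested next.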
// NOTE no symmetry handling for "alternatively" in backward case
// because symmetry would be:
//
// ← → ← →
// 3 4 2 1
//
// but the buffer for 4 already does not overlap 3, because for
// non-trendy forward reads the buffer always grows forward.
} else {
// backward
xpos = pos
// if the backward trend continues and the buffered read would overlap with the
// previous backward access - shift the read region so that it ends right at it.
xLastBackward := posLastBackward - int64(ntail) // adjusted for already read from buffer
if xpos < xLastBackward && xLastBackward < xpos + cap64(sb.buf) {
xpos = max64(xLastBackward, xpos + len64(p)) - cap64(sb.buf)
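// e.g. with cap(sb.buf)=10: if the previous backward access was at 194
// and the current request is [186, 187), xpos becomes 184, so the buffer
// is refilled as [184, 194) - ending right where the previous backward
// read began, keeping further backward reads buffered.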
// alternatively, even if the backward trend does not continue anymore,
// if the read would overlap with the last access range it is probably
// better (we are optimizing for sequential access) to shift the loading
// region down so that they do not overlap. example:
//
// ← → ← →
// 2 1 4 3
//
// here we do not want the 4th buffer to overlap with the 3rd
} else if xpos + cap64(sb.buf) > posLastAccess {
xpos = max64(posLastAccess, xpos + len64(p)) - cap64(sb.buf)
}
// don't let reading go beyond start of the file
xpos = max64(xpos, 0)
}
nn, err := sb.ioReadAt(sb.buf[:cap(sb.buf)], xpos)
// even if there was an error, or data was only partly read, we cannot retain
// the old buf content, as an io.ReaderAt may use the whole buf as scratch space
sb.pos = xpos
sb.buf = sb.buf[:nn]
// here we know:
// - some data was read
// - in case of successful read pos/p lies completely inside sb.pos/sb.buf
// copy loaded data from buffer to p
pBufOffset := pos - xpos // offset corresponding to p in sb.buf
if pBufOffset >= len64(sb.buf) {
// this can only be due to some IO error
// if the original request was narrower than the buffer, try to
// satisfy it once again directly
if pos != xpos {
nn, err = sb.ioReadAt(p, pos)
if nn < len(p) {
return nhead + nn, err
}
return nhead + nn + ntail, nil // request fully satisfied - we can ignore error
}
// Just return the error
return nhead, err
}
nn = copy(p, sb.buf[pBufOffset:])
if nn < len(p) {
// some error - do not account tail - we did not get to it
return nhead + nn, err
}
// all ok
// NOTE if there was an error - we can skip it if original read request was completely satisfied
// NOTE not preserving EOF at ends - not required per ReaderAt interface
return nhead + nn + ntail, nil
}
// utilities:
// len and cap as int64 (we frequently need them, and int64 covers int, so
// the conversion is not lossy)
func len64(b []byte) int64 { return int64(len(b)) }
func cap64(b []byte) int64 { return int64(cap(b)) }
// min/max
func min64(a, b int64) int64 { if a < b { return a } else { return b } }
func max64(a, b int64) int64 { if a > b { return a } else { return b } }
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package xbufio
import (
"bytes"
"errors"
"io"
"testing"
)
// XReader is an io.ReaderAt that serves the first 256 bytes with content[i] = i.
// Bytes in the range [100, 104] return EIO on read.
type XReader struct {
}
var EIO = errors.New("input/output error")
func (r *XReader) ReadAt(p []byte, pos int64) (n int, err error) {
for n < len(p) && pos < 0x100 {
if 100 <= pos && pos <= 104 {
err = EIO
break
}
p[n] = byte(pos)
n++
pos++
}
if n < len(p) && err == nil {
err = io.EOF
}
return n, err
}
// test vector: read request @pos with length Len -> expected rb.pos, len(rb.buf) after it
var xSeqBufTestv = []struct {pos int64; Len int; bufPos int64; bufLen int} {
{40, 5, 40, 10}, // 1st access, forward by default
{45, 7, 50, 10}, // part taken from buf, part read next, forward (trend)
{52, 5, 50, 10}, // everything taken from buf
{57, 5, 60, 10}, // part taken from buf, part read next (trend)
{60, 11, 60, 10}, // access > cap(buf), buf skipped
{71, 11, 60, 10}, // access > cap(buf), once again
{82, 10, 82, 10}, // access = cap(buf), should refill buf
{92, 5, 92, 8}, // next access - should refill buffer (trend), but only up to EIO range
{97, 4, 100, 0}, // this triggers user-visible EIO, buffer scratched
{101, 5, 101, 0}, // EIO again
{105, 5, 105, 10}, // past EIO range - buffer refilled
{88, 8, 88, 10}, // go back again a bit before EIO range
{96, 6, 98, 2}, // this uses both buffered data + result of next read which hits EIO
{110,70, 98, 2}, // very big access forward, buf untouched
{180,70, 98, 2}, // big access ~ forward
{170, 5, 170, 10}, // backward: buffer refilled forward because prev backward reading was below
{168, 4, 160, 10}, // backward: buffer refilled backward
{162, 6, 160, 10}, // backward: all data read from buffer
{150,12, 160, 10}, // big backward: buf untouched
{142, 6, 140, 10}, // backward: buffer refilled up to posLastIO
{130,12, 140, 10}, // big backward: buf untouched
{122, 9, 121, 10}, // backward overlapping with last bigio: buf correctly refilled
{131, 9, 131, 10}, // forward after backward: buf refilled forward
{122, 6, 121, 10}, // backward after forward: buf refilled backward
{131, 9, 131, 10}, // forward again
{136,20, 131, 10}, // big forward starting from inside filled buf
{128, 4, 126, 10}, // backward (not trend): buf refilled up to posLastIO
// interleaved backward + fwd-fwd-fwd
{200,10, 200, 10}, // reset @200
{194, 1, 190, 10}, // 1st backward access: buf refilled
{186, 1, 184, 10}, // trendy backward access - buf refilled up-to prev back read
{187, 1, 184, 10}, // fwd-fwd-fwd (all already buffered)
{188, 2, 184, 10},
{190, 3, 184, 10},
{182, 4, 174, 10}, // trendy backward access - part taken from buffer and buf refilled adjacent to previous backward IO
{168, 1, 168, 10}, // trendy backward access farther than cap(buf) - buf refilled right at @pos
{169, 7, 168, 10}, // fwd-fwd-fwd (partly buffered / partly loaded)
{176, 3, 178, 10},
{179, 6, 178, 10},
// interleaved forward + back-back-back
{200,10, 200, 10}, // reset @200
{206, 1, 200, 10}, // 1st forward access
{214, 1, 207, 10}, // trendy forward access - buf refilled adjacent to previous forward read
{213, 1, 207, 10}, // back-back-back (all already buffered)
{211, 2, 207, 10},
{207, 5, 207, 10},
{215, 4, 217, 10}, // trendy forward access - part taken from buffer and buf refilled adjacent to previous forward IO
{235, 1, 235, 10}, // trendy forward access farther than cap(buf) - buf refilled right at @pos
{234, 1, 225, 10}, // back-back-back (partly loaded / then partly buffered)
{230, 3, 225, 10},
{222, 8, 215, 10},
{219, 3, 215, 10},
// backward (non trend) vs not overlapping previous forward
{230,10, 230, 10}, // forward @230 (1)
{220,10, 220, 10}, // backward @220 (2)
{250, 4, 250, 6}, // forward @250 (3)
{245, 5, 240, 10}, // backward @245 (4)
{5, 4, 5, 10}, // forward near file start
{2, 3, 0, 10}, // backward: buf does not go beyond 0
{40, 0, 0, 10}, // zero-sized out-of-buffer read does not change the buffer
// backward (not trend) vs EIO
{108,10, 108, 10}, // reset @108
{ 98, 1, 98, 2}, // backward not overlapping EIO: buf filled < EIO range
{108,10, 108, 10}, // reset @108
{ 99, 4, 98, 2}, // backward overlapping head EIO: buf filled < EIO range, EIO -> user
{108,10, 108, 10}, // reset @108
{ 99, 6, 98, 2}, // backward overlapping whole EIO range: buf filled <= EIO range, EIO -> user
{108,10, 108, 10}, // reset @108
{100, 4, 98, 2}, // backward = EIO range: buf filled < EIO range, EIO -> user
{110,10, 110, 10}, // reset @110
{101, 2, 100, 0}, // backward inside EIO range: buf scratched, EIO -> user
{110,10, 110, 10}, // reset @110
{103, 5, 100, 0}, // backward overlapping tail EIO: buf scratched, EIO -> user
{110,10, 110, 10}, // reset state: forward @110
{105, 7, 100, 0}, // backward client after EIO: buf scratched but read request satisfied
// backward (trend) vs EIO
// NOTE this is the reverse of `backward (not trend) vs EIO`
{110,10, 110, 10}, // reset state: forward @110
{105, 7, 100, 0}, // backward client after EIO: buf scratched but read request satisfied
{110,10, 110, 10}, // reset @110
{103, 5, 98, 2}, // backward overlapping tail EIO: buf scratched (XXX), EIO -> user
{110,10, 110, 10}, // reset @110
{101, 2, 93, 7}, // backward inside EIO range: buf scratched (XXX), EIO -> user
{108,10, 108, 10}, // reset @108
{100, 4, 94, 6}, // backward = EIO range: buf filled < EIO range, EIO -> user
{108,10, 108, 10}, // reset @108
{ 99, 6, 95, 5}, // backward overlapping whole EIO range: buf filled <= EIO range, EIO -> user
{108,10, 108, 10}, // reset @108
{ 99, 4, 98, 2}, // backward overlapping head EIO: buf filled < EIO range, EIO -> user
{108,10, 108, 10}, // reset @108
{ 98, 1, 89, 10}, // backward not overlapping EIO: buf filled according to backward trend
// forward (trend) vs EIO
{ 0, 1, 0, 10},
{ 88,10, 88, 10}, // reset forward @98
{ 98, 1, 98, 2}, // forward not overlapping EIO: buf filled < EIO range
{ 0, 1, 0, 10},
{ 88,10, 88, 10}, // reset forward @98
{ 99, 4, 98, 2}, // forward overlapping head EIO: buf filled < EIO range, EIO -> user
{ 0, 1, 0, 10},
{ 88,10, 88, 10}, // reset forward @98
{ 99, 6, 98, 2}, // forward overlapping whole EIO range: buf filled <= EIO range, EIO -> user
{ 0, 1, 0, 10},
{ 88,10, 88, 10}, // reset forward @98
{100, 4, 98, 2}, // forward = EIO range: buf filled < EIO range, EIO -> user
{ 0, 1, 0, 10},
{ 90,10, 90, 10}, // reset forward @100
{101, 2, 100, 0}, // forward inside EIO range: buf scratched, EIO -> user
{ 0, 1, 0, 10},
{ 90,10, 90, 10}, // reset forward @100
{103, 5, 100, 0}, // forward overlapping tail EIO: buf scratched, EIO -> user
{ 0, 1, 0, 10},
{ 90,10, 90, 10}, // reset forward @100
{105, 2, 100, 0}, // forward client after EIO: buf scratched but read request satisfied
{ 0, 1, 0, 10},
{ 90, 5, 90, 10}, // reset forward @95
{ 99, 3, 96, 4}, // forward jump client overlapping head EIO: buf filled < EIO range, EIO -> user
{ 0, 1, 0, 10},
{ 89, 5, 89, 10}, // reset forward @94
{ 98, 2, 95, 5}, // forward jump client reading < EIO: buf filled < EIO range, user request satisfied
// EOF handling
{250, 4, 250, 6}, // access near EOF - buffer fill hits EOF, but does not return it to the client
{254, 5, 256, 0}, // access overlapping EOF - EOF returned, buf scratched
{256, 1, 256, 0}, // access past EOF -> EOF
{257, 1, 257, 0}, // ----//----
// forward with jumps - buffer is still refilled adjacent to previous reading
// ( because jumps are not sequential access and we are optimizing for sequential cases.
// also: if jump > cap(buf) reading will not be adjacent)
{ 0, 1, 0, 10}, // reset
{ 0, 5, 0, 10},
{ 9, 3, 6, 10},
{20, 3, 20, 10},
}
func TestSeqReaderAt(t *testing.T) {
r := &XReader{}
rb := NewSeqReaderAtSize(r, 10) // with 10 it is easier to do/check math for a human
for _, tt := range xSeqBufTestv {
pOk := make([]byte, tt.Len)
pB := make([]byte, tt.Len)
nOk, errOk := r.ReadAt(pOk, tt.pos)
nB, errB := rb.ReadAt(pB, tt.pos)
pOk = pOk[:nOk]
pB = pB[:nB]
// check that reading result is the same
if !(bytes.Equal(pB, pOk) && errB == errOk) {
t.Fatalf("%v: -> %v, %#v ; want %v, %#v", tt, pB, errB, pOk, errOk)
}
// verify buffer state
if !(rb.pos == tt.bufPos && len(rb.buf) == tt.bufLen) {
t.Fatalf("%v: -> unexpected buffer state @%v #%v", tt, rb.pos, len(rb.buf))
}
}
}
// this benchmark measures how thin the wrapper is, not the logic inside it
func BenchmarkSeqReaderAt(b *testing.B) {
r := &XReader{}
rb := NewSeqReaderAtSize(r, 10) // same as in TestSeqReaderAt
buf := make([]byte, 128 /* > all .Len in xSeqBufTestv */)
for i := 0; i < b.N; i++ {
for _, tt := range xSeqBufTestv {
rb.ReadAt(buf[:tt.Len], tt.pos)
}
}
}
@@ -23,7 +23,7 @@ import (
 "io"
 "os"
-"lab.nexedi.com/kirr/neo/go/xcommon/xbufio"
+"lab.nexedi.com/kirr/go123/xbufio"
 )
 // noEOF returns err, but changes io.EOF -> io.ErrUnexpectedEOF