Commit 3956445e authored by Kirill Smelkov

xbufio += SeqReaderAt - buffering wrapper for an io.ReaderAt optimized for sequential access

For example, in the ZODB FileStorage format the reader routines are written
to work with io.ReaderAt for the following reasons:

- for loads, random access is required,
- several loads can be in flight concurrently.

At the same time, various database iteration APIs (additional to load) use a
sequential access pattern and can be served by the same record-reading
routines. However, with them we cannot use e.g. bufio.Reader, because it
works with a plain io.Reader, not an io.ReaderAt.

Here comes SeqReaderAt: it adds a buffer, by default 2·4K (8K), on top of the
original io.ReaderAt, automatically detects the direction of sequential
access, which can be forward, backward, or an interleaved forward/backward
pattern, and buffers data accordingly to avoid many syscalls, e.g. in the
os.File case.
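
For illustration, a minimal usage sketch (the file name is hypothetical,
error handling and package/import boilerplate are elided; not part of the
change itself):

    f, _ := os.Open("data.fs")       // any io.ReaderAt will do
    sr := xbufio.NewSeqReaderAt(f)   // wrap it with the default 8K buffer

    buf := make([]byte, 64)
    sr.ReadAt(buf, 0)                // fills the internal buffer with one backend read
    sr.ReadAt(buf, 64)               // adjacent forward read is served from the buffer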
parent eadf5c4a
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package xbufio
// buffering for io.ReaderAt optimized for sequential access
import (
"io"
//"log"
)
// SeqReaderAt implements buffering for an io.ReaderAt optimized for sequential access.
//
// Forward, backward, and interleaved forward/backward access patterns are all supported.
//
// NOTE SeqReaderAt is not safe to use from multiple goroutines concurrently.
// Strictly speaking this goes against the io.ReaderAt interface, but sequential
// workloads usually mean sequential processing. It would be a pity to
// add a mutex for nothing.
type SeqReaderAt struct {
// buffer for data at pos. cap(buf) - whole buffer capacity
buf []byte
pos int64
posLastAccess int64 // position of last access request
posLastFwdAfter int64 // position of last forward access request
posLastBackward int64 // position of last backward access request
r io.ReaderAt
// debug: for ioReadAt tracing
//posLastIO int64
}
const defaultSeqBufSize = 8192
// NewSeqReaderAt wraps r with SeqReaderAt with buffer of default size.
func NewSeqReaderAt(r io.ReaderAt) *SeqReaderAt {
return NewSeqReaderAtSize(r, defaultSeqBufSize)
}
// NewSeqReaderAtSize wraps r with SeqReaderAt with buffer of specified size.
func NewSeqReaderAtSize(r io.ReaderAt, size int) *SeqReaderAt {
sb := &SeqReaderAt{r: r, buf: make([]byte, 0, size)} // all positions are zero initially
return sb
}
// // XXX temp
// func init() {
// log.SetFlags(0)
// }
// debug helper for sb.r.ReadAt
func (sb *SeqReaderAt) ioReadAt(p []byte, pos int64) (int, error) {
/*
verb := "read"
if len(p) > cap(sb.buf) {
verb = "READ"
}
log.Printf("%s\t[%v, %v)\t#%v\tIO%+d", verb, pos, pos + len64(p), len(p), pos - sb.posLastIO)
sb.posLastIO = pos
*/
return sb.r.ReadAt(p, pos)
}
func (sb *SeqReaderAt) ReadAt(p []byte, pos int64) (int, error) {
//log.Printf("access\t[%v, %v)\t#%v\t@%+d", pos, pos + len64(p), len(p), pos - sb.posLastAccess)
// read-in last access positions and update them in *sb with current ones for next read
posLastAccess := sb.posLastAccess
posLastFwdAfter := sb.posLastFwdAfter
posLastBackward := sb.posLastBackward
sb.posLastAccess = pos
if pos >= posLastAccess {
sb.posLastFwdAfter = pos + len64(p)
} else {
sb.posLastBackward = pos
}
// if request size > buffer - read data directly
if len(p) > cap(sb.buf) {
// no copying from sb.buf here at all: even if we could copy from sb.buf, the
// kernel can copy the same data from the pagecache just as fast, because for
// data already in sb.buf the corresponding pagecache page is very likely hot.
return sb.ioReadAt(p, pos)
}
var nhead int // #data read from buffer for p head
var ntail int // #data read from buffer for p tail
// try to satisfy read request via (partly) reading from buffer
// use buffered data: head(p)
if sb.pos <= pos && pos < sb.pos + len64(sb.buf) {
nhead = copy(p, sb.buf[pos - sb.pos:]) // NOTE len(p) can be < len(sb.buf[pos - sb.pos:])
// if all was read from buffer - we are done
if nhead == len(p) {
return nhead, nil
}
p = p[nhead:]
pos += int64(nhead)
// empty request (possibly not hitting buffer - do not let it go to real IO path)
// `len(p) != 0` is also needed for backward reading from buffer, so this condition goes before
} else if len(p) == 0 {
return 0, nil
// use buffered data: tail(p)
} else if posAfter := pos + len64(p);
sb.pos < posAfter && posAfter <= sb.pos + len64(sb.buf) {
// here we know pos < sb.pos
//
// proof: consider if pos >= sb.pos.
// Then from `pos <= sb.pos + len(sb.buf) - len(p)` above it follows that:
// `pos < sb.pos + len(sb.buf)` (NOTE strictly < because len(p) > 0)
// and we arrive at the condition which is used in the forward `head(p)` if above.
ntail = copy(p[sb.pos - pos:], sb.buf) // NOTE ntail == len(p[sb.pos - pos:])
// NOTE no return here: full p read is impossible for backward
// p filling: it would mean `pos = sb.pos` which in turn means
// the condition for forward buf reading must have been triggered.
p = p[:sb.pos - pos]
// pos stays the same
}
// here we need to refill the buffer. determine range to read by current IO direction.
// NOTE len(p) <= cap(sb.buf)
var xpos int64 // position for new IO request
if pos >= posLastAccess {
// forward
xpos = pos
// if forward trend continues and buffering can be made adjacent to
// previous forward access - shift reading down right to after it.
xLastFwdAfter := posLastFwdAfter + int64(nhead) // adjusted for already read from buffer
if xLastFwdAfter <= xpos && xpos + len64(p) <= xLastFwdAfter + cap64(sb.buf) {
xpos = xLastFwdAfter
}
// NOTE no symmetry handling for "alternatively" in backward case
// because symmetry would be:
//
// ← → ← →
// 3 4 2 1
//
// but buffer for 4 already does not overlap 3 as for
// non-trendy forward reads buffer always grows forward.
} else {
// backward
xpos = pos
// if backward trend continues and buffering would overlap with
// previous backward access - shift reading up right to it.
xLastBackward := posLastBackward - int64(ntail) // adjusted for already read from buffer
if xpos < xLastBackward && xLastBackward < xpos + cap64(sb.buf) {
xpos = max64(xLastBackward, xpos + len64(p)) - cap64(sb.buf)
// alternatively even if backward trend does not continue anymore
// but if this will overlap with last access range, probably
// it is better (we are optimizing for sequential access) to
// shift loading region down not to overlap. example:
//
// ← → ← →
// 2 1 4 3
//
// here we do not want 4'th buffer to overlap with 3
} else if xpos + cap64(sb.buf) > posLastAccess {
xpos = max64(posLastAccess, xpos + len64(p)) - cap64(sb.buf)
}
// don't let reading go beyond start of the file
xpos = max64(xpos, 0)
}
nn, err := sb.ioReadAt(sb.buf[:cap(sb.buf)], xpos)
// even if there was an error, or data partly read, we cannot retain
// the old buf content as io.ReaderAt can use whole buf as scratch space
sb.pos = xpos
sb.buf = sb.buf[:nn]
// here we know:
// - some data was read
// - in case of successful read pos/p lies completely inside sb.pos/sb.buf
// copy loaded data from buffer to p
pBufOffset := pos - xpos // offset corresponding to p in sb.buf
if pBufOffset >= len64(sb.buf) {
// this can only be due to some IO error
// if the original request was narrower than the buffer, try to satisfy
// it once again directly
if pos != xpos {
nn, err = sb.ioReadAt(p, pos)
if nn < len(p) {
return nhead + nn, err
}
return nhead + nn + ntail, nil // request fully satisfied - we can ignore error
}
// Just return the error
return nhead, err
}
nn = copy(p, sb.buf[pBufOffset:])
if nn < len(p) {
// some error - do not account tail - we did not get to it
return nhead + nn, err
}
// all ok
// NOTE if there was an error - we can skip it if original read request was completely satisfied
// NOTE not preserving EOF at ends - not required per ReaderAt interface
return nhead + nn + ntail, nil
}
// utilities:
// len and cap as int64 (we frequently need them, and int64 covers int so
// the conversion is not lossy)
func len64(b []byte) int64 { return int64(len(b)) }
func cap64(b []byte) int64 { return int64(cap(b)) }
// min/max
func min64(a, b int64) int64 { if a < b { return a } else { return b } }
func max64(a, b int64) int64 { if a > b { return a } else { return b } }
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package xbufio
import (
"bytes"
"errors"
"io"
"testing"
)
// XReader is an io.ReaderAt that serves the first 256 bytes with content_i = i.
// Bytes in range [100, 104] give EIO on reading.
type XReader struct {
}
var EIO = errors.New("input/output error")
func (r *XReader) ReadAt(p []byte, pos int64) (n int, err error) {
for n < len(p) && pos < 0x100 {
if 100 <= pos && pos <= 104 {
err = EIO
break
}
p[n] = byte(pos)
n++
pos++
}
if n < len(p) && err == nil {
err = io.EOF
}
return n, err
}
// read @pos/len -> rb.pos, len(rb.buf)
var xSeqBufTestv = []struct {pos int64; Len int; bufPos int64; bufLen int} {
{40, 5, 40, 10}, // 1st access, forward by default
{45, 7, 50, 10}, // part taken from buf, part read next, forward (trend)
{52, 5, 50, 10}, // everything taken from buf
{57, 5, 60, 10}, // part taken from buf, part read next (trend)
{60, 11, 60, 10}, // access > cap(buf), buf skipped
{71, 11, 60, 10}, // access > cap(buf), once again
{82, 10, 82, 10}, // access = cap(buf), should refill buf
{92, 5, 92, 8}, // next access - should refill buffer (trend), but only up to EIO range
{97, 4, 100, 0}, // this triggers user-visible EIO, buffer scratched
{101, 5, 101, 0}, // EIO again
{105, 5, 105, 10}, // past EIO range - buffer refilled
{88, 8, 88, 10}, // go back again a bit before EIO range
{96, 6, 98, 2}, // this uses both buffered data + result of next read which hits EIO
{110,70, 98, 2}, // very big access forward, buf untouched
{180,70, 98, 2}, // big access ~ forward
{170, 5, 170, 10}, // backward: buffer refilled forward because prev backward reading was below
{168, 4, 160, 10}, // backward: buffer refilled backward
{162, 6, 160, 10}, // backward: all data read from buffer
{150,12, 160, 10}, // big backward: buf untouched
{142, 6, 140, 10}, // backward: buffer refilled up to posLastIO
{130,12, 140, 10}, // big backward: buf untouched
{122, 9, 121, 10}, // backward overlapping with last bigio: buf correctly refilled
{131, 9, 131, 10}, // forward after backward: buf refilled forward
{122, 6, 121, 10}, // backward after forward: buf refilled backward
{131, 9, 131, 10}, // forward again
{136,20, 131, 10}, // big forward starting from inside filled buf
{128, 4, 126, 10}, // backward (not trend): buf refilled up to posLastIO
// interleaved backward + fwd-fwd-fwd
{200,10, 200, 10}, // reset @200
{194, 1, 190, 10}, // 1st backward access: buf refilled
{186, 1, 184, 10}, // trendy backward access - buf refilled up-to prev back read
{187, 1, 184, 10}, // fwd-fwd-fwd (all already buffered)
{188, 2, 184, 10},
{190, 3, 184, 10},
{182, 4, 174, 10}, // trendy backward access - part taken from buffer and buf refilled adjacent to previous backward IO
{168, 1, 168, 10}, // trendy backward access farther than cap(buf) - buf refilled right at @pos
{169, 7, 168, 10}, // fwd-fwd-fwd (partly buffered / partly loaded)
{176, 3, 178, 10},
{179, 6, 178, 10},
// interleaved forward + back-back-back
{200,10, 200, 10}, // reset @200
{206, 1, 200, 10}, // 1st forward access
{214, 1, 207, 10}, // trendy forward access - buf refilled adjacent to previous forward read
{213, 1, 207, 10}, // back-back-back (all already buffered)
{211, 2, 207, 10},
{207, 5, 207, 10},
{215, 4, 217, 10}, // trendy forward access - part taken from buffer and buf refilled adjacent to previous forward IO
{235, 1, 235, 10}, // trendy forward access farther than cap(buf) - buf refilled right at @pos
{234, 1, 225, 10}, // back-back-back (partly loaded / then partly buffered)
{230, 3, 225, 10},
{222, 8, 215, 10},
{219, 3, 215, 10},
// backward (non trend) vs not overlapping previous forward
{230,10, 230, 10}, // forward @230 (1)
{220,10, 220, 10}, // backward @220 (2)
{250, 4, 250, 6}, // forward @250 (3)
{245, 5, 240, 10}, // backward @245 (4)
{5, 4, 5, 10}, // forward near file start
{2, 3, 0, 10}, // backward: buf does not go beyond 0
{40, 0, 0, 10}, // zero-sized out-of-buffer read does not change buffer
// backward (not trend) vs EIO
{108,10, 108, 10}, // reset @108
{ 98, 1, 98, 2}, // backward not overlapping EIO: buf filled < EIO range
{108,10, 108, 10}, // reset @108
{ 99, 4, 98, 2}, // backward overlapping head EIO: buf filled < EIO range, EIO -> user
{108,10, 108, 10}, // reset @108
{ 99, 6, 98, 2}, // backward overlapping whole EIO range: buf filled <= EIO range, EIO -> user
{108,10, 108, 10}, // reset @108
{100, 4, 98, 2}, // backward = EIO range: buf filled < EIO range, EIO -> user
{110,10, 110, 10}, // reset @110
{101, 2, 100, 0}, // backward inside EIO range: buf scratched, EIO -> user
{110,10, 110, 10}, // reset @110
{103, 5, 100, 0}, // backward overlapping tail EIO: buf scratched, EIO -> user
{110,10, 110, 10}, // reset state: forward @110
{105, 7, 100, 0}, // backward client after EIO: buf scratched but read request satisfied
// backward (trend) vs EIO
// NOTE this is the reverse of `backward (not trend) vs EIO`
{110,10, 110, 10}, // reset state: forward @110
{105, 7, 100, 0}, // backward client after EIO: buf scratched but read request satisfied
{110,10, 110, 10}, // reset @110
{103, 5, 98, 2}, // backward overlapping tail EIO: buf scratched (XXX), EIO -> user
{110,10, 110, 10}, // reset @110
{101, 2, 93, 7}, // backward inside EIO range: buf scratched (XXX), EIO -> user
{108,10, 108, 10}, // reset @108
{100, 4, 94, 6}, // backward = EIO range: buf filled < EIO range, EIO -> user
{108,10, 108, 10}, // reset @108
{ 99, 6, 95, 5}, // backward overlapping whole EIO range: buf filled <= EIO range, EIO -> user
{108,10, 108, 10}, // reset @108
{ 99, 4, 98, 2}, // backward overlapping head EIO: buf filled < EIO range, EIO -> user
{108,10, 108, 10}, // reset @108
{ 98, 1, 89, 10}, // backward not overlapping EIO: buf filled according to backward trend
// forward (trend) vs EIO
{ 0, 1, 0, 10},
{ 88,10, 88, 10}, // reset forward @98
{ 98, 1, 98, 2}, // forward not overlapping EIO: buf filled < EIO range
{ 0, 1, 0, 10},
{ 88,10, 88, 10}, // reset forward @98
{ 99, 4, 98, 2}, // forward overlapping head EIO: buf filled < EIO range, EIO -> user
{ 0, 1, 0, 10},
{ 88,10, 88, 10}, // reset forward @98
{ 99, 6, 98, 2}, // forward overlapping whole EIO range: buf filled <= EIO range, EIO -> user
{ 0, 1, 0, 10},
{ 88,10, 88, 10}, // reset forward @98
{100, 4, 98, 2}, // forward = EIO range: buf filled < EIO range, EIO -> user
{ 0, 1, 0, 10},
{ 90,10, 90, 10}, // reset forward @100
{101, 2, 100, 0}, // forward inside EIO range: buf scratched, EIO -> user
{ 0, 1, 0, 10},
{ 90,10, 90, 10}, // reset forward @100
{103, 5, 100, 0}, // forward overlapping tail EIO: buf scratched, EIO -> user
{ 0, 1, 0, 10},
{ 90,10, 90, 10}, // reset forward @100
{105, 2, 100, 0}, // forward client after EIO: buf scratched but read request satisfied
{ 0, 1, 0, 10},
{ 90, 5, 90, 10}, // reset forward @95
{ 99, 3, 96, 4}, // forward jump client overlapping head EIO: buf filled < EIO range, EIO -> user
{ 0, 1, 0, 10},
{ 89, 5, 89, 10}, // reset forward @94
{ 98, 2, 95, 5}, // forward jump client reading < EIO: buf filled < EIO range, user request satisfied
// EOF handling
{250, 4, 250, 6}, // access near EOF - buffer fill hits EOF, but does not return it to the client
{254, 5, 256, 0}, // access overlapping EOF - EOF returned, buf scratched
{256, 1, 256, 0}, // access past EOF -> EOF
{257, 1, 257, 0}, // ----//----
// forward with jumps - buffer is still refilled adjacent to previous reading
// (because jumps are not sequential access and we are optimizing for the sequential case;
// also: if the jump > cap(buf) the reading will not be adjacent)
{ 0, 1, 0, 10}, // reset
{ 0, 5, 0, 10},
{ 9, 3, 6, 10},
{20, 3, 20, 10},
}
func TestSeqReaderAt(t *testing.T) {
r := &XReader{}
rb := NewSeqReaderAtSize(r, 10) // with 10 it is easier to do/check math for a human
for _, tt := range xSeqBufTestv {
pOk := make([]byte, tt.Len)
pB := make([]byte, tt.Len)
nOk, errOk := r.ReadAt(pOk, tt.pos)
nB, errB := rb.ReadAt(pB, tt.pos)
pOk = pOk[:nOk]
pB = pB[:nB]
// check that reading result is the same
if !(bytes.Equal(pB, pOk) && errB == errOk) {
t.Fatalf("%v: -> %v, %#v ; want %v, %#v", tt, pB, errB, pOk, errOk)
}
// verify buffer state
if !(rb.pos == tt.bufPos && len(rb.buf) == tt.bufLen) {
t.Fatalf("%v: -> unexpected buffer state @%v #%v", tt, rb.pos, len(rb.buf))
}
}
}
// this is a benchmark for how thin the wrapper is, not for the logic inside it
func BenchmarkSeqReaderAt(b *testing.B) {
r := &XReader{}
rb := NewSeqReaderAtSize(r, 10) // same as in TestSeqReaderAt
buf := make([]byte, 128 /* > all .Len in xSeqBufTestv */)
for i := 0; i < b.N; i++ {
for _, tt := range xSeqBufTestv {
rb.ReadAt(buf[:tt.Len], tt.pos)
}
}
}
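// TestSeqReaderAtVsDirect is an illustrative sketch added for this writeup
// (not part of the original commit): forward sequential reads through the
// wrapper must return exactly the same data as reading the underlying
// io.ReaderAt directly.
func TestSeqReaderAtVsDirect(t *testing.T) {
	r := &XReader{}
	sr := NewSeqReaderAt(r) // default buffer size

	direct := make([]byte, 16)
	wrapped := make([]byte, 16)
	for _, pos := range []int64{0, 16, 32, 48} { // forward sequential access, below EIO range
		r.ReadAt(direct, pos)
		n, err := sr.ReadAt(wrapped, pos)
		if err != nil || n != len(wrapped) || !bytes.Equal(wrapped, direct) {
			t.Fatalf("@%v: got %v, %v; want %v, nil", pos, wrapped[:n], err, direct)
		}
	}
}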