Commit 779d42dc authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 471f8ac1
...@@ -4,6 +4,10 @@ ...@@ -4,6 +4,10 @@
package fs1 package fs1
import (
"io"
)
// SeqBufReader implements buffering for a io.ReaderAt optimized for sequential access // SeqBufReader implements buffering for a io.ReaderAt optimized for sequential access
// XXX -> xbufio.SeqReader // XXX -> xbufio.SeqReader
type SeqBufReader struct { type SeqBufReader struct {
...@@ -18,34 +22,58 @@ type SeqBufReader struct { ...@@ -18,34 +22,58 @@ type SeqBufReader struct {
dir int dir int
} }
const defaultSeqBufSize = 8192 // XXX retune - must be <= size(L1d) / 2
func NewSeqBufReader(r io.ReaderAt) *SeqBufReader {
return NewSeqBufReaderSize(r, defaultSeqBufSize)
}
func NewSeqBufReaderSize(r io.ReaderAt, size int) *SeqBufReader {
sb := &SeqBufReader{r: r, pos: 0, buf: make([]byte, 0, size), dir: 0}
return sb
}
// readFromBuf reads as much as possible for ReadAt(p, pos) request from buffered data // readFromBuf reads as much as possible for ReadAt(p, pos) request from buffered data
// it returns nread and (p', pos') that was left for real reading to complete // it returns nread and (p', pos') that was left for real reading to complete
// XXX dir? // XXX dir?
func (sb *SeqBufReader) readFromBuf(p []byte, pos int64) (int, []byte, int64) { func (sb *SeqBufReader) readFromBuf(p []byte, pos int64) (int, []byte, int64) {
n := 0
// use buffered data: start + forward // use buffered data: start + forward
if pos >= sb.pos && pos < sb.pos + len(sb.buf) { if pos >= sb.pos && pos < sb.pos + int64(len(sb.buf)) {
copyPos := pos - sb.pos copyPos := pos - sb.pos
n = copy(p, sb.buf[copyPos:]) // NOTE len(p) can be < len(sb[copyPos:]) n = copy(p, sb.buf[copyPos:]) // NOTE len(p) can be < len(sb[copyPos:])
p = p[n:] p = p[n:]
pos += n pos += int64(n)
dir = +1 sb.dir = +1 // XXX recheck
} }
// XXX use buffered data: tail + backward // XXX use buffered data: tail + backward
if pos + len(p) <= sb.pos + len(sb.buf) && pos + len(p) > sb.pos { if pos + int64(len(p)) <= sb.pos + int64(len(sb.buf)) && pos + int64(len(p)) > sb.pos {
// TODO // TODO
dir = -1 sb.dir = -1 // XXX recheck
} }
return n, p, pos return n, p, pos
} }
func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) { // XXX place
func sign(x int64) int {
switch {
case x > 0:
return +1
case x < 0:
return -1
default:
return 0
}
}
func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (n int, err error) {
// if request size > buffer - read data directly // if request size > buffer - read data directly
if len(p) > cap(sb.buf) { if len(p) > cap(sb.buf) {
dir = sign(pos - sb.pos) // XXX recheck sb.dir = sign(pos - sb.pos) // XXX recheck
return sb.r.ReadAt(p, pos) return sb.r.ReadAt(p, pos)
} }
...@@ -57,32 +85,34 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) { ...@@ -57,32 +85,34 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) {
} }
// otherwise we need to refill the buffer. determine range to read by current IO direction. // otherwise we need to refill the buffer. determine range to read by current IO direction.
// XXX recheck .dir handling
var xpos int64
switch { switch {
case dir == 0: case sb.dir == 0:
// we don't know direction yet - usually it is first request. // we don't know direction yet - usually it is first request.
// cover pos/p symmetrically. This way we will give hopefully // cover pos/p symmetrically. This way we will give hopefully
// give enough overlap for next read request to determine // give enough overlap for next read request to determine
// direction. // direction.
xpos = pos - (cap(sb.buf) - len(p)) / 2 xpos = pos - int64(cap(sb.buf) - len(p)) / 2
if xpos < 0 { if xpos < 0 {
xpos = 0 xpos = 0
} }
case dir > 0: case sb.dir > 0:
// forward // forward
xpos = pos xpos = pos
default: default:
// backward // backward
//xpos = max64(pos - cap(sb.buf), 0) //xpos = max64(pos - cap(sb.buf), 0)
xpos = pos - cap(sb.buf) xpos = pos - int64(cap(sb.buf))
if xpos < 0 { if xpos < 0 {
xpos = 0 xpos = 0
} }
} }
buf = sb.buf[:cap(sb.buf)] buf := sb.buf[:cap(sb.buf)]
nn, err = sb.r.ReadAt(buf, xpos) nn, err := sb.r.ReadAt(buf, xpos)
// even if there was an error, e.g. after reading part, we remember data read in buffer // even if there was an error, e.g. after reading part, we remember data read in buffer
// XXX only `if nn > 0` ? // XXX only `if nn > 0` ?
......
// TODO copyright / license
// XXX move -> xbufio
package fs1
import (
"bytes"
"errors"
"io"
"testing"
)
// XReader is an io.ReaderAt that reads first 256 bytes with content_i = i
// bytes in range [100, 104] give EIO on reading
type XReader struct {
}
var EIO = errors.New("input/output error")
func (r *XReader) ReadAt(p []byte, pos int64) (n int, err error) {
for n < len(p) && pos < 0x100 {
if pos >= 100 && pos <= 104 {
err = EIO
break
}
p[n] = byte(pos)
n++
pos++
}
if n < len(p) && err == nil {
err = io.EOF
}
return n, err
}
func TestSeqBufReader(t *testing.T) {
r := &XReader{}
rb := NewSeqBufReaderSize(r, 10) // with 10 it is easier to do/check math for a human
// read @pos/len -> rb.pos, len(rb.buf), rb.dir
testv := []struct {pos int64; Len int; bufPos int64; bufLen, bufDir int} {
{40, 5, 38, 10, 0}, // 1st access - symmetrically covered
{45, 5, 48, 10, +1}, // part taken from buf, part read next, forward detected
{50, 5, 48, 10, +1}, // everything taken from buf
{55, 5, 58, 10, +1}, // part taken from buf, part read next
{60, 10, 58, 10, +1}, // access bigger than buf
{70, 10, 58, 10, +1}, // access bigger than buf, once again
// XXX big access going backward - detect dir change
}
for _, tt := range testv {
pOk := make([]byte, tt.Len)
pB := make([]byte, tt.Len)
nOk, errOk := r.ReadAt(pOk, tt.pos)
nB, errB := rb.ReadAt(pB, tt.pos)
// check that reading result is the same
if !(nB == nOk && errB == errOk && bytes.Equal(pB, pOk)) {
t.Fatalf("%v: -> %v, %#v, %v ; want %v, %#v, %v", tt, nB, errB, pB, nOk, errOk, pOk)
}
// verify buffer state
if !(rb.pos == tt.bufPos && len(rb.buf) == tt.bufLen && rb.dir == tt.bufDir) {
t.Fatalf("%v: -> unexpected buffer state @%v #%v %+d", tt, rb.pos, len(rb.buf), rb.dir)
}
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment