Commit ba9987d8 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 8d4c9f31
......@@ -10,31 +10,10 @@ import (
//"log"
)
// XXX for backward/forward changing workloads (e.g. fstail) there can be cases like:
// --------
// v |
// +-----------+----^-----+
// | H | T |
// +-----------+----------+
// page2 page1
//
// so jumping back from T(ail) to H(ead) will drop buffer for page1 and load it
// with page2, but next forward read till T will need to again access page1 so
// page2 will be dropped, page1 reloaded, then page2 is needed again (for prev
// tail) and is reloaded again.
//
// This can be avoided with keeping 2 buffers (to always be able to compensate
// direction change) but I'm not sure it will not degrade allways forward case
// becuase pressure to cache is increased 2x.
//
// -> if/when we really need it implement 2 buffers approach.
//
// XXX this also can be solved via loading not from posLastIO but from posLastAccess.
// SeqBufReader implements buffering for a io.ReaderAt optimized for sequential access
// Both forward, backward and interleaved forward/backward access patterns are supported XXX
// FIXME access from multiple goroutines? (it is required per io.ReaderAt
// Both forward, backward and interleaved forward/backward access patterns are supported
//
// XXX access from multiple goroutines? (it is required per io.ReaderAt
// interface, but for sequential workloads we do not need it)
// XXX -> xbufio.SeqReader
type SeqBufReader struct {
......@@ -42,13 +21,9 @@ type SeqBufReader struct {
buf []byte
pos int64
// // position of last IO (can be != .pos because large reads are not buffered)
// posLastIO int64
// TODO text
posLastAccess int64
posLastFwdAfter int64
posLastBackward int64
posLastAccess int64 // position of last access request
posLastFwdAfter int64 // position of last forward access request
posLastBackward int64 // position of last backward access request
r io.ReaderAt
}
......@@ -61,7 +36,7 @@ func NewSeqBufReader(r io.ReaderAt) *SeqBufReader {
}
func NewSeqBufReaderSize(r io.ReaderAt, size int) *SeqBufReader {
sb := &SeqBufReader{r: r, pos: 0, buf: make([]byte, 0, size)} //, posLastIO: 0}
sb := &SeqBufReader{r: r, buf: make([]byte, 0, size)} // all positions are zero initially
return sb
}
......@@ -89,8 +64,6 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) {
// kernel can copy the same data from pagecache as well, and it will take the same time
// because for data in sb.buf corresponding page in pagecache has high p. to be hot.
//log.Printf("READ [%v, %v)\t#%v", pos, pos + len64(p), len(p))
//sb.posLastIO = pos
// TODO update lastAccess & lastFwd/lastBack
return sb.r.ReadAt(p, pos)
}
......@@ -99,7 +72,7 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) {
// try to satisfy read request via (partly) reading from buffer
// use buffered data: start + forward
// use buffered data: head(p)
if sb.pos <= pos && pos < sb.pos + len64(sb.buf) {
nhead = copy(p, sb.buf[pos - sb.pos:]) // NOTE len(p) can be < len(sb[copyPos:])
......@@ -116,7 +89,7 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) {
} else if len(p) == 0 {
return 0, nil
// use buffered data: tail + backward
// use buffered data: tail(p)
} else if posAfter := pos + len64(p);
sb.pos < posAfter && posAfter <= sb.pos + len64(sb.buf) {
// here we know pos < sb.pos
......@@ -140,19 +113,25 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) {
// NOTE len(p) <= cap(sb.buf)
var xpos int64 // position for new IO request
//if pos >= sb.posLastIO {
if pos >= posLastAccess {
// forward
xpos = pos
// if forward trend continues and buffering can be made adjacent to
// previous forward access - shift reading down right to after it.
xLastAfter := posLastFwdAfter + int64(nhead) // XXX comment
if xLastAfter <= xpos && xpos + len64(p) <= xLastAfter + cap64(sb.buf) {
xpos = xLastAfter
xLastFwdAfter := posLastFwdAfter + int64(nhead) // adjusted for already read from buffer
if xLastFwdAfter <= xpos && xpos + len64(p) <= xLastFwdAfter + cap64(sb.buf) {
xpos = xLastFwdAfter
}
// XXX symmetry for "alternatively" in backward case
// NOTE no symmetry handling for "alternatively" in backward case
// because symmetry would be:
//
// ← → ← →
// 3 4 2 1
//
// but buffer for 4 already does not overlap 3 as for
// non-trendy forward reads buffer always grows forward.
} else {
// backward
......@@ -160,44 +139,28 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) {
// if backward trend continues and bufferring would overlap with
// previous backward access - shift reading up right to it.
xLast := posLastBackward - int64(ntail) // XXX comment
if xpos < xLast && xLast < xpos + cap64(sb.buf) {
xpos = max64(xLast, xpos + len64(p)) - cap64(sb.buf)
xLastBackward := posLastBackward - int64(ntail) // adjusted for already read from buffer
if xpos < xLastBackward && xLastBackward < xpos + cap64(sb.buf) {
xpos = max64(xLastBackward, xpos + len64(p)) - cap64(sb.buf)
// XXX recheck do we really need this ? ( was added for {122, 6, 121, 10} )
// XXX alternatively even if backward trend does not continue anymore
// alternatively even if backward trend does not continue anymore
// but if this will overlap with last access (XXX load) range, probably
// it is better (we are optimizing for sequential access) to
// shift loading region down not to overlap.
// shift loading region down not to overlap. example:
//
// ← → ← →
// 2 1 4 3
//
// here we do not want 4'th buffer to overlap with 3
} else if xpos + cap64(sb.buf) > posLastAccess {
xpos = max64(posLastAccess, xpos + len64(p)) - cap64(sb.buf)
}
// don't let reading go beyond start of the file
xpos = max64(xpos, 0)
/*
// by default we want to read forward, even when iterating backward:
// there are frequent jumps backward for reading a record there forward
xpos = pos
// but if this will overlap with last access range, probably
// it is better (we are optimizing for sequential access) to
// shift loading region down not to overlap.
//
// we have to take into account that last and current access regions
// can overlap, if e.g. last access was big non-buffered read.
if xpos + cap64(sb.buf) > sb.posLastIO {
xpos = max64(sb.posLastIO, xpos + len64(p)) - cap64(sb.buf)
// don't let reading go beyond start of the file
xpos = max64(xpos, 0)
}
*/
}
//log.Printf("read [%v, %v)\t#%v", xpos, xpos + cap64(sb.buf), cap(sb.buf))
// sb.posLastIO = xpos
nn, err := sb.r.ReadAt(sb.buf[:cap(sb.buf)], xpos)
// even if there was an error, or data partly read, we cannot retain
......@@ -220,7 +183,6 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) {
// try to satisfy it once again directly
if pos != xpos { // FIXME pos != xpos no longer means backward
//log.Printf("read [%v, %v)\t#%v", pos, pos + len64(p), len(p))
// sb.posLastIO = pos
nn, err = sb.r.ReadAt(p, pos)
if nn < len(p) {
return nn, err
......
......@@ -44,6 +44,7 @@ func (r *XReader) ReadAt(p []byte, pos int64) (n int, err error) {
// read @pos/len -> rb.pos, len(rb.buf)
var xSeqBufTestv = []struct {pos int64; Len int; bufPos int64; bufLen int} {
// TODO add trend / not trend everywhere
// TODO review
{40, 5, 40, 10}, // 1st access, forward by default
{45, 7, 50, 10}, // part taken from buf, part read next, forward (trend)
{52, 5, 50, 10}, // everything taken from buf
......@@ -108,6 +109,11 @@ var xSeqBufTestv = []struct {pos int64; Len int; bufPos int64; bufLen int} {
{222, 8, 215, 10},
{219, 3, 215, 10},
// backward (non trend) vs not overlapping previous forward
{230,10, 230, 10}, // forward @230 (1)
{220,10, 220, 10}, // backward @220 (2)
{250, 4, 250, 6}, // forward @250 (3)
{245, 5, 240, 10}, // backward @245 (4)
{5, 4, 5, 10}, // forward near file start
{2, 3, 0, 10}, // backward: buf does not go beyong 0
......@@ -181,6 +187,7 @@ func TestSeqBufReader(t *testing.T) {
}
}
// this is benchmark for how thin wrapper is, not for logic inside it
func BenchmarkSeqBufReader(b *testing.B) {
r := &XReader{}
rb := NewSeqBufReaderSize(r, 10) // same as in TestSeqBufReader
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment