Commit 4c602537 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 55d3198d
...@@ -33,7 +33,6 @@ func NewSeqBufReader(r io.ReaderAt) *SeqBufReader { ...@@ -33,7 +33,6 @@ func NewSeqBufReader(r io.ReaderAt) *SeqBufReader {
} }
func NewSeqBufReaderSize(r io.ReaderAt, size int) *SeqBufReader { func NewSeqBufReaderSize(r io.ReaderAt, size int) *SeqBufReader {
// XXX posLastIO - to which to init ?
sb := &SeqBufReader{r: r, pos: 0, buf: make([]byte, 0, size), posLastIO: 0} sb := &SeqBufReader{r: r, pos: 0, buf: make([]byte, 0, size), posLastIO: 0}
return sb return sb
} }
...@@ -59,9 +58,9 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) { ...@@ -59,9 +58,9 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) {
var ntail int // #data read from buffer for p tail var ntail int // #data read from buffer for p tail
// try to satisfy read request via (partly) reading from buffer // try to satisfy read request via (partly) reading from buffer
switch {
// use buffered data: start + forward // use buffered data: start + forward
case sb.pos <= pos && pos < sb.pos + len64(sb.buf): if sb.pos <= pos && pos < sb.pos + len64(sb.buf) {
nhead = copy(p, sb.buf[pos - sb.pos:]) // NOTE len(p) can be < len(sb[copyPos:]) nhead = copy(p, sb.buf[pos - sb.pos:]) // NOTE len(p) can be < len(sb[copyPos:])
// if all was read from buffer - we are done // if all was read from buffer - we are done
...@@ -72,19 +71,19 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) { ...@@ -72,19 +71,19 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) {
p = p[nhead:] p = p[nhead:]
pos += int64(nhead) pos += int64(nhead)
// emptry request (possibly not hitting buffer - do not let it go to real IO path) // empty request (possibly not hitting buffer - do not let it go to real IO path)
// `len(p) != 0` is also needed for backward reading from buffer, so this condition goes before // `len(p) != 0` is also needed for backward reading from buffer, so this condition goes before
case len(p) == 0: } else if len(p) == 0 {
return 0, nil return 0, nil
// use buffered data: tail + backward // use buffered data: tail + backward
case posAfter := pos + len64(p); } else if posAfter := pos + len64(p);
sb.pos < posAfter && posAfter <= sb.pos + len64(sb.buf): sb.pos < posAfter && posAfter <= sb.pos + len64(sb.buf) {
// here we know pos < sb.pos // here we know pos < sb.pos
// //
// proof: consider if pos >= sb.pos. // proof: consider if pos >= sb.pos.
// Then from `pos <= sb.pos + len(sb.buf) - len(p)` above it follow that: // Then from `pos <= sb.pos + len(sb.buf) - len(p)` above it follow that:
// `pos < sb.pos + len(sb.buf)` (NOTE strictly < because if len(p) > 0) // `pos < sb.pos + len(sb.buf)` (NOTE strictly < because len(p) > 0)
// and we come to condition which is used in `start + forward` if // and we come to condition which is used in `start + forward` if
ntail = copy(p[sb.pos - pos:], sb.buf) // NOTE ntail == len(p[sb.pos - pos:]) ntail = copy(p[sb.pos - pos:], sb.buf) // NOTE ntail == len(p[sb.pos - pos:])
...@@ -108,8 +107,8 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) { ...@@ -108,8 +107,8 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) {
} else { } else {
// backward // backward
// by default we want to read forward, even when iterating backward // by default we want to read forward, even when iterating backward:
// (there are frequent jumps backward for reading a record there forward) // there are frequent jumps backward for reading a record there forward
xpos = pos xpos = pos
// but if this will overlap with last access range, probably // but if this will overlap with last access range, probably
...@@ -123,21 +122,15 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) { ...@@ -123,21 +122,15 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) {
} }
// don't let reading go beyond start of the file // don't let reading go beyond start of the file
if xpos < 0 { xpos = max64(xpos, 0)
xpos = 0
}
} }
log.Printf("read [%v, %v)\t#%v", xpos, xpos + cap64(sb.buf), cap(sb.buf)) log.Printf("read [%v, %v)\t#%v", xpos, xpos + cap64(sb.buf), cap(sb.buf))
sb.posLastIO = xpos sb.posLastIO = xpos
nn, err := sb.r.ReadAt(sb.buf[:cap(sb.buf)], xpos) nn, err := sb.r.ReadAt(sb.buf[:cap(sb.buf)], xpos)
// nothing read - just return the error // even if there was an error, or data partly read, we cannot retain
if nn == 0 { // the old buf content as io.ReaderAt can use whole buf as scratch space
return nhead, err
}
// even if there was an error, but data partly read, we remember it in the buffer
sb.pos = xpos sb.pos = xpos
sb.buf = sb.buf[:nn] sb.buf = sb.buf[:nn]
...@@ -147,10 +140,25 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) { ...@@ -147,10 +140,25 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) {
// - in case of successful read pos/p lies completely inside sb.pos/sb.buf // - in case of successful read pos/p lies completely inside sb.pos/sb.buf
// copy loaded data from buffer to p // copy loaded data from buffer to p
pBufOffset := pos - xpos // offset corresponding to p in sb.buf XXX naming pBufOffset := pos - xpos // offset corresponding to p in sb.buf
if pBufOffset >= len64(sb.buf) { if pBufOffset >= len64(sb.buf) {
// this can be only when for backward reading there was EIO // this can be only due to some IO error
// before data covering pos/p. Just return the error
// if we know:
// - it was backward reading, and
// - original requst was narrower than buffer
// try to satisfy it once again directly
if pos != xpos {
log.Printf("read [%v, %v)\t#%v", pos, pos + len64(p), len(p))
sb.posLastIO = pos
nn, err = sb.r.ReadAt(p, pos)
if nn < len(p) {
return nn, err
}
return nn + ntail, nil // request fully satisfied - we can ignore error
}
// Just return the error
return nhead, err return nhead, err
} }
nn = copy(p, sb.buf[pBufOffset:]) nn = copy(p, sb.buf[pBufOffset:])
......
...@@ -49,29 +49,47 @@ var xSeqBufTestv = []struct {pos int64; Len int; bufPos int64; bufLen int} { // ...@@ -49,29 +49,47 @@ var xSeqBufTestv = []struct {pos int64; Len int; bufPos int64; bufLen int} { //
{71, 11, 60, 10}, // access > cap(buf), once again {71, 11, 60, 10}, // access > cap(buf), once again
{82, 10, 82, 10}, // access = cap(buf), should refill buf {82, 10, 82, 10}, // access = cap(buf), should refill buf
{92, 5, 92, 8}, // next access - should refill buffer, but only up to EIO range {92, 5, 92, 8}, // next access - should refill buffer, but only up to EIO range
{97, 4, 92, 8}, // this triggers user-visible EIO, buffer not refilled {97, 4, 100, 0}, // this triggers user-visible EIO, buffer scratched
{101, 5, 92, 8}, // EIO again {101, 5, 101, 0}, // EIO again
{105, 5, 105, 10}, // past EIO range - buffer refilled {105, 5, 105, 10}, // past EIO range - buffer refilled
{110,70, 105, 10}, // very big access forward, buf untouched {110,70, 105, 10}, // very big access forward, buf untouched
{180,70, 105, 10}, // big access ~ forward {180,70, 105, 10}, // big access ~ forward
{170,11, 105, 10}, // big access backward
{160,11, 105, 10}, // big access backward, once more
{155, 5, 150, 10}, // access backward - buffer refilled taking last IO into account {172, 5, 170, 10}, // backward: buffer refilled up to posLastIO
// XXX refilled forward first time after big backward readings {168, 4, 160, 10}, // backward: buffer refilled backward
{150, 5, 145, 10}, // next access backward - buffer refilled backward {162, 6, 160, 10}, // backward: all data read from buffer
{143, 7, 135, 10}, // backward once again - buffer refilled backward {150,12, 160, 10}, // big backward: buf untouched
{142, 6, 140, 10}, // backward: buffer refilled up to posLastIO
// TODO backward after not big-backward (after regular forward) {130,12, 140, 10}, // big backward: buf untouched
// TODO backward after big-backward overlapping {122, 9, 121, 10}, // backward overlapping with last bigio: buf correctly refilled
// TODO backward over EIO - check returned n {131, 9, 131, 10}, // forward after backward: buf refilled forward
// TODO backward after forward when posLastAccess is in forward tail {122, 6, 121, 10}, // backward after forward: buf refilled backward
// TODO zero-sized out-of-buffer read do not change buffer {131, 9, 131, 10}, // forward again
{136,20, 131, 10}, // big forward starting from inside filled buf
{128, 4, 126, 10}, // backward: buf refilled up to posLastIO
{40, 0, 126, 10}, // zero-sized out-of-buffer read do not change buffer
// backward vs EIO
{110, 1, 110, 10}, // reset state: forward @110
{105, 7, 100, 0}, // backward client after EIO: buf scratched but read request satisfied
{110, 1, 110, 10}, // reset @110
{103, 5, 100, 0}, // backward overlapping tail EIO: buf scratched, EIO -> user
{110, 1, 110, 10}, // reset @110
{101, 2, 100, 0}, // backward inside EIO range: buf scratched, EIO -> user
{108, 1, 108, 10}, // reset @108
{100, 4, 98, 2}, // backward = EIO range: buf filled < EIO range, EIO -> user
{108, 1, 108, 10}, // reset @108
{ 99, 6, 98, 2}, // backward overlapping whole EIO range: buf filled <= EIO range, EIO -> user
{108, 1, 108, 10}, // reset @108
{ 99, 4, 98, 2}, // backward overlapping head EIO: buf filled < EIO range, EIO -> user
{108, 1, 108, 10}, // reset @108
{ 98, 1, 98, 2}, // nackward not overlapping EIO: buf filled < EIO range
{250, 4, 250, 6}, // access near EOF - buffer fill hits EOF, but not returns it to client {250, 4, 250, 6}, // access near EOF - buffer fill hits EOF, but not returns it to client
{254, 5, 250, 6}, // access overlapping EOF - EOF returned {254, 5, 256, 0}, // access overlapping EOF - EOF returned, buf scratched
{256, 1, 250, 6}, // access past EOF -> EOF {256, 1, 256, 0}, // access past EOF -> EOF
{257, 1, 250, 6}, // ----//---- {257, 1, 257, 0}, // ----//----
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment