Commit 1c4409fb authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 4c4ce2ea
...@@ -137,10 +137,9 @@ func errf(e ierr, subj, format string, a ...interface{}) error { ...@@ -137,10 +137,9 @@ func errf(e ierr, subj, format string, a ...interface{}) error {
return e.err(subj, fmt.Errorf(format, a...)) return e.err(subj, fmt.Errorf(format, a...))
} }
// decodeErr is syntactic shortcut for errf("decode", ...) // checkErr is syntactic shortcut for errf("check", ...)
// TODO in many places "decode" -> "selfcheck" func checkErr(e ierr, format string, a ...interface{}) error {
func decodeErr(e ierr, format string, a ...interface{}) error { return errf(e, "check", format, a...)
return errf(e, "decode", format, a...)
} }
// bug panics with errf("bug", ...) // bug panics with errf("bug", ...)
...@@ -167,7 +166,8 @@ func (fh *FileHeader) Load(r io.ReaderAt) error { ...@@ -167,7 +166,8 @@ func (fh *FileHeader) Load(r io.ReaderAt) error {
// --- Transaction record --- // --- Transaction record ---
// HeaderLen returns whole transaction header length including variable part. // HeaderLen returns whole transaction header length including its variable part.
//
// NOTE: data records start right after transaction header. // NOTE: data records start right after transaction header.
func (txnh *TxnHeader) HeaderLen() int64 { func (txnh *TxnHeader) HeaderLen() int64 {
return TxnHeaderFixSize + int64(len(txnh.workMem)) return TxnHeaderFixSize + int64(len(txnh.workMem))
...@@ -209,20 +209,25 @@ const ( ...@@ -209,20 +209,25 @@ const (
LoadNoStrings = 0x01 // do not load user/desc/ext strings LoadNoStrings = 0x01 // do not load user/desc/ext strings
) )
// Load reads and decodes transaction record header. // Load reads and decodes transaction record header @ pos.
//
// Both transaction header starting at pos, and redundant length of previous
// transaction are loaded. The data read is verified for consistency lightly.
//
// No prerequisite requirements are made to previous txnh state.
// //
// pos: points to transaction start // Rules for .Len/.LenPrev returns:
// no prerequisite requirements are made to previous txnh state
// TODO describe what happens at EOF and when .LenPrev is still valid
// //
// rules for Len/LenPrev returns: // .Len == 0 transaction header could not be read
// Len == 0 transaction header could not be read // .Len == -1 EOF forward
// Len == -1 EOF forward // .Len >= TxnHeaderFixSize transaction was read normally
// Len >= TxnHeaderFixSize transaction was read normally
// //
// LenPrev == 0 prev record length could not be read // .LenPrev == 0 prev record length could not be read
// LenPrev == -1 EOF backward // .LenPrev == -1 EOF backward
// LenPrev >= TxnHeaderFixSize LenPrev was read/checked normally // .LenPrev >= TxnHeaderFixSize LenPrev was read/checked normally
//
// For example when pos points to the end of file .Len will be returned = -1, but
// .LenPrev will be usually valid if file has at least 1 transaction.
func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error { func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error {
if cap(txnh.workMem) < txnXHeaderFixSize { if cap(txnh.workMem) < txnXHeaderFixSize {
txnh.workMem = make([]byte, txnXHeaderFixSize, 256 /* to later avoid allocation for typical strings */) txnh.workMem = make([]byte, txnXHeaderFixSize, 256 /* to later avoid allocation for typical strings */)
...@@ -248,14 +253,14 @@ func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error ...@@ -248,14 +253,14 @@ func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error
if n >= 0 { if n >= 0 {
lenPrev := 8 + int64(binary.BigEndian.Uint64(work[8-8:])) lenPrev := 8 + int64(binary.BigEndian.Uint64(work[8-8:]))
if lenPrev < TxnHeaderFixSize { if lenPrev < TxnHeaderFixSize {
return decodeErr(txnh, "invalid prev record length: %v", lenPrev) return checkErr(txnh, "invalid prev record length: %v", lenPrev)
} }
posPrev := txnh.Pos - lenPrev posPrev := txnh.Pos - lenPrev
if posPrev < txnValidFrom { if posPrev < txnValidFrom {
return decodeErr(txnh, "prev record length goes beyond valid area: %v", lenPrev) return checkErr(txnh, "prev record length goes beyond valid area: %v", lenPrev)
} }
if posPrev < txnValidFrom + TxnHeaderFixSize && posPrev != txnValidFrom { if posPrev < txnValidFrom + TxnHeaderFixSize && posPrev != txnValidFrom {
return decodeErr(txnh, "prev record does not land exactly at valid area start: %v", posPrev) return checkErr(txnh, "prev record does not land exactly at valid area start: %v", posPrev)
} }
txnh.LenPrev = lenPrev txnh.LenPrev = lenPrev
} }
...@@ -279,19 +284,19 @@ func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error ...@@ -279,19 +284,19 @@ func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error
txnh.Tid = zodb.Tid(binary.BigEndian.Uint64(work[8+0:])) txnh.Tid = zodb.Tid(binary.BigEndian.Uint64(work[8+0:]))
if !txnh.Tid.Valid() { if !txnh.Tid.Valid() {
return decodeErr(txnh, "invalid tid: %v", txnh.Tid) return checkErr(txnh, "invalid tid: %v", txnh.Tid)
} }
tlen := 8 + int64(binary.BigEndian.Uint64(work[8+8:])) tlen := 8 + int64(binary.BigEndian.Uint64(work[8+8:]))
if tlen < TxnHeaderFixSize { if tlen < TxnHeaderFixSize {
return decodeErr(txnh, "invalid txn record length: %v", tlen) return checkErr(txnh, "invalid txn record length: %v", tlen)
} }
// XXX also check tlen to not go beyond file size ? // XXX also check tlen to not go beyond file size ?
txnh.Len = tlen txnh.Len = tlen
txnh.Status = zodb.TxnStatus(work[8+16]) txnh.Status = zodb.TxnStatus(work[8+16])
if !txnh.Status.Valid() { if !txnh.Status.Valid() {
return decodeErr(txnh, "invalid status: %v", txnh.Status) return checkErr(txnh, "invalid status: %v", txnh.Status)
} }
...@@ -301,7 +306,7 @@ func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error ...@@ -301,7 +306,7 @@ func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error
lstr := int(luser) + int(ldesc) + int(lext) lstr := int(luser) + int(ldesc) + int(lext)
if TxnHeaderFixSize + int64(lstr) + 8 > txnh.Len { if TxnHeaderFixSize + int64(lstr) + 8 > txnh.Len {
return decodeErr(txnh, "strings overlap with txn boundary: %v / %v", lstr, txnh.Len) return checkErr(txnh, "strings overlap with txn boundary: %v / %v", lstr, txnh.Len)
} }
// NOTE we encode whole strings length into len(.workMem) // NOTE we encode whole strings length into len(.workMem)
...@@ -387,12 +392,12 @@ func (txnh *TxnHeader) LoadPrev(r io.ReaderAt, flags TxnLoadFlags) error { ...@@ -387,12 +392,12 @@ func (txnh *TxnHeader) LoadPrev(r io.ReaderAt, flags TxnLoadFlags) error {
} }
if txnh.Len != lenPrev { if txnh.Len != lenPrev {
return decodeErr(txnh, "head/tail lengths mismatch: %v, %v", txnh.Len, lenPrev) return checkErr(txnh, "head/tail lengths mismatch: %v, %v", txnh.Len, lenPrev)
} }
// check tid↓ if we had txnh for "cur" loaded // check tid↓ if we had txnh for "cur" loaded
if lenCur > 0 && txnh.Tid >= tidCur { if lenCur > 0 && txnh.Tid >= tidCur {
return decodeErr(txnh, "tid monitonity broken: %v ; next: %v", txnh.Tid, tidCur) return checkErr(txnh, "tid monitonity broken: %v ; next: %v", txnh.Tid, tidCur)
} }
return nil return nil
...@@ -426,7 +431,7 @@ func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error { ...@@ -426,7 +431,7 @@ func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error {
// NOTE also: err could be EOF // NOTE also: err could be EOF
if txnh.LenPrev != 0 && txnh.LenPrev != lenCur { if txnh.LenPrev != 0 && txnh.LenPrev != lenCur {
t := &TxnHeader{Pos: posCur} // txn for which we discovered problem t := &TxnHeader{Pos: posCur} // txn for which we discovered problem
return decodeErr(t, "head/tail lengths mismatch: %v, %v", lenCur, txnh.LenPrev) return checkErr(t, "head/tail lengths mismatch: %v, %v", lenCur, txnh.LenPrev)
} }
if err != nil { if err != nil {
...@@ -435,7 +440,7 @@ func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error { ...@@ -435,7 +440,7 @@ func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error {
// check tid↑ // check tid↑
if txnh.Tid <= tidCur { if txnh.Tid <= tidCur {
return decodeErr(txnh, "tid monotonity broken: %v ; prev: %v", txnh.Tid, tidCur) return checkErr(txnh, "tid monotonity broken: %v ; prev: %v", txnh.Tid, tidCur)
} }
return nil return nil
...@@ -456,6 +461,7 @@ func (dh *DataHeader) Len() int64 { ...@@ -456,6 +461,7 @@ func (dh *DataHeader) Len() int64 {
// Load reads and decodes data record header. // Load reads and decodes data record header.
//
// pos: points to data header start // pos: points to data header start
// no prerequisite requirements are made to previous dh state // no prerequisite requirements are made to previous dh state
func (dh *DataHeader) Load(r io.ReaderAt, pos int64) error { func (dh *DataHeader) Load(r io.ReaderAt, pos int64) error {
...@@ -475,36 +481,36 @@ func (dh *DataHeader) Load(r io.ReaderAt, pos int64) error { ...@@ -475,36 +481,36 @@ func (dh *DataHeader) Load(r io.ReaderAt, pos int64) error {
dh.Oid = zodb.Oid(binary.BigEndian.Uint64(dh.workMem[0:])) // XXX -> zodb.Oid.Decode() ? dh.Oid = zodb.Oid(binary.BigEndian.Uint64(dh.workMem[0:])) // XXX -> zodb.Oid.Decode() ?
dh.Tid = zodb.Tid(binary.BigEndian.Uint64(dh.workMem[8:])) // XXX -> zodb.Tid.Decode() ? dh.Tid = zodb.Tid(binary.BigEndian.Uint64(dh.workMem[8:])) // XXX -> zodb.Tid.Decode() ?
if !dh.Tid.Valid() { if !dh.Tid.Valid() {
return decodeErr(dh, "invalid tid: %v", dh.Tid) return checkErr(dh, "invalid tid: %v", dh.Tid)
} }
dh.PrevRevPos = int64(binary.BigEndian.Uint64(dh.workMem[16:])) dh.PrevRevPos = int64(binary.BigEndian.Uint64(dh.workMem[16:]))
dh.TxnPos = int64(binary.BigEndian.Uint64(dh.workMem[24:])) dh.TxnPos = int64(binary.BigEndian.Uint64(dh.workMem[24:]))
if dh.TxnPos < txnValidFrom { if dh.TxnPos < txnValidFrom {
return decodeErr(dh, "invalid txn position: %v", dh.TxnPos) return checkErr(dh, "invalid txn position: %v", dh.TxnPos)
} }
if dh.TxnPos + TxnHeaderFixSize > pos { if dh.TxnPos + TxnHeaderFixSize > pos {
return decodeErr(dh, "txn position not decreasing: %v", dh.TxnPos) return checkErr(dh, "txn position not decreasing: %v", dh.TxnPos)
} }
if dh.PrevRevPos != 0 { // zero means there is no previous revision if dh.PrevRevPos != 0 { // zero means there is no previous revision
if dh.PrevRevPos < dataValidFrom { if dh.PrevRevPos < dataValidFrom {
return decodeErr(dh, "invalid prev revision position: %v", dh.PrevRevPos) return checkErr(dh, "invalid prev revision position: %v", dh.PrevRevPos)
} }
if dh.PrevRevPos + DataHeaderSize > dh.TxnPos - 8 { if dh.PrevRevPos + DataHeaderSize > dh.TxnPos - 8 {
return decodeErr(dh, "prev revision position (%v) overlaps with txn (%v)", dh.PrevRevPos, dh.TxnPos) return checkErr(dh, "prev revision position (%v) overlaps with txn (%v)", dh.PrevRevPos, dh.TxnPos)
} }
} }
verlen := binary.BigEndian.Uint16(dh.workMem[32:]) verlen := binary.BigEndian.Uint16(dh.workMem[32:])
if verlen != 0 { if verlen != 0 {
return decodeErr(dh, "non-zero version: #%v", verlen) return checkErr(dh, "non-zero version: #%v", verlen)
} }
dh.DataLen = int64(binary.BigEndian.Uint64(dh.workMem[34:])) dh.DataLen = int64(binary.BigEndian.Uint64(dh.workMem[34:]))
if dh.DataLen < 0 { if dh.DataLen < 0 {
// XXX also check DataLen < max ? // XXX also check DataLen < max ?
return decodeErr(dh, "invalid data len: %v", dh.DataLen) return checkErr(dh, "invalid data len: %v", dh.DataLen)
} }
return nil return nil
...@@ -512,8 +518,8 @@ func (dh *DataHeader) Load(r io.ReaderAt, pos int64) error { ...@@ -512,8 +518,8 @@ func (dh *DataHeader) Load(r io.ReaderAt, pos int64) error {
// LoadPrevRev reads and decodes previous revision data record header. // LoadPrevRev reads and decodes previous revision data record header.
// //
// prerequisite: dh .Oid .Tid .PrevRevPos are initialized: // prerequisite: dh .Oid .Tid .PrevRevPos are initialized.
// - TODO describe how //
// when there is no previous revision: io.EOF is returned // when there is no previous revision: io.EOF is returned
func (dh *DataHeader) LoadPrevRev(r io.ReaderAt) error { func (dh *DataHeader) LoadPrevRev(r io.ReaderAt) error {
if dh.PrevRevPos == 0 { if dh.PrevRevPos == 0 {
...@@ -522,31 +528,30 @@ func (dh *DataHeader) LoadPrevRev(r io.ReaderAt) error { ...@@ -522,31 +528,30 @@ func (dh *DataHeader) LoadPrevRev(r io.ReaderAt) error {
posCur := dh.Pos posCur := dh.Pos
err := dh.loadPrevRev(r) err := dh.loadPrevRev(r, dh.PrevRevPos)
if err != nil { if err != nil {
// data record @...: loading prev rev: data record @...: ... // data record @...: -> (prev rev): data record @...: ...
err = &DataError{posCur, "loading prev rev", err} err = &DataError{posCur, "-> (prev rev)", err}
} }
return err return err
} }
func (dh *DataHeader) loadPrevRev(r io.ReaderAt) error { // worker for LoadPrevRev and LoadBack
func (dh *DataHeader) loadPrevRev(r io.ReaderAt, prevPos int64) error {
oid := dh.Oid oid := dh.Oid
tid := dh.Tid tid := dh.Tid
err := dh.Load(r, dh.PrevRevPos) err := dh.Load(r, prevPos)
if err != nil { if err != nil {
return err return err
} }
if dh.Oid != oid { if dh.Oid != oid {
// XXX vvv valid only if DataError prints oid return checkErr(dh, "oid mismatch: %s -> %s", oid, dh.Oid)
return decodeErr(dh, "oid mismatch")
} }
if dh.Tid >= tid { if dh.Tid >= tid {
// XXX vvv valid only if DataError prints tid return checkErr(dh, "tid not ↓: %s -> %s", tid, dh.Tid)
return decodeErr(dh, "tid mismatch")
} }
return nil return nil
...@@ -567,10 +572,10 @@ func (dh *DataHeader) LoadBackRef(r io.ReaderAt) (backPos int64, err error) { ...@@ -567,10 +572,10 @@ func (dh *DataHeader) LoadBackRef(r io.ReaderAt) (backPos int64, err error) {
backPos = int64(binary.BigEndian.Uint64(dh.workMem[0:])) backPos = int64(binary.BigEndian.Uint64(dh.workMem[0:]))
if !(backPos == 0 || backPos >= dataValidFrom) { if !(backPos == 0 || backPos >= dataValidFrom) {
return 0, decodeErr(dh, "invalid backpointer: %v", backPos) return 0, checkErr(dh, "invalid backpointer: %v", backPos)
} }
if backPos + DataHeaderSize > dh.TxnPos - 8 { if backPos + DataHeaderSize > dh.TxnPos - 8 {
return 0, decodeErr(dh, "backpointer (%v) overlaps with txn (%v)", backPos, dh.TxnPos) return 0, checkErr(dh, "backpointer (%v) overlaps with txn (%v)", backPos, dh.TxnPos)
} }
return backPos, nil return backPos, nil
...@@ -578,7 +583,8 @@ func (dh *DataHeader) LoadBackRef(r io.ReaderAt) (backPos int64, err error) { ...@@ -578,7 +583,8 @@ func (dh *DataHeader) LoadBackRef(r io.ReaderAt) (backPos int64, err error) {
// LoadBack reads and decodes data header for revision linked via back-pointer. // LoadBack reads and decodes data header for revision linked via back-pointer.
// //
// prerequisite: dh XXX .DataLen == 0 // prerequisite: dh is loaded and .DataLen == 0
//
// if link is to zero (means deleted record) io.EOF is returned // if link is to zero (means deleted record) io.EOF is returned
func (dh *DataHeader) LoadBack(r io.ReaderAt) error { func (dh *DataHeader) LoadBack(r io.ReaderAt) error {
backPos, err := dh.LoadBackRef(r) backPos, err := dh.LoadBackRef(r)
...@@ -591,26 +597,11 @@ func (dh *DataHeader) LoadBack(r io.ReaderAt) error { ...@@ -591,26 +597,11 @@ func (dh *DataHeader) LoadBack(r io.ReaderAt) error {
} }
posCur := dh.Pos posCur := dh.Pos
tid := dh.Tid
// TODO compare this with loadPrevRev() way err = dh.loadPrevRev(r, backPos)
err = func() error {
err := dh.Load(r, backPos)
if err != nil { if err != nil {
return err // data record @...: -> (prev rev): data record @...: ...
} err = &DataError{posCur, "-> (back)", err}
// XXX also dh.Oid == oid ?
// but in general back pointer might point to record with different oid
if dh.Tid >= tid {
return decodeErr(dh, "tid not decreasing")
}
return err
}()
if err != nil {
err = &DataError{posCur, "loading back rev", err}
} }
return err return err
...@@ -618,12 +609,13 @@ func (dh *DataHeader) LoadBack(r io.ReaderAt) error { ...@@ -618,12 +609,13 @@ func (dh *DataHeader) LoadBack(r io.ReaderAt) error {
// LoadNext reads and decodes data header for next data record in the same transaction. // LoadNext reads and decodes data header for next data record in the same transaction.
// //
// prerequisite: dh .Pos .DataLen are initialized // prerequisite: dh .Pos .DataLen are initialized.
// when there is no more data records: io.EOF is returned //
// When there is no more data records: io.EOF is returned.
func (dh *DataHeader) LoadNext(r io.ReaderAt, txnh *TxnHeader) error { func (dh *DataHeader) LoadNext(r io.ReaderAt, txnh *TxnHeader) error {
err := dh.loadNext(r, txnh) err := dh.loadNext(r, txnh)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
err = txnh.err("iterating", err) err = txnh.err("-> (iter data)", err)
} }
return err return err
} }
...@@ -640,7 +632,9 @@ func (dh *DataHeader) loadNext(r io.ReaderAt, txnh *TxnHeader) error { ...@@ -640,7 +632,9 @@ func (dh *DataHeader) loadNext(r io.ReaderAt, txnh *TxnHeader) error {
} }
if nextPos + DataHeaderSize > txnTailPos { if nextPos + DataHeaderSize > txnTailPos {
return &DataError{nextPos, "decode", fmt.Errorf("data record header overlaps txn boundary")} // XXX return &DataError{nextPos, "check",
fmt.Errorf("data record header [..., %d] overlaps txn boundary [..., %d)",
nextPos + DataHeaderSize, txnTailPos)}
} }
err := dh.Load(r, nextPos) err := dh.Load(r, nextPos)
...@@ -649,13 +643,14 @@ func (dh *DataHeader) loadNext(r io.ReaderAt, txnh *TxnHeader) error { ...@@ -649,13 +643,14 @@ func (dh *DataHeader) loadNext(r io.ReaderAt, txnh *TxnHeader) error {
} }
if dh.Tid != txnh.Tid { if dh.Tid != txnh.Tid {
return decodeErr(dh, "data.tid != txn.Tid") // XXX return checkErr(dh, "tid mismatch: %s -> %s", txnh.Tid, dh.Tid)
} }
if dh.TxnPos != txnh.Pos { if dh.TxnPos != txnh.Pos {
return decodeErr(dh, "data.txnPos != txn.Pos") // XXX return checkErr(dh, "txn position not pointing back: %d", dh.TxnPos)
} }
if dh.Pos + dh.Len() > txnTailPos { if dh.Pos + dh.Len() > txnTailPos {
return decodeErr(dh, "data record overlaps txn boundary") // XXX return checkErr(dh, "data record [..., %d) overlaps txn boundary [..., %d)",
dh.Pos + dh.Len(), txnTailPos)
} }
return nil return nil
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment