Commit a2d8995e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ed116ae2
...@@ -595,31 +595,42 @@ func (dh *DataHeader) loadPrevRev(r io.ReaderAt) error { ...@@ -595,31 +595,42 @@ func (dh *DataHeader) loadPrevRev(r io.ReaderAt) error {
return nil return nil
} }
// TODO LoadBackRef // LoadBackRef reads data for the data record and decodes it as backpointer reference.
// prerequisite: dh loaded and .LenData == 0 (data record with back-pointer)
// XXX return backPos=-1 if err?
func (dh *DataHeader) LoadBackRef(r io.ReaderAt) (backPos int64, err error) {
if dh.DataLen != 0 {
bug(dh, "LoadBack() on non-backpointer data header")
}
_, err = r.ReadAt(dh.workMem[:8], dh.Pos + DataHeaderSize)
if err != nil {
return 0, dh.err("read data", noEOF(err))
}
backPos = int64(binary.BigEndian.Uint64(dh.workMem[0:]))
if !(backPos == 0 || backPos >= dataValidFrom) {
return 0, decodeErr(dh, "invalid backpointer: %v", backPos)
}
if backPos + DataHeaderSize > dh.TxnPos - 8 {
return 0, decodeErr(dh, "backpointer (%v) overlaps with txn (%v)", backPos, dh.TxnPos)
}
return backPos, nil
}
// LoadBack reads and decodes data header for revision linked via back-pointer. // LoadBack reads and decodes data header for revision linked via back-pointer.
// prerequisite: dh XXX .DataLen == 0 // prerequisite: dh XXX .DataLen == 0
// if link is to zero (means deleted record) io.EOF is returned // if link is to zero (means deleted record) io.EOF is returned
func (dh *DataHeader) LoadBack(r io.ReaderAt) error { func (dh *DataHeader) LoadBack(r io.ReaderAt) error {
if dh.DataLen != 0 { backPos, err := dh.LoadBackRef(r)
bug(dh, "LoadBack() on non-backpointer data header")
}
_, err := r.ReadAt(dh.workMem[:8], dh.Pos + DataHeaderSize)
if err != nil { if err != nil {
return dh.err("read data", noEOF(err)) return err
} }
backPos := int64(binary.BigEndian.Uint64(dh.workMem[0:]))
if backPos == 0 { if backPos == 0 {
return io.EOF // oid was deleted return io.EOF // oid was deleted
} }
if backPos < dataValidFrom {
return decodeErr(dh, "invalid backpointer: %v", backPos)
}
if backPos + DataHeaderSize > dh.TxnPos - 8 {
return decodeErr(dh, "backpointer (%v) overlaps with txn (%v)", backPos, dh.TxnPos)
}
posCur := dh.Pos posCur := dh.Pos
tid := dh.Tid tid := dh.Tid
......
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
package fs1tools package fs1tools
import ( import (
"io"
"lab.nexedi.com/kirr/neo/go/zodb/storage/fs1" "lab.nexedi.com/kirr/neo/go/zodb/storage/fs1"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
...@@ -65,7 +67,7 @@ func dump(w io.Writer, path string, d dumper) (err error) { ...@@ -65,7 +67,7 @@ func dump(w io.Writer, path string, d dumper) (err error) {
// buffer for formatting // buffer for formatting
buf := &xfmt.Buffer{} buf := &xfmt.Buffer{}
flushBuf := func() error { flushBuf := func() error {
err := w.Write(buf.Bytes()) _, err := w.Write(buf.Bytes())
buf.Reset() buf.Reset()
return err return err
} }
...@@ -79,8 +81,8 @@ func dump(w io.Writer, path string, d dumper) (err error) { ...@@ -79,8 +81,8 @@ func dump(w io.Writer, path string, d dumper) (err error) {
// TODO d.dumpFileHeader // TODO d.dumpFileHeader
it := fs.IterateRaw(fwd) it := fs.IterateRaw(fs1.IterForward)
for i := 0; ; i++ { for {
err = it.NextTxn(fs1.LoadAll) err = it.NextTxn(fs1.LoadAll)
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
...@@ -91,7 +93,7 @@ func dump(w io.Writer, path string, d dumper) (err error) { ...@@ -91,7 +93,7 @@ func dump(w io.Writer, path string, d dumper) (err error) {
d.dumpTxn(buf, it.Txnh) // XXX err d.dumpTxn(buf, it.Txnh) // XXX err
for j := 0; ; j++ { for {
err = it.NextData() err = it.NextData()
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
...@@ -116,14 +118,16 @@ func dump(w io.Writer, path string, d dumper) (err error) { ...@@ -116,14 +118,16 @@ func dump(w io.Writer, path string, d dumper) (err error) {
// dumper is internal interface to implement various dumping modes // dumper is internal interface to implement various dumping modes
type dumper interface { type dumper interface {
dumpFileHeader(buf *xfmt.Buffer, *fs1.FileHeader) error dumpFileHeader(*xfmt.Buffer, *fs1.FileHeader) error
dumpTxn(buf *xfmt.Buffer, *fs1.TxnHeader) error dumpTxn(*xfmt.Buffer, *fs1.TxnHeader) error
dumpData(buf *xfmt.Buffer, *fs1.DataHeader) error dumpData(*xfmt.Buffer, *fs1.DataHeader) error
dumpTxnPost(buf *xfmt.Buffer, *fs1.TxnHeader) error dumpTxnPost(*xfmt.Buffer, *fs1.TxnHeader) error
} }
// "normal" dumper // "normal" dumper
type dumper1 struct { type dumper1 struct {
ntxn int // current transaction record #
ndata int // current data record # inside current transaction
} }
func (d *dumper1) dumpFileHeader(buf *xfmt.Buffer, fh *fs1.FileHeader) error { func (d *dumper1) dumpFileHeader(buf *xfmt.Buffer, fh *fs1.FileHeader) error {
...@@ -131,20 +135,24 @@ func (d *dumper1) dumpFileHeader(buf *xfmt.Buffer, fh *fs1.FileHeader) error { ...@@ -131,20 +135,24 @@ func (d *dumper1) dumpFileHeader(buf *xfmt.Buffer, fh *fs1.FileHeader) error {
} }
func (d *dumper1) dumpTxn(buf *xfmt.Buffer, txnh *fs1.TxnHeader) error { func (d *dumper1) dumpTxn(buf *xfmt.Buffer, txnh *fs1.TxnHeader) error {
buf .S("Trans #") .D_f("05", i) .S(" tid=") .V(it.Txnh.Tid) buf .S("Trans #") .D_f("05", d.ntxn) .S(" tid=") .V(txnh.Tid)
buf .S(" time=") .V(it.Txnh.Tid.Time()) .S(" offset=") .D64(it.Txnh.Pos) buf .S(" time=") .V(txnh.Tid.Time()) .S(" offset=") .D64(txnh.Pos)
buf .S("\n status=") .Qpy(it.Txnh.Status) buf .S("\n status=") .Qpy(txnh.Status)
buf .S(" user=") .Qpyb(it.Txnh.User) buf .S(" user=") .Qpyb(txnh.User)
buf .S(" description=") .Qpyb(it.Txnh.Description) .S("\n") buf .S(" description=") .Qpyb(txnh.Description) .S("\n")
d.ntxn++
d.ndata = 0
return nil
} }
func (d *dumper1) dumpData(buf *xfmt.Buffer, dh *fs1.DataHeader) error { func (d *dumper1) dumpData(buf *xfmt.Buffer, dh *fs1.DataHeader) error {
buf .S(" data #") .D_f("05", j) .S(" oid=") .V(it.Datah.Oid) buf .S(" data #") .D_f("05", d.ndata) .S(" oid=") .V(dh.Oid)
if it.Datah.DataLen == 0 { if dh.DataLen == 0 {
buf .S(" class=undo or abort of object creation") buf .S(" class=undo or abort of object creation")
backPos, err := it.Datah.LoadBackRef() backPos, err := dh.LoadBackRef()
if err != nil { if err != nil {
// XXX // XXX
} }
...@@ -154,14 +162,18 @@ func (d *dumper1) dumpData(buf *xfmt.Buffer, dh *fs1.DataHeader) error { ...@@ -154,14 +162,18 @@ func (d *dumper1) dumpData(buf *xfmt.Buffer, dh *fs1.DataHeader) error {
} }
} else { } else {
// XXX Datah.LoadData() // XXX Datah.LoadData()
modname, classname = zodb.GetPickleMetadata(...) // XXX //modname, classname = zodb.GetPickleMetadata(...) // XXX
fullclass = "%s.%s" % (modname, classname) //fullclass = "%s.%s" % (modname, classname)
fullclass = "AAA.BBB" // FIXME stub
buf .S(" size=") .D(len(...)) buf .S(" size=") .D(dh.DataLen)
buf .S(" class=") .S(fullclass) buf .S(" class=") .S(fullclass)
} }
buf .S("\n") buf .S("\n")
d.ndata++
return nil
} }
func (d *dumper1) dumpTxnPost(buf *xfmt.Buffer, txnh *fs1.TxnHeader) error { func (d *dumper1) dumpTxnPost(buf *xfmt.Buffer, txnh *fs1.TxnHeader) error {
...@@ -181,14 +193,14 @@ func (d *dumperVerbose) dumpFileHeader(buf *xfmt.Buffer, fh *fs1.FileHeader) err ...@@ -181,14 +193,14 @@ func (d *dumperVerbose) dumpFileHeader(buf *xfmt.Buffer, fh *fs1.FileHeader) err
func (d *dumperVerbose) dumpTxn(buf *xfmt.Buffer, txnh *fs1.TxnHeader) error { func (d *dumperVerbose) dumpTxn(buf *xfmt.Buffer, txnh *fs1.TxnHeader) error {
buf .S("=" * 60) buf .S("=" * 60)
buf .S("\noffset: ") .D64(it.Txnh.Pos) buf .S("\noffset: ") .D64(txnh.Pos)
buf .S("\nend pos: ") .D64(it.Txnh.Pos + it.Txnh.Len) buf .S("\nend pos: ") .D64(txnh.Pos + txnh.Len)
buf .S("\ntransaction id: ") .V(it.Txnh.Tid) buf .S("\ntransaction id: ") .V(txnh.Tid)
buf .S("\ntrec len: ") .D64(it.Txnh.Len) buf .S("\ntrec len: ") .D64(txnh.Len)
buf .S("\nstatus: ") .Qpy(it.Txnh.Status) buf .S("\nstatus: ") .Qpy(txnh.Status)
buf .S("\nuser: ") .Qpyb(it.Txnh.User) buf .S("\nuser: ") .Qpyb(txnh.User)
buf .S("\ndescription: ") .Qpyb(it.Txnh.Description) buf .S("\ndescription: ") .Qpyb(txnh.Description)
buf .S("\nlen(extra): ") .D(len(it.Txnh.Extension)) buf .S("\nlen(extra): ") .D(len(txnh.Extension))
buf .S("\n") buf .S("\n")
return nil return nil
} }
...@@ -219,6 +231,6 @@ func (d *dumperVerbose) dumpTxnPost(buf *xfmt.Buffer, txnh *fs1.TxnHeader) error ...@@ -219,6 +231,6 @@ func (d *dumperVerbose) dumpTxnPost(buf *xfmt.Buffer, txnh *fs1.TxnHeader) error
// NOTE printing the same .Len twice // NOTE printing the same .Len twice
// we do not print/check redundant len here because our // we do not print/check redundant len here because our
// FileStorage code checks/reports this itself // FileStorage code checks/reports this itself
buf .S("redundant trec len: " .D64(it.Txnh.Len)) .S("\n") buf .S("redundant trec len: " .D64(txnh.Len)) .S("\n")
return nil return nil
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment