Commit 9a4646be authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent d437810c
...@@ -50,6 +50,9 @@ type oidCacheEntry struct { ...@@ -50,6 +50,9 @@ type oidCacheEntry struct {
// cached revisions in ascending order // cached revisions in ascending order
// [i].serial < [i].before <= [i+1].serial < [i+1].before // [i].serial < [i].before <= [i+1].serial < [i+1].before
// //
// XXX ^^^ .serial = 0 while loading is in progress
// XXX ^^^ .serial = 0 if err != nil
//
// XXX or? // XXX or?
// cached revisions in descending order // cached revisions in descending order
// .before > .serial >= next.before > next.serial ? // .before > .serial >= next.before > next.serial ?
...@@ -61,13 +64,6 @@ type revCacheEntry struct { ...@@ -61,13 +64,6 @@ type revCacheEntry struct {
inLRU listHead // in Cache.lru inLRU listHead // in Cache.lru
parent *oidCacheEntry // oidCacheEntry holding us parent *oidCacheEntry // oidCacheEntry holding us
//sync.Mutex XXX not needed?
// oid revision
// 0 if don't know yet - loadBefore(.before) is in progress and actual
// serial not yet obtained from database.
serial zodb.Tid
// we know that loadBefore(oid, .before) will give this .serial:oid. // we know that loadBefore(oid, .before) will give this .serial:oid.
// //
// this is only what we currently know - not neccessarily covering // this is only what we currently know - not neccessarily covering
...@@ -84,8 +80,9 @@ type revCacheEntry struct { ...@@ -84,8 +80,9 @@ type revCacheEntry struct {
// case when loadBefore with tid > cache.before was called. // case when loadBefore with tid > cache.before was called.
before zodb.Tid before zodb.Tid
// loading result: object data or error // loading result: object (data, serial) or error
data []byte data []byte
serial zodb.Tid
err error err error
ready chan struct{} // closed when loading finished ready chan struct{} // closed when loading finished
...@@ -231,18 +228,23 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) { ...@@ -231,18 +228,23 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) {
// entry was not in cache - this goroutine becomes responsible for loading it // entry was not in cache - this goroutine becomes responsible for loading it
data, serial, err := c.stor.Load(xid) data, serial, err := c.stor.Load(xid)
// verify db gives serial < before
if serial >= rce.before { // normailize data/serial if it was error
// XXX err != nil - also check vvv? if err != nil {
// XXX loadSerial? data = nil
errDB(xid.Oid, "load(<%v) -> %v", rce.before, serial) serial = 0
} }
rce.serial = serial rce.serial = serial
rce.data = data rce.data = data
rce.err = err rce.err = err
close(ready) // verify db gives serial < before
if rce.serial >= rce.before {
// XXX loadSerial?
rce.errDB(xid.Oid, "load(<%v) -> %v", rce.before, serial)
}
δsize := len(data) close(ready)
δsize := len(rce.data)
// merge rce with adjacent entries in parent // merge rce with adjacent entries in parent
// ( e.g. loadBefore(3) and loadBefore(4) results in the same data loaded if // ( e.g. loadBefore(3) and loadBefore(4) results in the same data loaded if
...@@ -259,15 +261,15 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) { ...@@ -259,15 +261,15 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) {
if i + 1 < len(oce.revv) { if i + 1 < len(oce.revv) {
rceNext := oce.revv[i+1] rceNext := oce.revv[i+1]
if rceNext.loaded() && rceNext.serial < rce.before { // XXX rceNext.serial=0 ? if rceNext.loaded() && rceNext.serial < rce.before { // XXX rceNext.serial=0 ?
// drop rce
oce.deli(i)
δsize -= len(rce.data)
// verify rce.serial == rceNext.serial // verify rce.serial == rceNext.serial
if rce.serial != rceNext.serial { if rce.serial != rceNext.serial {
// XXX -> where to put? rce.err? rce.errDB(xid.Oid, "load(<%v) -> %v; load(<%v) -> %v", rce.before, rce.serial, rceNext.before, rceNext.serial)
errDB(xid.Oid, "load(<%v) -> %v; load(<%v) -> %v", rce.before, rce.serial, rceNext.before, rceNext.serial)
} }
// drop rce
oce.deli(i)
δsize -= len(rce.data)
rce = rceNext rce = rceNext
} }
} }
...@@ -276,10 +278,11 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) { ...@@ -276,10 +278,11 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) {
if i > 0 { if i > 0 {
rcePrev = oce.revv[i-1] rcePrev = oce.revv[i-1]
if rce.serial < rcePrev.before { if rce.serial < rcePrev.before {
// XXX drop rcePrev here?
// verify rce.serial == rcePrev.serial (if that is ready) // verify rce.serial == rcePrev.serial (if that is ready)
if rcePrev.loaded() && rcePrev.serial != rce.serial { // XXX rcePrev.serial=0 ? if rcePrev.loaded() && rcePrev.serial != rce.serial { // XXX rcePrev.serial=0 ?
// XXX -> where to put? rce.err? rce.errDB(xid.Oid, "load(<%v) -> %v; load(<%v) -> %v", rcePrev.before, rcePrev.serial, rce.before, rce.serial)
errDB(xid.Oid, "load(<%v) -> %v; load(<%v) -> %v", rcePrev.before, rcePrev.serial, rce.before, rce.serial)
} }
// drop rcePrev // drop rcePrev
...@@ -345,3 +348,10 @@ func errDB(oid zodb.Oid, format string, argv ...interface{}) error { ...@@ -345,3 +348,10 @@ func errDB(oid zodb.Oid, format string, argv ...interface{}) error {
// XXX -> separate type? // XXX -> separate type?
return fmt.Errorf("cache: database inconsistency: oid: %v: " + format, oid, ...argv) return fmt.Errorf("cache: database inconsistency: oid: %v: " + format, oid, ...argv)
} }
// errDB marks rce with database inconsistency error
func (rce *revCacheEntry) errDB(oid zodb.Oid, format string, argv ...interface{}) {
rce.err = errDB(oid, format, argv...)
rce.data = nil
rce.serial = 0
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment