Commit a240c642 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 53b77642
...@@ -252,10 +252,7 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { ...@@ -252,10 +252,7 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
c.gcMu.Lock() c.gcMu.Lock()
rce.inLRU.MoveBefore(&c.lru) rce.inLRU.MoveBefore(&c.lru)
c.gcMu.Unlock() c.gcMu.Unlock()
return rce.data, rce.serial, rce.userErr(xid)
// XXX for ErrXidMissing xtid needs to be adjusted to what was queried by user
return rce.data, rce.serial, rce.err
} }
// entry was not in cache - this goroutine becomes responsible for loading it // entry was not in cache - this goroutine becomes responsible for loading it
...@@ -287,7 +284,7 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { ...@@ -287,7 +284,7 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
// rce was already dropped by merge / evicted // rce was already dropped by merge / evicted
// (XXX recheck about evicted) // (XXX recheck about evicted)
oce.Unlock() oce.Unlock()
return rce.data, rce.serial, rce.err return rce.data, rce.serial, rce.userErr(xid)
} }
// if rce & rceNext cover the same range -> drop rce // if rce & rceNext cover the same range -> drop rce
...@@ -320,7 +317,7 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { ...@@ -320,7 +317,7 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
} }
c.gcMu.Unlock() c.gcMu.Unlock()
return rce.data, rce.serial, rce.err return rce.data, rce.serial, rce.userErr(xid)
} }
// tryMerge tries to merge rce prev into next // tryMerge tries to merge rce prev into next
...@@ -345,7 +342,9 @@ func tryMerge(prev, next, cur *revCacheEntry, oid zodb.Oid) bool { ...@@ -345,7 +342,9 @@ func tryMerge(prev, next, cur *revCacheEntry, oid zodb.Oid) bool {
// Pok Nok Ns < Pb Ps = Ns // Pok Nok Ns < Pb Ps = Ns
// Pe Nok Ns < Pb Pe != "nodata" (e.g. it was IO loading error for P) // Pe Nok Ns < Pb Pe != "nodata" (e.g. it was IO loading error for P)
// Pok Ne --- // Pok Ne ---
// Ne Pe (Pe="nodata") = (Ne="nodata") // Ne Pe (Pe="nodata") = (Ne="nodata") -> XXX vs deleteObject?
// -> let deleted object actually read
// -> as special non-error value
// //
// b - before // b - before
// s - serial // s - serial
...@@ -428,6 +427,21 @@ func (rce *revCacheEntry) loaded() bool { ...@@ -428,6 +427,21 @@ func (rce *revCacheEntry) loaded() bool {
} }
} }
// userErr returns error that, if any, needs to be returned to user from Cache.Load
//
// ( ErrXidMissing containts xid for which it is missing. In cache we keep such
// xid with max .before but users need to get ErrXidMissing with their own query )
func (rce *revCacheEntry) userErr(xid zodb.Xid) error {
switch e := rce.err.(type) {
case *zodb.ErrXidMissing:
if e.Xid != xid {
return &zodb.ErrXidMissing{xid}
}
}
return rce.err
}
// revCacheEntry: .inLRU -> . // revCacheEntry: .inLRU -> .
func (h *listHead) rceFromInLRU() (rce *revCacheEntry) { func (h *listHead) rceFromInLRU() (rce *revCacheEntry) {
urce := unsafe.Pointer(uintptr(unsafe.Pointer(h)) - unsafe.Offsetof(rce.inLRU)) urce := unsafe.Pointer(uintptr(unsafe.Pointer(h)) - unsafe.Offsetof(rce.inLRU))
......
...@@ -80,12 +80,20 @@ func (s *tStorage) Load(xid zodb.Xid) (data []byte, serial zodb.Tid, err error) ...@@ -80,12 +80,20 @@ func (s *tStorage) Load(xid zodb.Xid) (data []byte, serial zodb.Tid, err error)
var tstor = &tStorage{ var tstor = &tStorage{
dataMap: map[zodb.Oid][]tOidData{ dataMap: map[zodb.Oid][]tOidData{
1: { 1: {
{3, []byte("hello")}, {4, []byte("hello")},
{7, []byte("world")}, {7, []byte("world")},
}, },
}, },
} }
func xidlt(oid zodb.Oid, tid zodb.Tid) zodb.Xid {
return zodb.Xid{Oid: oid, XTid: zodb.XTid{Tid: tid, TidBefore: true}}
}
func xideq(oid zodb.Oid, tid zodb.Tid) zodb.Xid {
return zodb.Xid{Oid: oid, XTid: zodb.XTid{Tid: tid, TidBefore: false}}
}
func TestCache(t *testing.T) { func TestCache(t *testing.T) {
// XXX <100 <90 <80 // XXX <100 <90 <80
// q<110 -> a) 110 <= cache.before b) otherwise // q<110 -> a) 110 <= cache.before b) otherwise
...@@ -107,28 +115,39 @@ func TestCache(t *testing.T) { ...@@ -107,28 +115,39 @@ func TestCache(t *testing.T) {
// XXX hack; place=ok? // XXX hack; place=ok?
pretty.CompareConfig.PrintStringers = true pretty.CompareConfig.PrintStringers = true
xid1 := zodb.Xid{Oid: 1, XTid: zodb.XTid{Tid: 2, TidBefore: true}} // XXX vvv -> checkLoad()
data, serial, err := c.Load(xid1) // -> nil, 0, &zodb.ErrXidMissing{1,<2} data, serial, err := c.Load(xidlt(1,3))
ok1(data == nil) ok1(data == nil)
ok1(serial == 0) ok1(serial == 0)
eq(err, &zodb.ErrXidMissing{xid1}) eq(err, &zodb.ErrXidMissing{xidlt(1,3)})
oce1 := c.entryMap[1] oce1 := c.entryMap[1]
ok1(len(oce1.revv) == 1) ok1(len(oce1.revv) == 1)
rce1_b2 := oce1.revv[0] // XXX vvv -> checkRCE ?
ok1(rce1_b2.before == 2) rce1_b3 := oce1.revv[0]
ok1(rce1_b2.serial == 0) ok1(rce1_b3.before == 3)
eq(rce1_b2.err, &zodb.ErrXidMissing{xid1}) // XXX must be 1, ?0 ok1(rce1_b3.serial == 0)
eq(rce1_b3.err, &zodb.ErrXidMissing{xidlt(1,3)}) // XXX must be 1, ?0
xid1_3 := xid1
xid1_3.Tid = 3 data, serial, err = c.Load(xidlt(1,4))
data, serial, err = c.Load(xid1_3) // -> nil, 0, zodb.ErrXidMissing{1,<3} ok1(data == nil)
ok1(serial == 0)
eq(err, &zodb.ErrXidMissing{xidlt(1,4)})
ok1(len(oce1.revv) == 1)
rce1_b4 := oce1.revv[0]
ok1(rce1_b4 != rce1_b3) // rce1_b3 was merged into rce1_b4
ok1(rce1_b4.before == 4)
ok1(rce1_b4.serial == 0)
eq(rce1_b4.err, &zodb.ErrXidMissing{xidlt(1,4)}) // XXX must be 1, ?0
// XXX load <2 -> check it is merged with <4 XXX vs deleteObject?
data, serial, err = c.Load(xidlt(1,2))
ok1(data == nil) ok1(data == nil)
ok1(serial == 0) ok1(serial == 0)
eq(err, &zodb.ErrXidMissing{xid1_3}) fmt.Println(err)
eq(len(oce1.revv), 1) eq(err, &zodb.ErrXidMissing{xidlt(1,2)})
rce1_b3 := oce1.revv[0]
ok1(rce1_b3 != rce1_b2) // rce1_b2 was merged into rce1_b3
} }
type Checker struct { type Checker struct {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment