Commit 9033940f authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 15afd3ad
...@@ -157,6 +157,19 @@ func (oce *oidCacheEntry) del(rce *revCacheEntry) { ...@@ -157,6 +157,19 @@ func (oce *oidCacheEntry) del(rce *revCacheEntry) {
// XXX maintain nhit / nmiss? // XXX maintain nhit / nmiss?
// isErrNoData returns whether an error is due to "there is no such data in
// database", not e.g. some IO loading error
func isErrNoData(err error) bool {
switch err.(type) {
default:
return false
case *zodb.ErrOidMissing:
case *zodb.ErrXidMissing:
}
return true
}
func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
// oid -> oce (oidCacheEntry) ; create new empty oce if not yet there // oid -> oce (oidCacheEntry) ; create new empty oce if not yet there
// exit with oce locked and cache.before read consistently // exit with oce locked and cache.before read consistently
...@@ -239,6 +252,9 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { ...@@ -239,6 +252,9 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
c.gcMu.Lock() c.gcMu.Lock()
rce.inLRU.MoveBefore(&c.lru) rce.inLRU.MoveBefore(&c.lru)
c.gcMu.Unlock() c.gcMu.Unlock()
// XXX for ErrXidMissing xtid needs to be adjusted to what was queried by user
return rce.data, rce.serial, rce.err return rce.data, rce.serial, rce.err
} }
...@@ -277,17 +293,10 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { ...@@ -277,17 +293,10 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
// if rce & rceNext cover the same range -> drop rce // if rce & rceNext cover the same range -> drop rce
if i + 1 < len(oce.revv) { if i + 1 < len(oce.revv) {
rceNext := oce.revv[i+1] rceNext := oce.revv[i+1]
if rceNext.loaded() && rceNext.err == nil && rceNext.serial < rce.before { if rceNext.loaded() && tryMerge(rce, rceNext, rce, xid.Oid) {
// drop rce // not δsize -= len(rce.data)
oce.deli(i) // tryMerge can change rce.data if consistency is broken
δsize -= len(rce.data) δsize = 0
// verify rce.serial == rceNext.serial
if rce.err == nil && rce.serial != rceNext.serial {
rce.errDB(xid.Oid, "load(<%v) -> %v; load(<%v) -> %v",
rce.before, rce.serial, rceNext.before, rceNext.serial)
}
rce = rceNext rce = rceNext
} }
} }
...@@ -295,16 +304,8 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { ...@@ -295,16 +304,8 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
// if rcePrev & rce cover the same range -> drop rcePrev // if rcePrev & rce cover the same range -> drop rcePrev
if i > 0 { if i > 0 {
rcePrev := oce.revv[i-1] rcePrev := oce.revv[i-1]
if rcePrev.loaded() && rce.err == nil && rce.serial < rcePrev.before { if rcePrev.loaded() && tryMerge(rcePrev, rce, rce, xid.Oid) {
// drop rcePrev
oce.deli(i-1)
δsize -= len(rcePrev.data) δsize -= len(rcePrev.data)
// verify rce.serial == rcePrev.serial
if rcePrev.err == nil && rcePrev.serial != rce.serial {
rce.errDB(xid.Oid, "load(<%v) -> %v; load(<%v) -> %v",
rcePrev.before, rcePrev.serial, rce.before, rce.serial)
}
} }
} }
...@@ -322,6 +323,67 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { ...@@ -322,6 +323,67 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
return rce.data, rce.serial, rce.err return rce.data, rce.serial, rce.err
} }
// tryMerge tries to merge rce prev into next
//
// both prev and next must be already loaded.
// prev and next must come adjacent to each other in parent.revv with
// prev.before < next.before .
//
// cur must be one of either prev or next and indicates which rce is current
// and so may be adjusted with consistency check error.
//
// return: true if merging done and thus prev was dropped from parent
//
// must be called with .parent locked
//
// XXX move oid from args to revCacheEntry?
func tryMerge(prev, next, cur *revCacheEntry, oid zodb.Oid) bool {
// can merge if consistent if
// (if merging)
//
// Pok Nok Ns < Pb Ps = Ns
// Pe Nok Ns < Pb Pe != "nodata" (e.g. it was IO loading error for P)
// Pok Ne ---
// Ne Pe (Pe="nodata") = (Ne="nodata")
//
// b - before
// s - serial
// e - error
if next.err == nil && next.serial < prev.before {
// drop prev
prev.parent.del(prev)
// check consistency
switch {
case prev.err == nil && prev.serial != next.serial:
cur.errDB(oid, "load(<%v) -> %v; load(<%v) -> %v",
prev.before, prev.serial, next.before, next.serial)
case prev.err != nil && !isErrNoData(prev.err):
if cur.err == nil {
cur.errDB(oid, "load(<%v) -> %v; load(<%v) -> %v",
prev.before, prev.err, next.before, next.serial)
}
}
return true
}
if prev.err != nil && isErrNoData(prev.err) == isErrNoData(next.err) {
// drop prev
prev.parent.del(prev)
// not checking consistency - error is already there and
// (Pe="nodata") = (Ne="nodata") already indicates prev & next are consistent.
return true
}
return false
}
/* /*
func (c *cache) gc(...) { func (c *cache) gc(...) {
c.lruMu.Lock() c.lruMu.Lock()
......
...@@ -40,13 +40,33 @@ type tOidData struct { ...@@ -40,13 +40,33 @@ type tOidData struct {
data []byte data []byte
} }
var tstor = &tStorage{ func (s *tStorage) Load(xid zodb.Xid) (data []byte, serial zodb.Tid, err error) {
dataMap: map[zodb.Oid][]tOidData{ tid := xid.Tid
1: { if xid.TidBefore {
{3, []byte("hello")}, tid++ // XXX overflow
{7, []byte("world")}, }
},
}, datav := s.dataMap[xid.Oid]
if datav == nil {
return nil, 0, &zodb.ErrOidMissing{xid.Oid}
}
// find max entry with .serial < tid
n := len(datav)
i := n - 1 - sort.Search(n, func(i int) bool {
return datav[n - 1 - i].serial < tid
})
if i == -1 {
// tid < all .serial - no such transaction
return nil, 0, &zodb.ErrXidMissing{xid}
}
// check we have exact match if it was loadSerial
if xid.TidBefore && datav[i].serial != xid.Tid {
return nil, 0, &zodb.ErrXidMissing{xid}
}
return datav[i].data, datav[i].serial, nil
} }
/* /*
...@@ -75,33 +95,13 @@ type tDataRecord struct { ...@@ -75,33 +95,13 @@ type tDataRecord struct {
} }
*/ */
func (s *tStorage) Load(xid zodb.Xid) (data []byte, serial zodb.Tid, err error) { var tstor = &tStorage{
tid := xid.Tid dataMap: map[zodb.Oid][]tOidData{
if xid.TidBefore { 1: {
tid++ // XXX overflow {3, []byte("hello")},
} {7, []byte("world")},
},
datav := s.dataMap[xid.Oid] },
if datav == nil {
return nil, 0, &zodb.ErrOidMissing{xid.Oid}
}
// find max entry with .serial < tid
n := len(datav)
i := n - 1 - sort.Search(n, func(i int) bool {
return datav[n - 1 - i].serial < tid
})
if i == -1 {
// tid < all .serial - no such transaction
return nil, 0, &zodb.ErrXidMissing{xid}
}
// check we have exact match if it was loadSerial
if xid.TidBefore && datav[i].serial != xid.Tid {
return nil, 0, &zodb.ErrXidMissing{xid}
}
return datav[i].data, datav[i].serial, nil
} }
func TestCache(t *testing.T) { func TestCache(t *testing.T) {
...@@ -115,4 +115,16 @@ func TestCache(t *testing.T) { ...@@ -115,4 +115,16 @@ func TestCache(t *testing.T) {
// merge: rce + rceNext // merge: rce + rceNext
// rcePrev + rce // rcePrev + rce
// rcePrev + (rce + rceNext) // rcePrev + (rce + rceNext)
c := NewCache(tstor)
c.Load(1, <2) -> nil, 0, &zodb.ErrXidMissing
oce1 := c.entryMap[1]
len(oce1.revv) == 1
rce1_2 := oce1.revv[0]
rce1_2.before == 2
rce1_2.serial == 0
rce1_2.err == zodb.ErrXidMissing
c.Load(1, <3) -> nil, 0, zodb.ErrXidMissing
} }
...@@ -130,6 +130,8 @@ type StorageRecordInformation struct { ...@@ -130,6 +130,8 @@ type StorageRecordInformation struct {
type IStorage interface { type IStorage interface {
// XXX add invalidation channel
// StorageName returns storage name // StorageName returns storage name
StorageName() string StorageName() string
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment