Commit 57a61770 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ed7ffaa2
...@@ -171,6 +171,36 @@ func isErrNoData(err error) bool { ...@@ -171,6 +171,36 @@ func isErrNoData(err error) bool {
} }
func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
rce, rceNew := c.lookupRCE(xid)
// rce is already in cache - use it
if !rceNew {
<-rce.ready
c.gcMu.Lock()
rce.inLRU.MoveBefore(&c.lru)
c.gcMu.Unlock()
// rce is not in cache - this goroutine becomes responsible for loading it
} else {
c.loadRCE(rce, xid)
}
return rce.data, rce.serial, rce.userErr(xid)
}
func (c *Cache) Prefetch(xid zodb.Xid) {
rce, rceNew := c.lookupRCE(xid)
// spawn prefetch in the background if rce was not yet loaded
if rceNew {
go c.loadRCE(rce, xid)
}
}
// lookupRCE returns revCacheEntry corresponding to xid.
// rceNew indicates whether RCE is new and loading on it has not been initiated.
func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) {
// oid -> oce (oidCacheEntry) ; create new empty oce if not yet there // oid -> oce (oidCacheEntry) ; create new empty oce if not yet there
// exit with oce locked and cache.before read consistently // exit with oce locked and cache.before read consistently
c.mu.RLock() c.mu.RLock()
...@@ -196,9 +226,6 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { ...@@ -196,9 +226,6 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
} }
// oce, before -> rce (revCacheEntry) // oce, before -> rce (revCacheEntry)
var rce *revCacheEntry
var rceNew bool // whether we created rce anew
if xid.TidBefore { if xid.TidBefore {
l := len(oce.rcev) l := len(oce.rcev)
i := sort.Search(l, func(i int) bool { i := sort.Search(l, func(i int) bool {
...@@ -245,17 +272,12 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { ...@@ -245,17 +272,12 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
} }
oce.Unlock() oce.Unlock()
return rce, rceNew
}
// entry was already in cache - use it // loadRCE performs data loading from database to RCE
if !rceNew { func (c *Cache) loadRCE(rce *revCacheEntry, xid zodb.Xid) {
<-rce.ready oce := rce.parent
c.gcMu.Lock()
rce.inLRU.MoveBefore(&c.lru)
c.gcMu.Unlock()
return rce.data, rce.serial, rce.userErr(xid)
}
// entry was not in cache - this goroutine becomes responsible for loading it
data, serial, err := c.loader.Load(xid) data, serial, err := c.loader.Load(xid)
// normailize data/serial if it was error // normailize data/serial if it was error
...@@ -284,7 +306,7 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { ...@@ -284,7 +306,7 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
// rce was already dropped by merge / evicted // rce was already dropped by merge / evicted
// (XXX recheck about evicted) // (XXX recheck about evicted)
oce.Unlock() oce.Unlock()
return rce.data, rce.serial, rce.userErr(xid) return
} }
// if rce & rceNext cover the same range -> drop rce // if rce & rceNext cover the same range -> drop rce
...@@ -316,8 +338,6 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { ...@@ -316,8 +338,6 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
// XXX -> run gc // XXX -> run gc
} }
c.gcMu.Unlock() c.gcMu.Unlock()
return rce.data, rce.serial, rce.userErr(xid)
} }
// tryMerge tries to merge rce prev into next // tryMerge tries to merge rce prev into next
......
...@@ -241,6 +241,20 @@ func TestCache(t *testing.T) { ...@@ -241,6 +241,20 @@ func TestCache(t *testing.T) {
ok1(rce1_b12 != rce1_b10) ok1(rce1_b12 != rce1_b10)
checkRCE(rce1_b12, 12, 9, world, nil) checkRCE(rce1_b12, 12, 9, world, nil)
checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b9, rce1_b12) checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b9, rce1_b12)
// simulate case where <14 and <16 were loaded in parallel, both are ready
// but <14 takes oce lock first before <16 ans so <12 is not yet merged
// with <16 -> <12 and <14 should be merged into <16.
// (manually add rce1_b16 so it is not merged with <12)
rce1_b16 := oce1.newRevEntry(len(oce1.rcev), 16)
rce1_b16.serial = 9
rce1_b16.data = world
close(rce1_b16.ready) // XXX
ok1(rce1_b16.loaded())
checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b9, rce1_b12, rce1_b16)
// XXX launch load(<14) before <16.ready
} }
type Checker struct { type Checker struct {
......
...@@ -155,8 +155,7 @@ type IStorage interface { ...@@ -155,8 +155,7 @@ type IStorage interface {
// TODO specify error when data not found // TODO specify error when data not found
Load(xid Xid) (data []byte, serial Tid, err error) // XXX -> StorageRecordInformation ? Load(xid Xid) (data []byte, serial Tid, err error) // XXX -> StorageRecordInformation ?
// -> Prefetch(xid Xid) ... // Prefetch(xid Xid) (no error)
// PrefetchBefore(oidv []Oid, beforeTid Tid) error (?)
// Store(oid Oid, serial Tid, data []byte, txn ITransaction) error // Store(oid Oid, serial Tid, data []byte, txn ITransaction) error
// XXX Restore ? // XXX Restore ?
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment