Commit 5c299054 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 6adc53bd
...@@ -512,6 +512,10 @@ type Head struct { ...@@ -512,6 +512,10 @@ type Head struct {
// head/watch opens // head/watch opens
// XXX protected by ... zheadMu ? // XXX protected by ... zheadMu ?
wlinkTab map[*WatchLink]struct{} wlinkTab map[*WatchLink]struct{}
// waiters for zconn.At to become ≥ their at.
hwaitMu sync.Mutex // zheadMu.W | zheadMu.R + hwaitMu
hwait map[hwaiter]struct{} // set{(at, ready)}
} }
// /(head|<rev>)/bigfile/ - served by BigFileDir. // /(head|<rev>)/bigfile/ - served by BigFileDir.
...@@ -520,12 +524,12 @@ type BigFileDir struct { ...@@ -520,12 +524,12 @@ type BigFileDir struct {
head *Head // parent head/ or @<rev>/ head *Head // parent head/ or @<rev>/
// {} oid -> <bigfileX> // {} oid -> <bigfileX>
fileMu sync.Mutex fileMu sync.Mutex // XXX doc zheadMu.W | ... ?
fileTab map[zodb.Oid]*BigFile fileTab map[zodb.Oid]*BigFile
// δ tail of tracked BTree nodes of all BigFiles + -> which file // δ tail of tracked BTree nodes of all BigFiles + -> which file
// (used only for head/, not revX/) // (used only for head/, not revX/)
δFmu sync.RWMutex δFmu sync.RWMutex // XXX doc zheadMu.W | ... ?
δFtail *ΔFtail δFtail *ΔFtail
} }
...@@ -556,7 +560,7 @@ type BigFile struct { ...@@ -556,7 +560,7 @@ type BigFile struct {
// //
// Being a staging area for data to enter OS cache, loading has to be // Being a staging area for data to enter OS cache, loading has to be
// consulted/invalidated whenever wcfs logic needs to consult/invalidate OS cache. // consulted/invalidated whenever wcfs logic needs to consult/invalidate OS cache.
loadMu sync.Mutex loadMu sync.Mutex // XXX doc zheadMu.W | ... ?
loading map[int64]*blkLoadState // #blk -> {... blkdata} loading map[int64]*blkLoadState // #blk -> {... blkdata}
// watches attached to this file. // watches attached to this file.
...@@ -665,8 +669,8 @@ func (_ *zodbCacheControl) PCacheClassify(obj zodb.IPersistent) zodb.PCachePolic ...@@ -665,8 +669,8 @@ func (_ *zodbCacheControl) PCacheClassify(obj zodb.IPersistent) zodb.PCachePolic
// -------- zhead lock/wait -------- // -------- zhead lock/wait --------
// XXX needed?
// TODO head.zheadMu -> special mutex with Lock(ctx) so that Lock wait could be canceled // TODO head.zheadMu -> special mutex with Lock(ctx) so that Lock wait could be canceled
func (head *Head) zheadRLock() { head.zheadMu.RLock() } func (head *Head) zheadRLock() { head.zheadMu.RLock() }
func (head *Head) zheadRUnlock() { head.zheadMu.RUnlock() } func (head *Head) zheadRUnlock() { head.zheadMu.RUnlock() }
func (head *Head) zheadLock() { head.zheadMu.Lock() } func (head *Head) zheadLock() { head.zheadMu.Lock() }
...@@ -924,6 +928,54 @@ retry: ...@@ -924,6 +928,54 @@ retry:
// XXX δFtail.ForgetPast(...) // XXX δFtail.ForgetPast(...)
// XXX for f in δF: f.δtail.ForgetPast(...) // XXX for f in δF: f.δtail.ForgetPast(...)
// notify zhead.At waiters
for w := range head.hwait {
if w.at <= δZ.Tid {
delete(head.hwait, w)
close(w.ready)
}
}
}
// hwaiter represents someone waiting for zhead to become ≥ at.
type hwaiter struct {
at zodb.Tid
ready chan struct{}
}
// zheadWait waits till head.zconn.At becomes ≥ at.
//
// It returns error either if db is down or ctx is canceled. XXX db -> wcfs?
func (head *Head) zheadWait(ctx context.Context, at zodb.Tid) (err error) {
defer xerr.Contextf(&err, "wait zhead ≥ %s", at)
if head.rev != 0 {
panic("must be called only for head/, not @revX/")
}
// check if zhead is already ≥ at
head.zheadMu.RLock()
if head.zconn.At() >= at {
head.zheadMu.RUnlock()
return
}
// no - we have to wait for it
ready := make(chan struct{})
head.hwaitMu.Lock()
head.hwait[hwaiter{at, ready}] = struct{}{}
head.hwaitMu.Unlock()
head.zheadMu.RUnlock()
select {
case <-ctx.Done():
return ctx.Err()
case <-ready:
return nil // ok - zhead.At went ≥ at
}
} }
// invalidateBlk invalidates 1 file block in kernel cache. // invalidateBlk invalidates 1 file block in kernel cache.
...@@ -1430,15 +1482,18 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1430,15 +1482,18 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// check at >= w.at // check at >= w.at
// XXX we might want to allow going back in history if we need it. // XXX we might want to allow going back in history if we need it.
if !(at >= w.at) { if !(at >= w.at) {
return fmt.Errorf("going back ZZ history is forbidden") return fmt.Errorf("going back in history is forbidden")
} }
// XXX locking
err = head.zheadWait(ctx, at)
if err != nil {
return err
}
// XXX locking
headAt := head.zconn.At() headAt := head.zconn.At()
// XXX wait head.zconn.At() ≥ at
// XXX <~> f.δtail.Head() ≥ at (?)
if at < bfdir.δFtail.Tail() { if at < bfdir.δFtail.Tail() {
return fmt.Errorf("too far away back from head/at (@%s); δt = %s", return fmt.Errorf("too far away back from head/at (@%s); δt = %s",
headAt, headAt.Time().Sub(at.Time().Time)) headAt, headAt.Time().Sub(at.Time().Time))
...@@ -2148,6 +2203,7 @@ func main() { ...@@ -2148,6 +2203,7 @@ func main() {
rev: 0, rev: 0,
zconn: zhead, zconn: zhead,
wlinkTab: make(map[*WatchLink]struct{}), wlinkTab: make(map[*WatchLink]struct{}),
hwait: make(map[hwaiter]struct{}),
} }
wnode := &WatchNode{ wnode := &WatchNode{
......
...@@ -34,6 +34,7 @@ from persistent.timestamp import TimeStamp ...@@ -34,6 +34,7 @@ from persistent.timestamp import TimeStamp
from ZODB.utils import z64, u64, p64 from ZODB.utils import z64, u64, p64
import sys, os, os.path, subprocess, threading, inspect, traceback, re import sys, os, os.path, subprocess, threading, inspect, traceback, re
from thread import get_ident as gettid
from time import gmtime from time import gmtime
from errno import EINVAL from errno import EINVAL
from golang import go, chan, select, func, defer, default from golang import go, chan, select, func, defer, default
...@@ -208,6 +209,11 @@ class tDB: ...@@ -208,6 +209,11 @@ class tDB:
t._files = set() t._files = set()
t._wlinks = set() t._wlinks = set()
# ID of the thread which created tDB
# ( transaction plays dirty games with threading.local and we have to
# check the thread is the same when .root is used )
t._maintid = gettid()
# prepare initail objects for test: zfile, nonzfile # prepare initail objects for test: zfile, nonzfile
t.root['!file'] = t.nonzfile = Persistent() t.root['!file'] = t.nonzfile = Persistent()
t.root['zfile'] = t.zfile = ZBigFile(blksize) t.root['zfile'] = t.zfile = ZBigFile(blksize)
...@@ -266,11 +272,15 @@ class tDB: ...@@ -266,11 +272,15 @@ class tDB:
assert changeDelta is not None assert changeDelta is not None
t.change(zf, changeDelta) t.change(zf, changeDelta)
# we'll verify that all changed objects come from the same ZODB connection
zconns = set()
# perform modifications scheduled by change. # perform modifications scheduled by change.
# use !wcfs mode so that we prepare data independently of wcfs code paths. # use !wcfs mode so that we prepare data independently of wcfs code paths.
dF = DF() dF = DF()
for zf, zfDelta in t._changed.items(): for zf, zfDelta in t._changed.items():
dfile = DFile() dfile = DFile()
zconns.add(zf._p_jar)
zfh = zf.fileh_open(_use_wcfs=False) zfh = zf.fileh_open(_use_wcfs=False)
for blk, data in zfDelta.iteritems(): for blk, data in zfDelta.iteritems():
dfile.ddata[blk] = data dfile.ddata[blk] = data
...@@ -279,9 +289,18 @@ class tDB: ...@@ -279,9 +289,18 @@ class tDB:
memcpy(vma, data) memcpy(vma, data)
dF.byfile[zf] = dfile dF.byfile[zf] = dfile
assert len(zconns) in (0, 1) # either nothing to commit or all from the same zconn
if len(zconns) == 1:
zconn = zconns.pop()
root = zconn.root()
else:
# no objects to commit
root = t.root
assert gettid() == t._maintid
# perform the commit. NOTE there is no clean way to retrieve tid of # perform the commit. NOTE there is no clean way to retrieve tid of
# just committed transaction - we use last._p_serial as workaround. # just committed transaction - we use last._p_serial as workaround.
t.root['_last'] = last = Persistent() root['_last'] = last = Persistent()
last._p_changed = 1 last._p_changed = 1
transaction.commit() transaction.commit()
head = tAt(t, last._p_serial) head = tAt(t, last._p_serial)
...@@ -1456,7 +1475,7 @@ def test_wcfs_watch_setup_ahead(): ...@@ -1456,7 +1475,7 @@ def test_wcfs_watch_setup_ahead():
f = t.open(zf) f = t.open(zf)
at1 = t.commit(zf, {2:'c1'}) at1 = t.commit(zf, {2:'c1'})
f.assertData(['','','c1']) f.assertData(['','x','c1']) # NOTE #1 not accessed
wg = sync.WorkGroup(timeout()) wg = sync.WorkGroup(timeout())
dt = 100*time.millisecond dt = 100*time.millisecond
...@@ -1491,7 +1510,7 @@ def test_wcfs_watch_setup_ahead(): ...@@ -1491,7 +1510,7 @@ def test_wcfs_watch_setup_ahead():
time.sleep(10*dt) time.sleep(10*dt)
committing.close() committing.close()
at2 = t.commit(zf, {2:'c2'}) at2 = t.commit(zf, {1:'b2'})
assert tidtime(at2) - tidtime(at1) >= 10*dt assert tidtime(at2) - tidtime(at1) >= 10*dt
wg.go(_) wg.go(_)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment