Commit 21691ac6 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent bd200d7a
......@@ -423,9 +423,10 @@ package main
//
// Head: zheadMu.W | zheadMu.R + BigFileDir.fileMu
// Watch: atMu.W | atMu.R + pinnedMu
// zheadMu > Watch.atMu
//
// WatchLink.byfileMu > Watch.mu XXX
// BigFile.watchMu > Watch.mu XXX
// WatchLink.byfileMu > Watch.atMu
// BigFile.watchMu > Watch.atMu
import (
"bufio"
......@@ -574,7 +575,7 @@ type BigFile struct {
// both watches in already "established" state (i.e. initial watch
// request was completed and answered with "ok"), and watches in
// progress of being established are kept here.
watchMu sync.Mutex // XXX use
watchMu sync.RWMutex
watchTab map[*Watch]struct{}
}
......@@ -1416,10 +1417,12 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, treepath []btr
blkrevRough := true
wg, ctx := errgroup.WithContext(ctx)
for w := range f.watchTab { // XXX locking (f.watchTab)
f.watchMu.RLock()
for w := range f.watchTab {
w := w
// make sure w.at stays unchanged while we pin the block
// make sure w.at stays unchanged while we prepare and pin the block
w.atMu.RLock()
fmt.Printf("S: read -> pin watchers: w @%s\n", w.at)
......@@ -1456,11 +1459,13 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, treepath []btr
pinrev, _ := w.file.LastBlkRev(ctx, blk, w.at) // XXX move into go?
wg.Go(func() error {
// XXX close watcher on any error
defer w.atMu.RUnlock()
// XXX close watcher on any error
return w.pin(ctx, blk, pinrev)
})
}
f.watchMu.RUnlock()
err := wg.Wait()
if err != nil {
panic(err) // XXX
......@@ -1515,10 +1520,14 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
}
}
f := w.file
f.watchMu.Lock()
// at="-" (InvalidTid) means "remove the watch"
if at == zodb.InvalidTid {
delete(wlink.byfile, foid) // XXX locking
delete(w.file.watchTab, w) // XXX locking
delete(f.watchTab, w)
f.watchMu.Unlock()
return nil
}
......@@ -1532,11 +1541,10 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// XXX we might want to allow going back in history if we need it.
if !(at >= w.at) {
w.atMu.Unlock()
f.watchMu.Unlock()
return fmt.Errorf("going back in history is forbidden")
}
f := w.file
// register w to f early, so that READs going in parallel to us
// preparing and processing initial pins, also send pins to w for read
// blocks. If we don't, we can miss to send pin to w for a freshly read
......@@ -1577,8 +1585,9 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// XXX register only if watch was created anew, not updated?
w.at = at
// NOTE registering f.watchTab[w] and wlink.byfile[foid] = w must come together.
f.watchTab[w] = struct{}{} // XXX locking
f.watchTab[w] = struct{}{}
wlink.byfile[foid] = w // XXX locking
f.watchMu.Unlock()
// XXX defer -> unregister watch if error?
......@@ -1614,7 +1623,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
_ = pinNew
}
// downgrade atMu.W -> atMu.R
// downgrade atMu.W -> atMu.R to let other clients access the file.
// XXX there is no primitive to do Wlock->Rlock atomically, but we are
// ok with that since we prepared everything to handle simultaneous pins
// from other reads.
......@@ -1698,7 +1707,9 @@ func (wlink *WatchLink) _serve() (err error) {
// unregister all watches created on this wlink
for _, w := range wlink.byfile { // XXX locking
delete(w.file.watchTab, w) // XXX locking
w.file.watchMu.Lock()
delete(w.file.watchTab, w)
w.file.watchMu.Unlock()
}
wlink.byfile = nil
......
......@@ -37,6 +37,7 @@ import sys, os, os.path, subprocess, threading, inspect, traceback, re
from thread import get_ident as gettid
from time import gmtime
from errno import EINVAL
from signal import SIGQUIT, SIGKILL
from golang import go, chan, select, func, defer, default
from golang import context, sync, time
from golang.gcompat import qq
......@@ -207,8 +208,9 @@ class tDB:
# cases, when wcfs, even after receiving `kill -9`, will be stuck in kernel.
# ( git.kernel.org/linus/a131de0a482a makes in-kernel FUSE client to
# still wait for request completion even after fatal signal )
t._closed = chan()
t._wcfuseabort = open("/sys/fs/fuse/connections/%d/abort" % os.stat(testmntpt).st_dev, "w")
t._closed = chan()
t._wcfuseaborted = chan()
t._wcfuseabort = open("/sys/fs/fuse/connections/%d/abort" % os.stat(testmntpt).st_dev, "w")
go(t._abort_ontimeout, 10*time.second) # NOTE must be >> with_timeout
# ZBigFile(s) scheduled for commit
......@@ -258,6 +260,7 @@ class tDB:
print("-> aborting wcfs fuse connection to unblock ...\n", file=sys.stderr)
t._wcfuseabort.write(b"1\n")
t._wcfuseabort.flush()
t._wcfuseaborted.close()
# close closes test database as well as all tracked files, watch links and wcfs.
# it also prints change history to help developer overview current testcase.
......@@ -269,25 +272,34 @@ class tDB:
# unmount and wait for wcfs to exit
def _():
assert 0 == subprocess.call(["mountpoint", "-q", testmntpt])
subprocess.check_call(["fusermount", "-u", testmntpt])
# XXX errctx "wait wcfs exit"
wg = sync.WorkGroup(timeout())
def _(ctx):
while 1:
if ready(ctx.done()):
raise ctx.err()
ret = t.wc._proc.poll()
if ret is not None:
return
tdelay()
wg.go(_)
wg.wait()
assert 0 != subprocess.call(["mountpoint", "-q", testmntpt])
os.rmdir(testmntpt)
defer(_)
def _():
# kill wcfs.go in case it is deadlocked and does not exit by itself
if procwait_(timeout(), t.wc._proc):
return
print("\nC: wcfs.go does not exit")
print("-> kill -QUIT wcfs.go ...\n")
os.kill(t.wc._proc.pid, SIGQUIT)
if procwait_(timeout(), t.wc._proc):
return
print("\nC: wcfs.go does not exit (after SIGQUIT)")
print("-> kill -KILL wcfs.go ...\n")
os.kill(t.wc._proc.pid, SIGKILL)
if procwait_(timeout(), t.wc._proc):
return
print("\nC: wcfs.go does not exit (after SIGKILL; probably it is stuck in kernel)")
print("-> nothing we can do...\n")
fail("wcfs.go does not exit even after SIGKILL")
defer(_)
def _():
if not ready(t._wcfuseaborted):
assert 0 == subprocess.call(["mountpoint", "-q", testmntpt])
subprocess.check_call(["fusermount", "-u", testmntpt])
defer(_)
defer(t.dump_history)
for tf in t._files.copy():
......@@ -2019,6 +2031,32 @@ def ready(ch):
return bool(_)
# procwait waits for a process (subprocess.Popen) to terminate.
def procwait(ctx, proc):
    wg = sync.WorkGroup(ctx)
    def _poll(ctx):
        # spin until the process reports an exit status; honour ctx cancellation
        # between polls so the wait cannot hang past the context deadline.
        while 1:
            if ready(ctx.done()):
                raise ctx.err()
            if proc.poll() is not None:
                return
            tdelay()
    wg.go(_poll)
    wg.wait()
# procwait_, similarly to procwait, waits for a process (subprocess.Popen) to terminate.
#
# it returns bool whether process terminated or not - e.g. due to context being canceled.
def procwait_(ctx, proc): # -> ok
    try:
        procwait(ctx, proc)
        return True
    except Exception as e:
        # cancellation / deadline from ctx means "did not terminate in time";
        # any other error keeps propagating.
        if e in (context.canceled, context.deadlineExceeded):
            return False
        raise
# xdefer is like defer, but makes sure exception raised before deferred
# function is called is not lost.
#
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment