Commit 3dbd5d75 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 78907280
......@@ -411,6 +411,8 @@ func (sk *FileSock) Close() error {
}
// Flush implements nodefs.File to handle close(file) call.
// XXX -> Release?
// XXX -> call dtor on release?
func (f *skFile) Flush() fuse.Status {
err := f.rx.Close()
err2 := f.tx.Close()
......
......@@ -483,16 +483,23 @@ type Watch struct {
fsNode
head *Head // parent head/
idNext int32 // ID for next opened Watcher
}
// /head/watch handle - served by Watcher.
type Watcher struct {
sk *FileSock
id int32 // ID of this /head/watch handle (for debug log)
// established file watchers.
// XXX in-progress - where?
// XXX locking?
fileTab map[*FileWatch]struct{}
// IO
acceptq chan string // (stream, msg) // client-initiated messages go here
rxMu sync.Mutex
rxTab map[uint32]chan msg // client replies go via here
}
// FileWatch represents watching for 1 BigFile.
......@@ -1116,7 +1123,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
return nil
}
// updateWatchers complements readBlk and update watchers of the file after a
// updateWatchers complements readBlk: it updates watchers of the file after a
// block was loaded from ZODB and before block data is returned to kernel.
//
// See "7.2) for all registered client@at watchers ..."
......@@ -1162,7 +1169,7 @@ func (f *BigFile) updateWatchers(blk int64, treepath []zodb.IPersistent, pathRev
*/
}
// uploadBlk complements readBlk and uploads loaded blkdata into OS cache.
// uploadBlk complements readBlk: it uploads loaded blkdata into OS cache.
func (f *BigFile) uploadBlk(blk int64, loading *blkLoadState) {
head := f.head
......@@ -1231,6 +1238,10 @@ retry:
log.Errorf("BUG: bigfile %s: blk %d: -> pagecache: %s (ignoring, but reading from bigfile will be very slow)", oid, blk, st)
}
// -------- notifications to Watcher --------
// XXX WatchFile.Pin(blk, at)
// ---- Watch server ----
......@@ -1239,10 +1250,12 @@ func (watch *Watch) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.St
// XXX check flags?
w := &Watcher{
sk: NewFileSock(),
id: atomic.AddInt32(&watch.id, +1)
fileTab: make(map[*FileWatch]struct{}),
}
// XXX locking
// XXX del watchTab[w] on w.sk.File.Release
watch.head.watchTab[w] = struct{}{}
go w.serve()
......@@ -1250,8 +1263,58 @@ func (watch *Watch) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.St
}
// serve serves client originated watch requests.
// XXX serves rx?
func (w *Watcher) serve() {
err := w._serve()
// XXX log error if !close
// XXX locking
delete(w.head.watchTab, w)
}
func (w *Watcher) _serve() (err error) {
defer xerr.Contextf(&err, "watcher %d: serve", w.id)
r := bufio.NewReader(w.sk)
var stream uint64
//w.recvReq()
for {
l, err := r.ReadString('\n') // XXX limit accepted line len not to DOS
if err != nil {
return err // XXX err ctx?
}
fmt.Printf("watch: rx: %q\n", l)
// <stream> ...
var req string
n, err := fmt.Sscanf(l, "%d %s\n", &stream, &req)
if err == nil && n != 2 {
err = fmt.Errorf("invalid frame: %q", l)
}
if err != nil {
return fmt.Errorf("rx: %s", err)
}
reply := (stream % 2 == 0)
// reply to wcfs message
if reply {
w.rxMu.Lock()
rxq := w.rxTab[stream]
w.rxMu.Unlock()
if rxq == nil {
return fmt.Errorf("rx: reply on unexpected stream %d", stream)
}
rxq <- req
// client-initiated message
} else {
fmt.Sscanf(req, "watch %s %s\n", &oid, &ats
}
}
}
......@@ -1574,6 +1637,7 @@ func (zh *_wcfs_Zhead) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse
groot.head.zconnMu.Lock()
defer groot.head.zconnMu.Unlock()
// XXX del zheadSockTab[sk] on sk.File.Release (= client drops opened handle)
gdebug.zheadSockTab[sk] = struct{}{}
return sk.File(), fuse.OK
}
......
......@@ -75,7 +75,7 @@ def teardown_function(f):
# ---- tests ----
# check that zurl does not change from one open to another storage open.
# test that zurl does not change from one open to another storage open.
def test_zurlstable():
for i in range(10):
zstor = testdb.getZODBStorage()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment