Commit 12d448f0 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 3dbd5d75
...@@ -497,9 +497,9 @@ type Watcher struct { ...@@ -497,9 +497,9 @@ type Watcher struct {
fileTab map[*FileWatch]struct{} fileTab map[*FileWatch]struct{}
// IO // IO
acceptq chan string // (stream, msg) // client-initiated messages go here // acceptq chan string // (stream, msg) // client-initiated messages go here
rxMu sync.Mutex rxMu sync.Mutex
rxTab map[uint32]chan msg // client replies go via here rxTab map[uint32]chan string // client replies go via here
} }
// FileWatch represents watching for 1 BigFile. // FileWatch represents watching for 1 BigFile.
...@@ -1250,7 +1250,7 @@ func (watch *Watch) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.St ...@@ -1250,7 +1250,7 @@ func (watch *Watch) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.St
// XXX check flags? // XXX check flags?
w := &Watcher{ w := &Watcher{
sk: NewFileSock(), sk: NewFileSock(),
id: atomic.AddInt32(&watch.id, +1) id: atomic.AddInt32(&watch.id, +1),
fileTab: make(map[*FileWatch]struct{}), fileTab: make(map[*FileWatch]struct{}),
} }
...@@ -1286,32 +1286,58 @@ func (w *Watcher) _serve() (err error) { ...@@ -1286,32 +1286,58 @@ func (w *Watcher) _serve() (err error) {
fmt.Printf("watch: rx: %q\n", l) fmt.Printf("watch: rx: %q\n", l)
// <stream> ... // <stream> <msg...>
var req string sp := strings.IndexByte(l, ' ')
n, err := fmt.Sscanf(l, "%d %s\n", &stream, &req) if sp == -1 {
if err == nil && n != 2 { // XXX write to peer too? (on which stream? -1?)
err = fmt.Errorf("invalid frame: %q", l) return fmt.Errorf("rx: invalid frame: %q", l)
} }
stream, err := strconv.ParseUint(l[:sp], 10, 64)
if err != nil { if err != nil {
return fmt.Errorf("rx: %s", err) return fmt.Errorf("rx: invalid frame (stream): %q", l)
} }
msg := l[sp+1:]
// reply from client to to wcfs
reply := (stream % 2 == 0) reply := (stream % 2 == 0)
// reply to wcfs message
if reply { if reply {
w.rxMu.Lock() w.rxMu.Lock()
rxq := w.rxTab[stream] rxq := w.rxTab[stream]
delete(w.rxTab, stream)
w.rxMu.Unlock() w.rxMu.Unlock()
if rxq == nil { if rxq == nil {
return fmt.Errorf("rx: reply on unexpected stream %d", stream) return fmt.Errorf("rx %d: reply on unexpected streamd", stream)
}
rxq <- msg
continue
} }
rxq <- req
// client-initiated message // client-initiated request
} else { msg = strings.TrimSuffix(msg, "\n")
fmt.Sscanf(req, "watch %s %s\n", &oid, &ats msgv = strings.Split(msg, " ")
if !(len(msgv) == 3 && msgv[0] != "watch") {
// XXX write to peer too
return fmt.Errorf("rx %d: invalid request", stream)
}
oid, err := zodb.ParseOid(msgv[1])
if err != nil {
return fmt.Errorf("rx %d: bad watch: invalid oid")
}
var at zodb.Tid
switch {
case msgv[2] == "-":
at = zodb.InvalidTid
case strings.HasPrefix(msgv[2], "@"):
at, err = zodb.ParseTid(msgv[2][1:])
default:
err = fmt.Errorf("x") // XXX just anything
}
if err != nil {
return fmt.Errorf("rx %d: bad watch: invalid at")
} }
} }
......
...@@ -408,7 +408,7 @@ class tWatch: ...@@ -408,7 +408,7 @@ class tWatch:
if reply: if reply:
with t._rxmu: with t._rxmu:
assert stream in t._rxtab assert stream in t._rxtab
rxq = t._rxtabs[stream] rxq = t._rxtab.pop(stream)
rxq.send(msg) rxq.send(msg)
else: else:
with t._rxmu: with t._rxmu:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment