Commit be850769 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 931eebf5
......@@ -25,6 +25,7 @@ import (
"fmt"
"io"
"math"
"strconv"
"strings"
"sync/atomic"
"syscall"
......@@ -604,6 +605,23 @@ func tidmin(a, b zodb.Tid) zodb.Tid {
// ---- parsing ----
// parseWatchFrame parses line going through /head/watch into (stream, msg)
//
// <stream> <msg...>
func parseWatchFrame(line string) (stream uint64, msg string, err error) {
sp := strings.IndexByte(line, ' ')
if sp == -1 {
return 0, "", fmt.Errorf("invalid frame: %q", line)
}
stream, err = strconv.ParseUint(line[:sp], 10, 64)
if err != nil {
return 0, "", fmt.Errorf("invalid stream")
}
return stream, line[sp+1:], nil
}
// parseWatch parses watch request wcfs received over /head/watch.
//
// watch <file> (@<at>|-)
......
......@@ -388,6 +388,7 @@ package main
// XXX For every ZODB connection a dedicated read-only transaction is maintained.
import (
"bufio"
"context"
"flag"
"fmt"
......@@ -488,8 +489,10 @@ type Watch struct {
// /head/watch handle - served by Watcher.
type Watcher struct {
sk *FileSock
id int32 // ID of this /head/watch handle (for debug log)
sk *FileSock
id int32 // ID of this /head/watch handle (for debug log)
head *Head
// established file watchers.
// XXX in-progress - where?
......@@ -499,7 +502,7 @@ type Watcher struct {
// IO
// acceptq chan string // (stream, msg) // client-initiated messages go here
rxMu sync.Mutex
rxTab map[uint32]chan string // client replies go via here
rxTab map[uint64]chan string // client replies go via here
}
// FileWatch represents watching for 1 BigFile.
......@@ -1266,6 +1269,7 @@ func (watch *Watch) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.St
// XXX serves rx?
func (w *Watcher) serve() {
err := w._serve()
_ = err
// XXX log error if !close
// XXX close if was not closed?
// XXX locking
......@@ -1274,11 +1278,8 @@ func (w *Watcher) serve() {
func (w *Watcher) _serve() (err error) {
defer xerr.Contextf(&err, "watcher %d: serve", w.id)
r := bufio.NewReader(w.sk)
var stream uint64
//w.recvReq()
for {
l, err := r.ReadString('\n') // XXX limit accepted line len not to DOS
if err != nil {
......@@ -1287,20 +1288,12 @@ func (w *Watcher) _serve() (err error) {
fmt.Printf("watch: rx: %q\n", l)
// <stream> <msg...>
sp := strings.IndexByte(l, ' ')
if sp == -1 {
// XXX write to peer too? (on which stream? -1?)
return fmt.Errorf("rx: invalid frame: %q", l)
}
stream, err := strconv.ParseUint(l[:sp], 10, 64)
stream, msg, err := parseWatchFrame(l)
if err != nil {
return fmt.Errorf("rx: invalid frame (stream): %q", l)
// XXX write to peer too? (on which stream? -1?)
return fmt.Errorf("rx: %s", err)
}
msg := l[sp+1:]
// reply from client to to wcfs
reply := (stream % 2 == 0)
if reply {
......@@ -1317,13 +1310,19 @@ func (w *Watcher) _serve() (err error) {
}
// client-initiated request
oid, at, err = parseWatch(msg)
oid, at, err := parseWatch(msg)
if err != nil {
// XXX write to peer too
return fmt.Errorf("rx %d: %s", err)
}
_ = oid
_ = at
_, err = fmt.Fprintf(w.sk, "%d error TODO\n", stream)
if err != nil {
return err
}
}
}
......
......@@ -615,6 +615,8 @@ def test_wcfs():
# XXX new watch request while previous watch request is in progress (over the same /head/watch handle)
# XXX invalid requests -> wcfs replies with error
def test_wcfs_invproto():
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment