Commit 2897009e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent bf02898b
...@@ -346,6 +346,7 @@ func (f *skFile) Read(dest []byte, /*ignored*/off int64) (fuse.ReadResult, fuse. ...@@ -346,6 +346,7 @@ func (f *skFile) Read(dest []byte, /*ignored*/off int64) (fuse.ReadResult, fuse.
} }
if err == io.EOF { if err == io.EOF {
n = 0 // read(2): "zero indicates end of file" n = 0 // read(2): "zero indicates end of file"
err = nil
} }
if err != nil { if err != nil {
return nil, err2LogStatus(err) return nil, err2LogStatus(err)
......
...@@ -1446,8 +1446,8 @@ func (wnode *WatchNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fus ...@@ -1446,8 +1446,8 @@ func (wnode *WatchNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fus
// wcfs initiated requests. // wcfs initiated requests.
func (wlink *WatchLink) serveRX() { func (wlink *WatchLink) serveRX() {
err := wlink._serveRX() err := wlink._serveRX()
_ = err
// XXX log error if !close // XXX log error if !close
log.Error(err)
// XXX locking // XXX locking
delete(wlink.head.wlinkTab, wlink) delete(wlink.head.wlinkTab, wlink)
} }
...@@ -1470,8 +1470,8 @@ func (wlink *WatchLink) _serveRX() (err error) { ...@@ -1470,8 +1470,8 @@ func (wlink *WatchLink) _serveRX() (err error) {
} }
}() }()
// close .sk on error/wcfs stopping or return. closing .sk wakes up rx on it. // close .sk on error/wcfs stopping or return. closing .sk wakes up rx
// make sure to stop wg on error return. // on it and rx on client side. make sure to stop wg on error return.
retq := make(chan struct{}) retq := make(chan struct{})
defer close(retq) defer close(retq)
wg.Go(func() error { wg.Go(func() error {
...@@ -1504,7 +1504,7 @@ func (wlink *WatchLink) _serveRX() (err error) { ...@@ -1504,7 +1504,7 @@ func (wlink *WatchLink) _serveRX() (err error) {
stream, msg, err := parseWatchFrame(l) stream, msg, err := parseWatchFrame(l)
if err != nil { if err != nil {
return fmt.Errorf("%s", err) return err
} }
// reply from client to wcfs // reply from client to wcfs
...@@ -1522,7 +1522,14 @@ func (wlink *WatchLink) _serveRX() (err error) { ...@@ -1522,7 +1522,14 @@ func (wlink *WatchLink) _serveRX() (err error) {
continue continue
} }
// client-initiated watch request // client-initiated request
// bye TODO document in "Invalidation protocol"
if msg == "bye" {
return nil // deferred sk.Close will wake-up rx on client side
}
// watch ...
if atomic.LoadInt32(&handlingWatch) != 0 { if atomic.LoadInt32(&handlingWatch) != 0 {
return fmt.Errorf("%d: another watch request is already in progress", stream) return fmt.Errorf("%d: another watch request is already in progress", stream)
} }
......
...@@ -372,19 +372,14 @@ class tWatch: ...@@ -372,19 +372,14 @@ class tWatch:
# python/stdio lock file object on read/write # python/stdio lock file object on read/write
# however we need both read and write to be working simultaneously. # however we need both read and write to be working simultaneously.
# fdopen takes ownership of file descriptor and closes it when file object is closed. #
# fdopen takes ownership of file descriptor and closes it when file
# object is closed -> we dup fd so that each file object has its own fd.
#print('\n\n') wh = os.open(tdb.path("head/watch"), os.O_RDWR)
#os.system("ls -l %s" % tdb.path("head")) wh2 = os.dup(wh) # XXX temp?
#print('\n\n') t.wrx = os.fdopen(wh, 'rb')
#os.system("LANG=C strace stat %s" % tdb.path("head/watch")) t.wtx = os.fdopen(wh2, 'wb')
#print('\n\n')
#os.system("LANG=C strace cat %s" % tdb.path("head/watch"))
#print('\n\n')
t.wh = os.open(tdb.path("head/watch"), os.O_RDWR)
t.wrx = os.fdopen(t.wh, 'rb')
t.wtx = os.fdopen(t.wh, 'wb')
t._acceptq = chan() # (stream, msg) server originated messages go here t._acceptq = chan() # (stream, msg) server originated messages go here
t._rxmu = threading.Lock() t._rxmu = threading.Lock()
...@@ -400,8 +395,12 @@ class tWatch: ...@@ -400,8 +395,12 @@ class tWatch:
def close(t): def close(t):
t.tdb._tracked.remove(t) t.tdb._tracked.remove(t)
os.close(t.wh) t._send(1, b'bye') # ask wcfs to close its tx & rx sides; close(wcfs.tx) wakes up _serveRecv
t._serveDone.recv() t._serveDone.recv()
t.wtx.close()
t.wrx.close()
# wakeup everyone waiting for rx # wakeup everyone waiting for rx
t._acceptq.close() t._acceptq.close()
with t._rxmu: with t._rxmu:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment