Commit 434deb93 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent be850769
......@@ -83,11 +83,11 @@ func err2LogStatus(err error) fuse.Status {
// otherwise log as warnings EINVAL and as errors everything else
switch errors.Cause(err).(type) {
case *eInvalError:
log.Warning(err)
log.WarningDepth(1, err)
return fuse.EINVAL
default:
log.Error(err)
log.ErrorDepth(1, err)
return fuse.EIO
}
}
......@@ -301,6 +301,9 @@ type skFile struct {
//
// After file socket is created, File return should be given to kernel for the
// socket to be connected to an opened file.
//
// Note: the node opening which gives FileSock, should not be "regular" - else
// read/write will be serialized by kernel (git.kernel.org/linus/9c225f2655)
func NewFileSock() *FileSock {
sk := &FileSock{}
f := &skFile{
......
......@@ -1248,12 +1248,26 @@ retry:
// ---- Watch server ----
func (watch *Watch) GetAttr(out *fuse.Attr, f nodefs.File, fctx *fuse.Context) fuse.Status {
st := watch.fsNode.GetAttr(out, f, fctx)
// represent ourself as XXX (FileSock requirement)
// XXX S_IFSOCK does not work (LOOKUP returns inode, open gives: "No such device or address")
// XXX S_IFIFO does not work (the kernel shows the file, but it being
// FIFO makes the data go through kernel pipe, not via FUSE filesystem)
// XXX S_IFLNK - the kernel wants to follow the link
// XXX S_IFDIR - os.open complains "is a directory" (maybe could workaround)
// XXX S_IFCHR - fusermount always adds nodev mount option -> the device cannot be accessed
out.Mode = syscall.S_IFSOCK | 0644
return st
}
// Open serves /head/watch opens.
func (watch *Watch) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
// XXX check flags?
w := &Watcher{
sk: NewFileSock(),
id: atomic.AddInt32(&watch.idNext, +1),
head: watch.head,
fileTab: make(map[*FileWatch]struct{}),
}
......
......@@ -364,7 +364,20 @@ class tWatch:
def __init__(t, tdb):
t.tdb = tdb
t.w = tdb._open("head/watch", mode='rwb')
# python/stdio lock file object on read/write
# however we need both read and write to be working simultaneously
print('\n\n')
os.system("ls -l %s" % tdb.path("head"))
print('\n\n')
#os.system("LANG=C strace stat %s" % tdb.path("head/watch"))
#print('\n\n')
os.system("LANG=C strace cat %s" % tdb.path("head/watch"))
print('\n\n')
t.wh = os.open(tdb.path("head/watch"), os.O_RDWR)
t.wrx = os.fdopen(t.wh, 'rb')
t.wtx = os.fdopen(t.wh, 'wb')
t._acceptq = chan() # (stream, msg) server originated messages go here
t._rxmu = threading.Lock()
......@@ -380,7 +393,7 @@ class tWatch:
def close(t):
t.tdb._tracked.remove(t)
t.w.close()
os.close(t.wh)
t._serveDone.recv()
# wakeup everyone waiting for rx
t._acceptq.close()
......@@ -395,7 +408,7 @@ class tWatch:
def _serveRecv(t):
defer(t._serveDone.close)
while 1:
l = t.w.readline()
l = t.wrx.readline()
print('watch: rx: %r' % l)
if len(l) == 0:
break # closed
......@@ -420,22 +433,35 @@ class tWatch:
#
# multiple _send can be called in parallel - _send serializes writes.
def _send(t, stream, msg):
print('qqq')
assert '\n' not in msg
print('zzz')
with t._txmu:
t.w.write(b"%d %s\n" % (stream, msg))
print('rrr')
a = b"%d %s\n" % (stream, msg)
print(`a`)
t.wtx.write(b"%d %s\n" % (stream, msg)) # XXX read/write don't work in parallel?
print('sss')
t.wtx.flush()
print('ooo')
# sendReq sends client -> server request and returns server reply.
#
# only 1 sendReq must be used at a time. # XXX relax?
def sendReq(t, req):
print("000")
stream = 1
rxq = chan()
print("111")
with t._rxmu:
print("222")
assert stream not in t._rxtab
t._rxtab[stream] = rxq
print("333")
t._send(stream, req)
print("444")
return rxq.recv()
# recvReq receives client <- server request.
......@@ -580,6 +606,7 @@ def test_wcfs():
# >>> XXX commit data to not yet accessed f part - nothing happens
# XXX invalidation protocol ...
print('\n\n')
w = t.openwatch()
......@@ -591,8 +618,15 @@ def test_wcfs():
for p in pinv:
p.ack()
go(_)
assert w.sendReq(b"watch %s @%s" % (h(zf._p_oid), h(at1))) == "ok"
print('\nAAA\n')
try:
assert w.sendReq(b"watch %s @%s" % (h(zf._p_oid), h(at1))) == "ok"
except Exception, e:
print(e)
raise
print('\nBBB\n')
done.recv()
print('\nCCC\n')
# XXX test watch with all at variants
# XXX both from scratch and going e.g. at1 -> at2 -> at3
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment