Commit bdca0e6a, authored by Kirill Smelkov, committed by Han-Wen Nienhuys

Add support for store notify

I'm writing a networked filesystem which reads data in 2MB blocks from
a remote database. However, read requests from the kernel come in much
smaller sizes - for example 4K-128K.

Since it would be very slow to refetch the 2MB block for, e.g., each of
several consecutive 4K reads, a cache for fetched data is needed. A custom
cache would do; however, since the kernel already implements a pagecache to
cache file data, it is logical to use it directly.

The FUSE protocol provides primitives for pagecache control. We already have
support for e.g. invalidating a data region of an inode (InodeNotify), but
there is more there. In particular, it is possible to store and
retrieve data into/from the kernel cache with "store notify" and
"retrieve notify" messages:

https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/uapi/linux/fuse.h?id=v4.19-rc6-177-gcec4de302c5f#n68
https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/uapi/linux/fuse.h?id=v4.19-rc6-177-gcec4de302c5f#n756

https://git.kernel.org/linus/a1d75f2582
https://git.kernel.org/linus/2d45ba381a

This patch adds support for "store notify". Support for "retrieve
notify" might be added later since a) it is not needed right now, and b)
supporting it is a bit more work since the kernel sends a separate reply
with data, and infrastructure has to be added to glue it back to the original
"retrieve notify" server-originated request.

For user-visible API I decided not to duplicate FUSE-protocol naming
1-1 and to be more explicit in names - emphasizing it is about cache
control - it is e.g. InodeNotifyStoreCache instead of
InodeNotifyStore, and for retrieving it (hopefully) should be just
InodeRetrieveCache instead of InodeNotifyRetrieveCache and a
separate callback.

Thanks beforehand,
Kirill
parent 8ccb625b
......@@ -384,6 +384,29 @@ func (c *FileSystemConnector) FileNotify(node *Inode, off int64, length int64) f
return c.server.InodeNotify(nId, off, length)
}
// FileNotifyStoreCache pushes new file content for the inode into the
// kernel cache.
//
// Unlike FileNotify, which only invalidates a data region, this call hands
// the updated bytes to the kernel: once it completes, non direct-IO reads of
// the region starting at off are served from the kernel cache holding data.
// When the kernel later evicts that cache, it has to issue Read calls again
// to obtain the content.
//
// The node is translated to a kernel node id: the root node maps to
// fuse.FUSE_ROOT_ID, any other node to its handle in the inode map. If that
// id is 0 the call returns fuse.EINVAL; otherwise the result of the server's
// InodeNotifyStoreCache is returned.
func (c *FileSystemConnector) FileNotifyStoreCache(node *Inode, off int64, data []byte) fuse.Status {
	id := uint64(fuse.FUSE_ROOT_ID)
	if node != c.rootNode {
		id = c.inodeMap.Handle(&node.handled)
	}

	if id == 0 {
		return fuse.EINVAL
	}
	return c.server.InodeNotifyStoreCache(id, off, data)
}
// EntryNotify makes the kernel forget the entry data from the given
// name from a directory. After this call, the kernel will issue a
// new lookup request for the given name when necessary. No filesystem
......
......@@ -60,9 +60,10 @@ const (
// The following entries don't have to be compatible across Go-FUSE versions.
_OP_NOTIFY_ENTRY = int32(100)
_OP_NOTIFY_INODE = int32(101)
_OP_NOTIFY_DELETE = int32(102) // protocol version 18
_OP_NOTIFY_STORE = int32(102)
_OP_NOTIFY_DELETE = int32(103) // protocol version 18
_OPCODE_COUNT = int32(103)
_OPCODE_COUNT = int32(104)
)
////////////////////////////////////////////////////////////////
......@@ -510,6 +511,7 @@ func init() {
_OP_POLL: unsafe.Sizeof(_PollOut{}),
_OP_NOTIFY_ENTRY: unsafe.Sizeof(NotifyInvalEntryOut{}),
_OP_NOTIFY_INODE: unsafe.Sizeof(NotifyInvalInodeOut{}),
_OP_NOTIFY_STORE: unsafe.Sizeof(NotifyStoreOut{}),
_OP_NOTIFY_DELETE: unsafe.Sizeof(NotifyInvalDeleteOut{}),
} {
operationHandlers[op].OutputSize = sz
......@@ -557,6 +559,7 @@ func init() {
_OP_POLL: "POLL",
_OP_NOTIFY_ENTRY: "NOTIFY_ENTRY",
_OP_NOTIFY_INODE: "NOTIFY_INODE",
_OP_NOTIFY_STORE: "NOTIFY_STORE",
_OP_NOTIFY_DELETE: "NOTIFY_DELETE",
_OP_FALLOCATE: "FALLOCATE",
_OP_READDIRPLUS: "READDIRPLUS",
......@@ -620,6 +623,7 @@ func init() {
_OP_MKDIR: func(ptr unsafe.Pointer) interface{} { return (*EntryOut)(ptr) },
_OP_NOTIFY_ENTRY: func(ptr unsafe.Pointer) interface{} { return (*NotifyInvalEntryOut)(ptr) },
_OP_NOTIFY_INODE: func(ptr unsafe.Pointer) interface{} { return (*NotifyInvalInodeOut)(ptr) },
_OP_NOTIFY_STORE: func(ptr unsafe.Pointer) interface{} { return (*NotifyStoreOut)(ptr) },
_OP_NOTIFY_DELETE: func(ptr unsafe.Pointer) interface{} { return (*NotifyInvalDeleteOut)(ptr) },
_OP_STATFS: func(ptr unsafe.Pointer) interface{} { return (*StatfsOut)(ptr) },
_OP_SYMLINK: func(ptr unsafe.Pointer) interface{} { return (*EntryOut)(ptr) },
......
......@@ -232,6 +232,10 @@ func (o *NotifyInvalDeleteOut) string() string {
return fmt.Sprintf("{parent %d ch %d sz %d}", o.Parent, o.Child, o.NameLen)
}
// string formats the node id, offset and size of the store notification
// as a single brace-delimited text record.
func (o *NotifyStoreOut) string() string {
	const layout = "{nodeid %d off %d sz %d}"
	return fmt.Sprintf(layout, o.Nodeid, o.Offset, o.Size)
}
func (f *FallocateIn) string() string {
return fmt.Sprintf("{Fh %d off %d sz %d mod 0%o}",
f.Fh, f.Offset, f.Length, f.Mode)
......
......@@ -7,6 +7,7 @@ package fuse
import (
"fmt"
"log"
"math"
"os"
"path/filepath"
"runtime"
......@@ -475,6 +476,63 @@ func (ms *Server) InodeNotify(node uint64, off int64, length int64) Status {
return result
}
// InodeNotifyStoreCache tells kernel to store data into inode's cache.
//
// This call is similar to InodeNotify, but instead of only invalidating a data
// region, it gives updated data directly to the kernel.
//
// node is the kernel node id, offset the position of data inside the file.
// Data larger than a single notification can describe is sent as several
// consecutive notifications; the first non-OK status aborts the transfer and
// is returned. ENOSYS is returned when the kernel settings report no support
// for NOTIFY_STORE. Empty data sends nothing and returns OK.
func (ms *Server) InodeNotifyStoreCache(node uint64, offset int64, data []byte) Status {
	if !ms.kernelSettings.SupportsNotify(NOTIFY_STORE) {
		return ENOSYS
	}

	for len(data) > 0 {
		size := len(data)
		if size > math.MaxInt32 {
			// NotifyStoreOut has only uint32 for size.
			// The limit is max(int32), not max(uint32): on 32-bit
			// platforms int cannot represent math.MaxUint32, so
			// comparing against it does not even compile there.
			size = math.MaxInt32
		}

		st := ms.inodeNotifyStoreCache32(node, offset, data[:size])
		if st != OK {
			return st
		}

		data = data[size:]
		offset += int64(size)
	}

	return OK
}
// inodeNotifyStoreCache32 sends one NOTIFY_STORE message for
// InodeNotifyStoreCache. The caller must pass a chunk whose length fits into
// the uint32 Size field of NotifyStoreOut (i.e. less than 4GB).
//
// It returns the status of writing the notification to the kernel.
func (ms *Server) inodeNotifyStoreCache32(node uint64, offset int64, data []byte) Status {
	var req request
	req.inHeader = &InHeader{Opcode: _OP_NOTIFY_STORE}
	req.handler = operationHandlers[_OP_NOTIFY_STORE]
	req.status = NOTIFY_STORE

	// Fill in the notification header; the payload itself travels as flat data.
	out := (*NotifyStoreOut)(req.outData())
	out.Nodeid = node
	out.Offset = uint64(offset) // Offset is unsigned in NotifyStoreOut, hence the conversion.
	out.Size = uint32(len(data))
	req.flatData = data

	// Protect against concurrent close.
	ms.writeMu.Lock()
	st := ms.write(&req)
	ms.writeMu.Unlock()

	if !ms.opts.Debug {
		return st
	}
	log.Printf("Response: INODE_NOTIFY_STORE_CACHE: %v", st)
	return st
}
// DeleteNotify notifies the kernel that an entry is removed from a
// directory. In many cases, this is equivalent to EntryNotify,
// except when the directory is in use, eg. as working directory of
......@@ -566,6 +624,8 @@ func (in *InitIn) SupportsNotify(notifyType int) bool {
return in.SupportsVersion(7, 12)
case NOTIFY_INVAL_INODE:
return in.SupportsVersion(7, 12)
case NOTIFY_STORE:
return in.SupportsVersion(7, 15)
case NOTIFY_INVAL_DELETE:
return in.SupportsVersion(7, 18)
}
......
// Copyright 2018 the Go-FUSE Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package test
// exercise functionality to store/retrieve kernel cache.
import (
"os"
"io/ioutil"
"testing"
"golang.org/x/sys/unix"
"github.com/hanwen/go-fuse/fuse"
"github.com/hanwen/go-fuse/fuse/nodefs"
"github.com/hanwen/go-fuse/internal/testutil"
)
// DataNode is a nodefs.Node that Reads static data.
//
// Since Read works without Open, the kernel does not invalidate DataNode cache
// on user's open.
type DataNode struct {
// Node is the embedded nodefs.Node; NewDataNode fills it with
// nodefs.NewDefaultNode().
nodefs.Node
// data is the static file content; GetAttr reports its length as the
// file size and Read serves slices of it.
data []byte
}
// NewDataNode returns a DataNode that serves data, with a default nodefs
// node as its embedded Node. The slice is stored as is, not copied.
func NewDataNode(data []byte) *DataNode {
	n := new(DataNode)
	n.Node = nodefs.NewDefaultNode()
	n.data = data
	return n
}
// GetAttr fills out with the attributes of the node: a regular file with
// mode 0644 whose size is the length of the static data. It always
// returns fuse.OK.
func (d *DataNode) GetAttr(out *fuse.Attr, _ nodefs.File, _ *fuse.Context) fuse.Status {
	size := len(d.data)
	out.Mode = fuse.S_IFREG | 0644
	out.Size = uint64(size)
	return fuse.OK
}
// Read returns at most len(dest) bytes of the static data starting at off.
//
// The requested window is clamped to the data: a read that starts at or
// beyond the end yields an empty result instead of an out-of-range slice,
// and a read that crosses the end is cut short. The status is always fuse.OK.
func (d *DataNode) Read(_ nodefs.File, dest []byte, off int64, _ *fuse.Context) (fuse.ReadResult, fuse.Status) {
	l := int64(len(d.data))
	if off > l {
		// Reading past EOF: return zero bytes rather than panicking on
		// d.data[off:end] with off > end.
		off = l
	}

	// Honour the size of the request; the window is len(dest) bytes wide,
	// not len(d.data).
	end := off + int64(len(dest))
	if end > l {
		end = l
	}

	return fuse.ReadResultData(d.data[off:end]), fuse.OK
}
// TestCacheControl verifies that FUSE server process can store/retrieve kernel data cache.
//
// The test mounts a filesystem with a single file "hello.txt", maps the file
// into memory, stores new bytes into the kernel cache with
// FileNotifyStoreCache and checks that both the mapping and a regular file
// read observe them. It then invalidates the cache with FileNotify and checks
// that both views show the original content again.
// Retrieving data from the cache is not exercised yet (see the TODO below).
func TestCacheControl(t *testing.T) {
dir := testutil.TempDir()
// Deferred first, hence executed last: the directory is removed only after
// the munmap and unmount deferred further below have run.
defer func() {
err := os.Remove(dir)
if err != nil {
t.Fatal(err)
}
}()
// setup a filesystem with 1 file
root := nodefs.NewDefaultNode()
opts := nodefs.NewOptions()
opts.Debug = testutil.VerboseTest()
srv, fsconn, err := nodefs.MountRoot(dir, root, opts)
if err != nil {
t.Fatal(err)
}
data0 := "hello world"
file := NewDataNode([]byte(data0))
// NOTE(review): the second argument is presumably the "is directory" flag —
// confirm against the NewChild signature.
root.Inode().NewChild("hello.txt", false, file)
// Serve requests in the background; WaitMount below returns an error if
// the mount did not come up.
go srv.Serve()
if err := srv.WaitMount(); err != nil {
t.Fatal("WaitMount", err)
}
defer func() {
err := srv.Unmount()
if err != nil {
t.Fatal(err)
}
}()
// assertFileRead asserts that the file content reads as dataOK.
// subj names the test phase and is used as prefix of the failure message.
assertFileRead := func(subj, dataOK string) {
t.Helper()
v, err := ioutil.ReadFile(dir + "/hello.txt")
if err != nil {
t.Fatalf("%s: file read: %s", subj, err)
}
if string(v) != dataOK {
t.Fatalf("%s: file read: got %q ; want %q", subj, v, dataOK)
}
}
// make sure the file reads correctly
assertFileRead("original", data0)
// pin file content into OS cache
f, err := os.Open(dir + "/hello.txt")
if err != nil {
t.Fatal(err)
}
// Map len(data0) bytes of the file read-only and shared.
fmmap, err := unix.Mmap(int(f.Fd()), 0, len(data0), unix.PROT_READ, unix.MAP_SHARED)
if err != nil {
t.Fatal(err)
}
// The descriptor is closed right away; the rest of the test works with
// the mapping only.
err = f.Close()
if err != nil {
t.Fatal(err)
}
defer func() {
err := unix.Munmap(fmmap)
if err != nil {
t.Fatal(err)
}
}()
// Lock the mapped memory (this is the "pin" step announced above).
err = unix.Mlock(fmmap)
if err != nil {
t.Fatal(err)
}
// assertMmapRead asserts that file's mmaped memory reads as dataOK.
// subj names the test phase and is used as prefix of the failure message.
assertMmapRead := func(subj, dataOK string) {
t.Helper()
if string(fmmap) != dataOK {
t.Fatalf("%s: file mmap: got %q ; want %q", subj, fmmap, dataOK)
}
}
// make sure the cache has original data
assertMmapRead("original", data0)
// store changed data into OS cache
// The 3 bytes "123" go to offset 7, i.e. they replace "orl" of "hello world".
st := fsconn.FileNotifyStoreCache(file.Inode(), 7, []byte("123"))
if st != fuse.OK {
t.Fatalf("store cache: %s", st)
}
// make sure mmaped data and file read as updated data
data1 := "hello w123d"
assertMmapRead("after storecache", data1)
assertFileRead("after storecache", data1)
// TODO verify retrieve cache
// invalidate cache
// NOTE(review): off=0, length=0 is presumably "the whole file" — confirm
// against the InodeNotify semantics.
st = fsconn.FileNotify(file.Inode(), 0, 0)
if st != fuse.OK {
t.Fatalf("invalidate cache: %s", st)
}
// make sure mmapped data and file read as original data
// DataNode still serves data0, so dropping the cached bytes must bring
// the original content back.
assertMmapRead("after invalcache", data0)
assertFileRead("after invalcache", data0)
}
......@@ -376,11 +376,18 @@ type NotifyInvalDeleteOut struct {
Padding uint32
}
// NotifyStoreOut is the header of a store notification (NOTIFY_STORE) sent
// from the server to the kernel. The bytes to store are not part of the
// struct; the server sends them as the flat data of the same request.
type NotifyStoreOut struct {
// Nodeid is the kernel node id of the inode whose cache is updated.
Nodeid uint64
// Offset is the position inside the file where the data starts.
Offset uint64
// Size is the number of data bytes that follow the header.
Size uint32
// Padding is not set by the server code; presumably it only keeps the
// layout aligned with the kernel structure — TODO confirm.
Padding uint32
}
const (
// NOTIFY_POLL = -1
NOTIFY_INVAL_INODE = -2
NOTIFY_INVAL_ENTRY = -3
// NOTIFY_STORE = -4
NOTIFY_STORE = -4
// NOTIFY_RETRIEVE = -5
NOTIFY_INVAL_DELETE = -6
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment