Commit c559ec1a authored by Kirill Smelkov's avatar Kirill Smelkov Committed by Levin Zimmermann

wcfs: Implement protection against faulty client

The WCFS documentation specifies [1]:

- - - 8> - - - 8> - - -

If a client, on purpose or due to a bug or being stopped, is slow to respond
with ack to file invalidation notification, it creates a problem because the
server will become blocked waiting for pin acknowledgments, and thus all
other clients, that try to work with the same file, will get stuck.

[...]

Lacking OS primitives to change address space of another process and not
being able to work it around with ptrace in userspace, wcfs takes approach
to kill a slow client on 30 seconds timeout by default.

- - - <8 - - - <8 - - -

But before this patch, this protection wasn't implemented yet: one
faulty client could therefore freeze the whole system. With this patch
this protection is implemented now: faulty clients are killed after the
timeout or any other misbehaviour in their pin handlers.

[1] https://lab.nexedi.com/nexedi/wendelin.core/blob/38dde766/wcfs/wcfs.go#L186-208

Preliminary history:

    levin.zimmermann/wendelin.core@24904e82
    levin.zimmermann/wendelin.core@b02dcadcCo-authored-by: Levin Zimmermann's avatarLevin Zimmermann <levin.zimmermann@nexedi.com>

/discussed-on nexedi/wendelin.core!18
parent 007d53db
......@@ -8,7 +8,8 @@ require (
github.com/johncgriffin/overflow v0.0.0-20211019200055-46fa312c352c
github.com/kisielk/og-rek v1.2.0
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.4
github.com/shirou/gopsutil/v4 v4.24.8 // indirect
github.com/stretchr/testify v1.9.0
lab.nexedi.com/kirr/go123 v0.0.0-20230822135329-95433de34faf
lab.nexedi.com/kirr/neo/go v0.0.0-20240723085959-839ee634bd66
)
......
......@@ -28,6 +28,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ=
github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4=
......@@ -49,6 +51,8 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gwenn/gosqlite v0.0.0-20211101095637-b18efb2e44c8 h1:sWkgaGez8CNa2KHGBTTop16/mC03VP6MDqPKfvhEmCU=
......@@ -72,6 +76,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/moby/sys/mountinfo v0.6.2/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI=
github.com/philhofer/fwd v1.1.1 h1:GdGcTjf5RNAxwS4QLsiMzJYj5KEvPJD3Abr261yRQXQ=
github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
......@@ -79,11 +85,18 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/shamaton/msgpack v1.2.1 h1:40cwW7YAEdOIxcxIsUkAxSMUyYWZUyNiazI5AyiBntI=
github.com/shamaton/msgpack v1.2.1/go.mod h1:ibiaNQRTCUISAYkkyOpaSCEBiCAxXe6u6Mu1sQ6945U=
github.com/shirou/gopsutil/v4 v4.24.8 h1:pVQjIenQkIhqO81mwTaXjTzOMT7d3TZkf43PlVFHENI=
github.com/shirou/gopsutil/v4 v4.24.8/go.mod h1:wE0OrJtj4dG+hYkxqDH3QiBICdKSf04/npcvLLc/oRg=
github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM=
github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0=
github.com/someonegg/gocontainer v1.0.0 h1:9MMUFbQf7g+g9sMG4ggBHPDS1+Iz+wd9Ee/O4BNRdw0=
github.com/someonegg/gocontainer v1.0.0/go.mod h1:zGJcXRK0ikzEYPFKTaFXi6UU/ulNuJypfADX4UQGtMw=
......@@ -92,6 +105,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
......@@ -101,10 +115,17 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tinylib/msgp v1.1.6 h1:i+SbKraHhnrf9M5MYmvQhFnbLhAXSDWF8WWsuyRdocw=
github.com/tinylib/msgp v1.1.6/go.mod h1:75BAfg2hauQhs3qedfdDZmWAPcFMAvJE5b9rGOMufyw=
github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU=
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
......@@ -137,8 +158,10 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210301091718-77cc2087c03b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
......@@ -147,6 +170,10 @@ golang.org/x/sys v0.0.0-20211111213525-f221eed1c01e h1:zeJt6jBtVDK23XK9QXcmG0FvO
golang.org/x/sys v0.0.0-20211111213525-f221eed1c01e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
......
......@@ -25,17 +25,21 @@ import (
"fmt"
"io"
"math"
"os"
"strconv"
"strings"
"sync/atomic"
"syscall"
"time"
log "github.com/golang/glog"
"github.com/shirou/gopsutil/v4/process"
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/hanwen/go-fuse/v2/fuse/nodefs"
"github.com/pkg/errors"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xio"
"lab.nexedi.com/kirr/neo/go/zodb"
......@@ -440,7 +444,7 @@ func fatalEIO() {
log.Fatal("switching filesystem to EIO mode")
}
// ---- parsing ----
// ---- parsing / formatting ----
// parseWatchFrame parses line going through /head/watch into (stream, msg)
//
......@@ -501,6 +505,17 @@ func parseWatch(msg string) (oid zodb.Oid, at zodb.Tid, err error) {
return oid, at, nil
}
// isoRevstr returns string form of revision as used in isolation protocol.
//
// It is almost the same as standard string form of ZODB revision except that
// zodb.TidMax is represented as "head".
func isoRevstr(rev zodb.Tid) string {
if rev == zodb.TidMax {
return "head"
}
return rev.String()
}
// ---- make df happy (else it complains "function not supported") ----
func (root *Root) StatFs() *fuse.StatfsOut {
......@@ -527,3 +542,87 @@ func (root *Root) StatFs() *fuse.StatfsOut {
func panicf(format string, argv ...interface{}) {
panic(fmt.Sprintf(format, argv...))
}
// findAliveProces lookups process by pid and makes sure it is alive.
//
// NOTE: starting from go1.23 it, via os.FindProcess, uses pidfd which avoids potential
// race of later signalling to pid of already long-gone and replaced process.
func findAliveProcess(pid int) (_ *os.Process, err error) {
defer xerr.Contextf(&err, "findAlive pid%d", pid)
proc, err := os.FindProcess(pid)
if err != nil {
return nil, err
}
// verify that found process is actually good because
// os.FindProcess returns "done" stub instead of an error
alive, err := isProcessAlive(proc)
if err != nil {
return nil, err
}
if !alive {
proc.Release()
return nil, syscall.ESRCH
}
return proc, nil
}
// isProcessAlive returns whether process is alive or not.
func isProcessAlive(proc *os.Process) (_ bool, err error) {
defer xerr.Contextf(&err, "isAlive pid%d", proc.Pid)
// verify that proc's pid exists
// proc.Signal(0) returns ok even for zombie, but zombie is not alive
err = proc.Signal(syscall.Signal(0))
if err != nil {
var e syscall.Errno
if errors.As(err, &e) && e == syscall.EPERM {
return false, err
}
return false, nil
}
// pid exists. Check if proc is not zombie
gproc, err := process.NewProcess(int32(proc.Pid))
if err != nil {
return false, err
}
statusv, err := gproc.Status()
if err != nil {
return false, err
}
for _, status := range statusv {
if status == process.Zombie {
return false, nil
}
}
return true, nil
}
// waitProcessEnd waits for process to end.
//
// Contrary to os.Process.Wait it does not require the caller to be a parent of proc.
func waitProcessEnd(ctx context.Context, proc *os.Process) (_ bool, err error) {
defer xerr.Contextf(&err, "waitEnd pid%d", proc.Pid)
tick := time.NewTicker(100*time.Millisecond)
defer tick.Stop()
for {
alive, err := isProcessAlive(proc)
if err != nil {
return false, err
}
if !alive {
return true, nil
}
select {
case <-ctx.Done():
return false, ctx.Err()
case <-tick.C:
// ok
}
}
}
......@@ -205,7 +205,16 @@
//
// Lacking OS primitives to change address space of another process and not
// being able to work it around with ptrace in userspace, wcfs takes approach
// to kill a slow client on 30 seconds timeout by default.
// to kill a slow or faulty client on 30 seconds timeout or on any other pin
// handling error. This way wcfs achieves progress and safety properties:
// processing does not get stuck even if there is a hung client, and there is
// no corruption in the data that is provided to all live and well-behaving
// clients.
//
// Killing a client with SIGBUS is similar to how OS kernel sends SIGBUS when
// a memory-mapped file is accessed and loading file data results in EIO. It is
// also similar to wendelin.core 1 where SIGBUS is raised if loading file block
// results in an error.
//
//
// Writes
......@@ -543,7 +552,7 @@ type Root struct {
revMu sync.Mutex
revTab map[zodb.Tid]*Head
// time budget for a client to handle pin notification (TODO)
// time budget for a client to handle pin notification
pinTimeout time.Duration
// collected statistics
......@@ -687,6 +696,8 @@ type WatchLink struct {
down1 sync.Once
down chan struct{} // ready after shutdown completes
pinWG sync.WaitGroup // all pin handlers are accounted here
client *os.Process // client that opened the WatchLink
}
// Watch represents watching for changes to 1 BigFile over particular watch link.
......@@ -728,6 +739,7 @@ type blkPinState struct {
// The statistics is accessible via .wcfs/stats file served by _wcfs_Stats.
type Stats struct {
pin atomic.Int64 // # of times wcfs issued pin request
pinkill atomic.Int64 // # of times a client was killed due to badly handling pin
}
......@@ -1438,15 +1450,15 @@ func traceIso(format string, argv ...interface{}) {
// rev = zodb.TidMax means @head; otherwise rev must be ≤ w.at and there must
// be no rev_next changing file[blk]: rev < rev_next ≤ w.at.
//
// Pinning works under WatchLink.serveCtx instead of explicitly
// specified context because pinning is critical operation whose failure will lead
// Pinning works under WatchLink.serveCtx + pinTimeout instead of explicitly
// specified context because pinning is critical operation whose failure leads
// to client being SIGBUS'ed and so pinning should not be interrupted arbitrarily.
//
// Corresponding watchlink is shutdown on any error.
//
// No error is returned as currently pin handles all errors itself inside, and
// in the future the only error that pin will not be able to handle itself inside
// will be considered to be fatal and the filesystem will be switched to EIO mode on that.
// No error is returned as the only error that pin cannot handle itself inside
// is considered to be fatal and the filesystem is switched to EIO mode on that.
// See badPinKill documentation for details.
//
// pin is invoked by BigFile.readPinWatchers . It is called with atMu rlocked.
func (w *Watch) pin(blk int64, rev zodb.Tid) {
......@@ -1473,13 +1485,30 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) {
}
}
// PinError indicates to WatchLink shutdown that pinning a block failed and so
// badPinKill needs to be run.
type PinError struct {
blk int64
rev zodb.Tid
err error
}
func (e *PinError) Error() string {
return fmt.Sprintf("pin #%d @%s: %s", e.blk, isoRevstr(e.rev), e.err)
}
func (e *PinError) Unwrap() error {
return e.err
}
func (w *Watch) __pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
defer func() {
if err != nil {
err = &PinError{blk, rev, err}
}
}()
foid := w.file.zfile.POid()
revstr := rev.String()
if rev == zodb.TidMax {
revstr = "head"
}
defer xerr.Contextf(&err, "pin #%d @%s", blk, revstr)
if !(rev == zodb.TidMax || rev <= w.at) {
panicf("f<%s>: wlink%d: pin #%d @%s: watch.at (%s) < rev",
......@@ -1526,7 +1555,7 @@ func (w *Watch) __pin(ctx context.Context, blk int64, rev zodb.Tid) (err error)
// perform IO without w.pinnedMu
w.pinnedMu.Unlock()
groot.stats.pin.Add(1)
ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, revstr))
ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, isoRevstr(rev)))
w.pinnedMu.Lock()
// check IO reply & verify/signal blkpin is ready
......@@ -1558,6 +1587,92 @@ func (w *Watch) __pin(ctx context.Context, blk int64, rev zodb.Tid) (err error)
return nil
}
// badPinKill is invoked by shutdown to kill client that did not handle pin
// notification correctly and in time.
//
// Because proper pin handling is critical for safety it is considered to be a
// fatal error if the client could not be killed as wcfs no longer can
// continue to provide correct uncorrupted data to it. The filesystem is
// switched to EIO mode in such case.
func (wlink *WatchLink) badPinKill(reason error) {
pid := wlink.client.Pid
logf := func(format string, argv ...any) {
emsg := fmt.Sprintf("pid%d: ", pid)
emsg += fmt.Sprintf(format, argv...)
log.Error(emsg)
}
logf("client failed to handle pin notification correctly and timely in %s: %s", groot.pinTimeout, reason)
logf("-> killing it because else 1) all other clients will remain stuck, and 2) we no longer can provide correct data to the faulty client.")
logf(` (see "Protection against slow or faulty clients" in wcfs description for details)`)
err := wlink._badPinKill()
if err != nil {
logf("failed to kill it: %s", err)
logf("this is major unexpected event.")
fatalEIO()
}
logf("terminated")
groot.stats.pinkill.Add(1)
}
func (wlink *WatchLink) _badPinKill() error {
client := wlink.client
pid := client.Pid
// time budget for pin + wait + fatal-notify + kill = pinTimeout + 1 + 1/3·pinTimeout
// < 2 ·pinTimeout if pinTimeout > 3/2
//
// NOTE wcfs_faultyprot_test.py waits for 2·pinTimeout to reliably
// detect whether client was killed or not.
timeout := groot.pinTimeout/3
ctx := context.Background()
ctx1, cancel := context.WithTimeout(ctx, timeout*1/2)
defer cancel()
ctx2, cancel := context.WithTimeout(ctx, timeout*2/2)
defer cancel()
// SIGBUS => wait for some time; if still alive => SIGKILL
// TODO kirr: "The kernel then sends SIGBUS on such case with the details about
// access to which address generated this error going in si_addr field of
// siginfo structure. It would be good if we can mimic that behaviour to a
// reasonable extent if possible."
log.Errorf("pid%d: <- SIGBUS", pid)
err := client.Signal(syscall.SIGBUS)
if err != nil {
return err
}
ok, err := waitProcessEnd(ctx1, client)
if err != nil && !errors.Is(err, ctx1.Err()) {
return err
}
if ok {
return nil
}
log.Errorf("pid%d: is still alive after SIGBUS", pid)
log.Errorf("pid%d: <- SIGKILL", pid)
err = client.Signal(syscall.SIGKILL)
if err != nil {
return err
}
ok, err = waitProcessEnd(ctx2, client)
if err != nil && !errors.Is(err, ctx2.Err()) {
return err
}
if ok {
return nil
}
err = fmt.Errorf("is still alive after SIGKILL")
log.Errorf("pid%d: %s", pid, err)
return err
}
// readPinWatchers complements readBlk: it sends `pin blk` for watchers of the file
// after a block was loaded from ZODB but before block data is returned to kernel.
//
......@@ -1643,7 +1758,7 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb
//fmt.Printf("S: read #%d: watch @%s: pin -> @%s\n", blk, w.at, pinrev)
// NOTE we do not propagate context to pin. Ideally update
// watchers should be synchronous, and in practice we just use 30s timeout (TODO).
// watchers should be synchronous, and in practice we just use 30s timeout.
// A READ interrupt should not cause watch update failure.
w.pin(blk, pinrev) // only fatal error
return nil
......@@ -1880,9 +1995,23 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// Open serves /head/watch opens.
func (wnode *WatchNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
node, err := wnode.open(flags, fctx)
return node, err2LogStatus(err)
}
func (wnode *WatchNode) open(flags uint32, fctx *fuse.Context) (_ nodefs.File, err error) {
defer xerr.Contextf(&err, "/head/watch: open")
// TODO(?) check flags
head := wnode.head
// remember our client who opened the watchlink.
// We will need to kill the client if it will be e.g. slow to respond to pin notifications.
client, err := findAliveProcess(int(fctx.Caller.Pid))
if err != nil {
return nil, err
}
serveCtx, serveCancel := context.WithCancel(context.TODO() /*TODO ctx of wcfs running*/)
wlink := &WatchLink{
sk: NewFileSock(),
......@@ -1893,6 +2022,7 @@ func (wnode *WatchNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fus
serveCtx: serveCtx,
serveCancel: serveCancel,
down: make(chan struct{}),
client: client,
}
head.wlinkMu.Lock()
......@@ -1900,12 +2030,13 @@ func (wnode *WatchNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fus
head.wlinkMu.Unlock()
go wlink.serve(serveCtx)
return wlink.sk.File(), fuse.OK
return wlink.sk.File(), nil
}
// shutdown shuts down communication over watchlink due to specified reason and
// marks the watchlink as no longer active.
//
// The client is killed if the reason is due to "failed to pin".
// Only the first shutdown call has the effect, but all calls wait for the
// actual shutdown to complete.
//
......@@ -1916,10 +2047,23 @@ func (wlink *WatchLink) shutdown(reason error) {
wlink.serveCancel()
// give client a chance to be notified if shutdown was due to some logical error
kill := false
if reason != nil {
_, kill = reason.(*PinError)
emsg := "error: "
if kill {
emsg = "fatal: "
}
emsg += reason.Error()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
_ = wlink.send(ctx, 0, fmt.Sprintf("error: %s", reason))
_ = wlink.send(ctx, 0, emsg)
}
// kill client if shutdown is due to faulty pin handling
if kill {
wlink.badPinKill(reason) // only fatal error
}
// NOTE unregistering watches and wlink itself is done on serve exit, not
......@@ -1969,6 +2113,9 @@ func (wlink *WatchLink) _serve(ctx context.Context) (err error) {
if err == nil {
err = err2
}
// release client process
wlink.client.Release()
}()
// watch handlers are spawned in dedicated workgroup
......@@ -1986,7 +2133,7 @@ func (wlink *WatchLink) _serve(ctx context.Context) (err error) {
//
// For error return, we want any in-progress, and so will
// become failed, pin handler to result in corresponding client
// to become killed (TODO). That's why we trigger only cancel
// to become killed. That's why we trigger only cancel
// ourselves and let failed pin handlers to invoke shutdown
// with their specific reason.
//
......@@ -2562,6 +2709,7 @@ func _wcfs_Stats(fctx *fuse.Context) ([]byte, error) {
// dump information collected in root.stats
s := root.stats
num("pin", s.pin.Load())
num("pinkill", s.pinkill.Load())
return []byte(stats), nil
}
......
......@@ -30,7 +30,6 @@ import six
from golang import select, func, defer
from golang import context, sync, time
import pytest; xfail = pytest.mark.xfail
from pytest import mark, fixture
from wendelin.wcfs.wcfs_test import tDB, h, tAt, eprint, \
setup_module, teardown_module, setup_function, teardown_function
......@@ -281,7 +280,6 @@ def __bad_watch_pinh(ctx, f, at, pinh, pinhFailReason):
def _bad_watch_no_pin_reply (ctx, f, at): __bad_watch_pinh(ctx, f, at, f._pinner_no_pin_reply, "is stuck")
def _bad_watch_nak_pin_reply(ctx, f, at): __bad_watch_pinh(ctx, f, at, f._pinner_nak_pin_reply, "replies nak")
@xfail # protection against faulty/slow clients
@mark.parametrize('faulty', [
_bad_watch_no_pin_read,
_bad_watch_no_pin_reply,
......@@ -301,6 +299,7 @@ def test_wcfs_pinhfaulty_kill_on_watch(faulty, with_prompt_pintimeout):
# launch faulty process that should be killed by wcfs on problematic pin during watch setup
p = tFaultySubProcess(t, faulty, at=at1)
defer(p.close)
t.assertStats({'pinkill': 0})
# wait till faulty client issues its watch, receives pin and pauses/misbehaves
p.send("start watch")
......@@ -314,6 +313,7 @@ def test_wcfs_pinhfaulty_kill_on_watch(faulty, with_prompt_pintimeout):
# the faulty client must become killed by wcfs
p.join(t.ctx)
assert p.exitcode is not None
t.assertStats({'pinkill': 1})
# verify that wcfs kills slow/faulty client who does not handle pin
......@@ -371,7 +371,6 @@ def __bad_pinh(ctx, f, at, pinh):
def _bad_pinh_no_pin_reply (ctx, f, at): __bad_pinh(ctx, f, at, f._pinner_no_pin_reply)
def _bad_pinh_nak_pin_reply(ctx, f, at): __bad_pinh(ctx, f, at, f._pinner_nak_pin_reply)
@xfail # protection against faulty/slow clients
@mark.parametrize('faulty', [
_bad_pinh_no_pin_read,
_bad_pinh_no_pin_reply,
......@@ -396,6 +395,7 @@ def test_wcfs_pinhfaulty_kill_on_access(faulty, with_prompt_pintimeout):
p = tFaultySubProcess(t, faulty, at=at2)
defer(p.close)
assert p.recv(t.ctx) == "f: watch setup ok"
t.assertStats({'pinkill': 0})
# commit new transaction and issue read access to modified block
# our read should be served well even though faulty client is either stuck
......@@ -414,6 +414,7 @@ def test_wcfs_pinhfaulty_kill_on_access(faulty, with_prompt_pintimeout):
p.join(t.ctx)
assert p.exitcode is not None
t.assertStats({'pinkill': 1})
# _pinner_<problem> simulates faulty pinner inside client that behaves in
......
......@@ -387,7 +387,7 @@ class tWCFS(_tWCFS):
t._stats_prev = None
t.assertStats({'BigFile': 0, 'RevHead': 0, 'ZHeadLink': 0,
'WatchLink': 0, 'Watch': 0, 'PinnedBlk': 0,
'pin': 0})
'pin': 0, 'pinkill': 0})
# _abort_ontimeout is in wcfs_test.pyx
......@@ -424,8 +424,8 @@ class tWCFS(_tWCFS):
#
# The state is asserted eventually instead of immediately - for both
# counters and instance values - because wcfs increments a counter
# _after_ corresponding event happened,
# and the tests can start to observe that state
# _after_ corresponding event happened, for example pinkill after actually
# killing client process, and the tests can start to observe that state
# before wcfs actually does counter increment. For the similar reason we
# need to assert that the counters stay in expected state to make sure that
# no extra event happened. For instance values we need to assert
......@@ -559,7 +559,10 @@ class tDB(tWCFS):
assert len(t._wlinks) == 0
t._wc_zheadfh.close()
t.assertStats({'WatchLink': 0, 'Watch': 0, 'PinnedBlk': 0, 'ZHeadLink': 0})
zstats = {'WatchLink': 0, 'Watch': 0, 'PinnedBlk': 0, 'ZHeadLink': 0}
if not t.multiproc:
zstats['pinkill'] = 0
t.assertStats(zstats)
# open opens wcfs file corresponding to zf@at and starts to track it.
# see returned tFile for details.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment