Commit b02dcadc authored by Levin Zimmermann

wcfs: Fix protection against faulty client

The WCFS documentation specifies [1]:

- - - 8> - - - 8> - - -

If a client, on purpose or due to a bug or being stopped, is slow to respond
with ack to file invalidation notification, it creates a problem because the
server will become blocked waiting for pin acknowledgments, and thus all
other clients, that try to work with the same file, will get stuck.

[...]

Lacking OS primitives to change address space of another process and not
being able to work it around with ptrace in userspace, wcfs takes approach
to kill a slow client on 30 seconds timeout by default.

- - - <8 - - - <8 - - -

But before this patch, this protection was not implemented yet: one
faulty client could freeze the whole system. This patch implements it:
a faulty client that does not acknowledge a pin request in time is now
killed after the timeout.

[1] https://lab.nexedi.com/nexedi/wendelin.core/blob/38dde766/wcfs/wcfs.go#L186-208
parent aef0f0e1
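
For illustration, the escalation this commit implements (pin-ack timeout -> SIGBUS -> short grace period -> SIGKILL) can be sketched as a standalone Go snippet. This is a minimal sketch under assumptions, not the code added below: the helper name killFaultyClient, the 50 ms poll interval and the grace period are illustrative choices.

```go
package faultyclient

import (
    "syscall"
    "time"
)

// killFaultyClient asks a client that failed to ack a pin request to die:
// first SIGBUS, then, if the process is still present after the grace
// period, SIGKILL.
func killFaultyClient(pid int, grace time.Duration) error {
    if err := syscall.Kill(pid, syscall.SIGBUS); err != nil {
        return err
    }
    deadline := time.Now().Add(grace)
    for time.Now().Before(deadline) {
        // Signal 0 performs error checking only; ESRCH means the
        // process no longer exists.
        if err := syscall.Kill(pid, syscall.Signal(0)); err == syscall.ESRCH {
            return nil // the client already terminated on SIGBUS
        }
        time.Sleep(50 * time.Millisecond)
    }
    return syscall.Kill(pid, syscall.SIGKILL)
}
```

A caller that detects the pin-ack timeout would invoke this once per faulty client, e.g. killFaultyClient(pid, time.Second), and then continue serving the other clients.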
@@ -29,6 +29,7 @@ import (
     "strings"
     "sync/atomic"
     "syscall"
+    "time"
     log "github.com/golang/glog"
@@ -515,3 +516,22 @@ func (root *Root) StatFs() *fuse.StatfsOut {
 func panicf(format string, argv ...interface{}) {
     panic(fmt.Sprintf(format, argv...))
 }
+
+// isProcessAlive polls the process with the given pid and returns true if it
+// is still alive after timeout, and false as soon as it is found to be gone.
+func isProcessAlive(pid int, timeout time.Duration) bool {
+    for start := time.Now(); time.Since(start) < timeout; {
+        alive := _isProcessAlive(pid)
+        if !alive {
+            return false
+        }
+        time.Sleep(10 * time.Millisecond) // poll periodically instead of busy-spinning
+    }
+    return true
+}
+
+// _isProcessAlive returns true if the process with the given pid currently
+// exists, and false otherwise.
+func _isProcessAlive(pid int) bool {
+    killErr := syscall.Kill(pid, syscall.Signal(0))
+    return killErr == nil
+}
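
A side note on the signal-0 probe in _isProcessAlive above: kill(pid, 0) also fails with EPERM when the process exists but is owned by a different user, which the probe above would report as "not alive". The following sketch is not part of this patch (processExists is a hypothetical helper); it only shows how the two cases can be distinguished.

```go
package faultyclient

import "syscall"

// processExists probes pid with signal 0 and treats EPERM as "exists":
// the signal could not be delivered only because of missing permissions
// (the process belongs to another uid), not because the process is gone
// (ESRCH).
func processExists(pid int) bool {
    err := syscall.Kill(pid, syscall.Signal(0))
    if err == nil {
        return true
    }
    return err == syscall.EPERM
}
```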
@@ -1487,6 +1487,30 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
     if err != nil {
+        // If the pin request timed out, we kill the client:
+        // SIGBUS => wait for some time; if still alive => SIGKILL.
+        if errors.Is(err, ErrTimedOut) {
+            pid := int(w.link.caller.Pid)
+            // TODO kirr: "The kernel then sends SIGBUS on such case with the details about
+            // access to which address generated this error going in si_addr field of
+            // siginfo structure. It would be good if we can mimic that behaviour to a
+            // reasonable extent if possible."
+            err := syscall.Kill(pid, syscall.SIGBUS)
+            if err != nil {
+                return err
+            }
+            if isProcessAlive(pid, 1*time.Second) {
+                err = syscall.Kill(pid, syscall.SIGKILL)
+                if err != nil {
+                    return err
+                }
+            }
+            // We don't return an error, because readPinWatchers
+            // should continue as if nothing had happened when one
+            // client times out: the other clients must not be
+            // affected by a single faulty client.
+            return nil
+        }
         blkpin.err = err
         return err
     }
@@ -1513,7 +1537,7 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
 // Must be called with f.head.zheadMu rlocked.
 //
 // XXX do we really need to use/propagate caller context here? ideally update
-// watchers should be synchronous, and in practice we just use 30s timeout (TODO).
+// watchers should be synchronous, and in practice we just use 30s timeout.
 // Should a READ interrupt cause watch update failure? -> probably no
 func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb.Tid) (err error) {
     defer xerr.Context(&err, "pin watchers") // f.path and blk is already put into context by readBlk
@@ -1981,6 +2005,8 @@ func (wlink *WatchLink) _handleWatch(ctx context.Context, msg string) error {
     return err
 }
+var ErrTimedOut error = errors.New("timed out")
+
 // sendReq sends wcfs-originated request to client and returns client response.
 func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string, err error) {
     defer xerr.Context(&err, "sendReq") // wlink is already put into ctx by caller
@@ -2026,6 +2052,9 @@ func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string, err error) {
     case reply = <-rxq:
         return reply, nil
+
+    case <-time.After(30 * time.Second):
+        return "", ErrTimedOut
     }
 }
...
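
The timeout added to sendReq above uses the standard Go pattern of selecting on the reply channel and a timer simultaneously: whichever fires first decides the outcome. A self-contained sketch of just that pattern (channel and function names are illustrative; only the two cases relevant to the timeout are shown):

```go
package main

import (
    "errors"
    "fmt"
    "time"
)

var errTimedOut = errors.New("timed out")

// waitReply waits for a client reply on rxq, giving up with errTimedOut
// if nothing arrives within timeout.
func waitReply(rxq <-chan string, timeout time.Duration) (string, error) {
    select {
    case reply := <-rxq:
        return reply, nil
    case <-time.After(timeout):
        return "", errTimedOut
    }
}

func main() {
    rxq := make(chan string, 1)
    go func() { rxq <- "ack" }()

    reply, err := waitReply(rxq, 100*time.Millisecond)
    fmt.Println(reply, err) // prints: ack <nil>
}
```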
@@ -51,7 +51,6 @@ from resource import setrlimit, getrlimit, RLIMIT_MEMLOCK
 from golang import go, chan, select, func, defer, error, b
 from golang import context, errors, sync, time
 from zodbtools.util import ashex as h, fromhex
-import pytest; xfail = pytest.mark.xfail
 from pytest import raises, fail
 from wendelin.wcfs.internal import io, mm
 from wendelin.wcfs.internal.wcfs_test import _tWCFS, read_exfault_nogil, SegmentationFault, install_sigbus_trap, fadvise_dontneed
@@ -350,7 +349,7 @@ class DFile:
 # TODO(?) print -> t.trace/debug() + t.verbose depending on py.test -v -v ?
 class tWCFS(_tWCFS):
     @func
-    def __init__(t):
+    def __init__(t, timeout=10*time.second):
         assert not os.path.exists(testmntpt)
         wc = wcfs.join(testzurl, autostart=True)
         assert wc.mountpoint == testmntpt
@@ -366,7 +365,7 @@ class tWCFS(_tWCFS):
         # still wait for request completion even after fatal signal )
         nogilready = chan(dtype='C.structZ')
         t._wcfuseabort = os.dup(wc._wcsrv._fuseabort.fileno())
-        go(t._abort_ontimeout, t._wcfuseabort, 10*time.second, nogilready) # NOTE must be: with_timeout << · << wcfs_pin_timeout
+        go(t._abort_ontimeout, t._wcfuseabort, timeout, nogilready) # NOTE must be: with_timeout << · << wcfs_pin_timeout
         nogilready.recv() # wait till _abort_ontimeout enters nogil
     # _abort_ontimeout is in wcfs_test.pyx
@@ -404,7 +403,7 @@ class tDB(tWCFS):
     # create before wcfs startup. old_data is []changeDelta - see .commit
     # and .change for details.
     @func
-    def __init__(t, old_data=[]):
+    def __init__(t, old_data=[], **kwargs):
         t.root = testdb.dbopen()
         def _(): # close/unlock db if __init__ fails
             exc = sys.exc_info()[1]
@@ -434,7 +433,7 @@ class tDB(tWCFS):
             t._commit(t.zfile, changeDelta)
         # start wcfs after testdb is created and initial data is committed
-        super(tDB, t).__init__()
+        super(tDB, t).__init__(**kwargs)
         # fh(.wcfs/zhead) + history of zhead read from there
         t._wc_zheadfh = open(t.wc.mountpoint + "/.wcfs/zhead")
@@ -1453,12 +1452,13 @@ def test_wcfs_watch_going_back():
 # verify that wcfs kills slow/faulty client who does not reply to pin in time.
-@xfail # protection against faulty/slow clients
 @func
 def test_wcfs_pintimeout_kill():
     # adjusted wcfs timeout to kill client who is stuck not providing pin reply
-    tkill = 3*time.second
-    t = tDB(); zf = t.zfile   # XXX wcfs args += tkill=<small>
+    # The timeout until killing is 30 seconds; we add 2 more seconds of slack
+    # for the actual killing / process shutdown.
+    tkill = 32*time.second
+    t = tDB(timeout=tkill*2); zf = t.zfile
     defer(t.close)
     at1 = t.commit(zf, {2:'c1'})
...