Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
W
wendelin.core
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Labels
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Commits
Open sidebar
Kirill Smelkov
wendelin.core
Commits
6fdd5467
Commit
6fdd5467
authored
Feb 18, 2019
by
Kirill Smelkov
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
.
parent
f2f6352a
Changes
2
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
216 additions
and
71 deletions
+216
-71
wcfs/notes.txt
wcfs/notes.txt
+36
-2
wcfs/wcfs.go
wcfs/wcfs.go
+180
-69
No files found.
wcfs/notes.txt
View file @
6fdd5467
...
@@ -44,8 +44,8 @@ somewhat acceptable (~ 0.01% of whole-file data size, i.e. ~ 128MB of index for
...
@@ -44,8 +44,8 @@ somewhat acceptable (~ 0.01% of whole-file data size, i.e. ~ 128MB of index for
~ 1TB of data), it is not good from time overhead point of view - initial open
~ 1TB of data), it is not good from time overhead point of view - initial open
of a file this way would be potentially slow.
of a file this way would be potentially slow.
-> we took the approach where we
invalidate a block lazily only when it is
-> we took the approach where we
send invalidation to client about a block
actually accesses
.
lazily only when the block is actually accessed
.
Changing mmapping while under pagefault is possible
Changing mmapping while under pagefault is possible
...
@@ -142,6 +142,40 @@ waiting for read operation to finish for ptrace, and read will be first
...
@@ -142,6 +142,40 @@ waiting for read operation to finish for ptrace, and read will be first
waiting on ptrace stopping to complete = deadlock)
waiting on ptrace stopping to complete = deadlock)
Kernel locks page on read/cache store/... - we have to be careful not to deadlock
=================================================================================
The kernel, when doing FUSE operations, locks corresponding pages. For example
it locks a page, where it is going to read data, before issuing FUSE read
request. Correspondingly, on e.g. cache store, the kernel also locks page where
data has to be stored.
It is easy to deadlock, if we don't take this locks into account. For example
if we try to upload data to kernel pagecache from under serving read request,
this can deadlock.
Another case that needs to be cared about is interaction between uploadBlk and
zwatcher: zconnMu being RWMutex, does not allow new RLocks to be taken once
Lock request has been issued. Thus the following scenario is possible::
uploadBlk os.Read zwatcher
page.Lock
zconnMu.Rlock
zconnMu.Lock
page.Lock
zconnMu.Rlock
- zwatcher is waiting for uploadBlk to release zconnMu;
- uploadBlk is waiting for os.Read to release page;
- os.Read is waiting for zwatcher to release zconnMu;
- deadlock.
To avoid such deadlocks zwatcher asks OS cache uploaders to pause while it is
running, and retries taking zconnMu.Lock until all uploaders are indeed paused.
δ(BTree) notes (XXX -> btreediff package)
δ(BTree) notes (XXX -> btreediff package)
=========================================
=========================================
...
...
wcfs/wcfs.go
View file @
6fdd5467
...
@@ -59,8 +59,8 @@
...
@@ -59,8 +59,8 @@
// at ; data inside head/ is as of this ZODB transaction
// at ; data inside head/ is as of this ZODB transaction
// watch ; channel for bigfile invalidations
// watch ; channel for bigfile invalidations
// bigfile/ ; bigfiles' data
// bigfile/ ; bigfiles' data
// <oid(
bigf
ile1)>
// <oid(
ZBigF
ile1)>
// <oid(
bigf
ile2)>
// <oid(
ZBigF
ile2)>
// ...
// ...
//
//
// where /bigfile/<bigfileX> represents latest bigfile data as stored in
// where /bigfile/<bigfileX> represents latest bigfile data as stored in
...
@@ -75,8 +75,8 @@
...
@@ -75,8 +75,8 @@
// @<revX>/
// @<revX>/
// at
// at
// bigfile/ ; bigfiles' data as of revision <revX>
// bigfile/ ; bigfiles' data as of revision <revX>
// <oid(
bigf
ile1)>
// <oid(
ZBigF
ile1)>
// <oid(
bigf
ile2)>
// <oid(
ZBigF
ile2)>
// ...
// ...
//
//
// where /bigfile/<bigfileX> represent bigfile data as of revision <revX>.
// where /bigfile/<bigfileX> represent bigfile data as of revision <revX>.
...
@@ -138,9 +138,9 @@
...
@@ -138,9 +138,9 @@
//
//
// The server sends pin notifications only for file blocks, that are known to
// The server sends pin notifications only for file blocks, that are known to
// be potentially changed after client's <at>, and <rev_max> describes the
// be potentially changed after client's <at>, and <rev_max> describes the
// upper bound for the block revision:
// upper bound for the block revision
as of <at> database view
:
//
//
// <
at> < <rev_max> FIXME -> <rev_max> ≤ <at> (?)
// <
rev_max> ≤ <at>
//
//
// The server maintains short history tail of file changes to be able to
// The server maintains short history tail of file changes to be able to
// support openings with <at> being slightly in the past compared to current
// support openings with <at> being slightly in the past compared to current
...
@@ -244,7 +244,7 @@ package main
...
@@ -244,7 +244,7 @@ package main
// 2) head/bigfile/* of all bigfiles represent state as of zhead.At .
// 2) head/bigfile/* of all bigfiles represent state as of zhead.At .
// 3) for head/bigfile/* the following invariant is maintained:
// 3) for head/bigfile/* the following invariant is maintained:
//
//
// #blk ∈ file cache => ZBlk(#blk) + all BTree/Bucket that lead to it ∈ zhead cache
// #blk ∈
OS
file cache => ZBlk(#blk) + all BTree/Bucket that lead to it ∈ zhead cache
// (ZBlk* in ghost state(%))
// (ZBlk* in ghost state(%))
//
//
// The invariant helps on invalidation: if we see a changed oid, and
// The invariant helps on invalidation: if we see a changed oid, and
...
@@ -387,6 +387,7 @@ import (
...
@@ -387,6 +387,7 @@ import (
"runtime"
"runtime"
"strings"
"strings"
"sync"
"sync"
"sync/atomic"
"syscall"
"syscall"
log
"github.com/golang/glog"
log
"github.com/golang/glog"
...
@@ -415,7 +416,7 @@ type Root struct {
...
@@ -415,7 +416,7 @@ type Root struct {
zstor
zodb
.
IStorage
zstor
zodb
.
IStorage
// ZODB DB handle for zstor.
// ZODB DB handle for zstor.
// keeps cache of connections for @<rev>/ accesse.
// keeps cache of connections for @<rev>/ accesse
s
.
// only one connection is used for each @<rev>.
// only one connection is used for each @<rev>.
zdb
*
zodb
.
DB
zdb
*
zodb
.
DB
...
@@ -438,9 +439,27 @@ type Head struct {
...
@@ -438,9 +439,27 @@ type Head struct {
// watch - implicitly linked to by fs
// watch - implicitly linked to by fs
// ZODB connection for everything under this head
// ZODB connection for everything under this head
zconnMu
sync
.
RWMutex
// protects access to zconn & live _objects_ associated with it
// protects access to zconn & live _objects_ associated with it.
// while it is rlocked zconn is guaranteed to stay viewing database at
// particular view.
//
// zwatcher write-locks this and knows noone is using ZODB objects and
// noone mutates OS file cache while zwatcher is running.
//
// it is also kept rlocked by OS cache uploaders (see BigFile.uploadBlk)
// with additional locking protocol to avoid deadlocks (see below for
// pauseOSCacheUpload + ...).
zconnMu
sync
.
RWMutex
zconn
*
ZConn
// for head/ zwatcher resyncs head.zconn; others only read zconn objects.
zconn
*
ZConn
// for head/ zwatcher resyncs head.zconn; others only read zconn objects.
// zwatcher signals to uploadBlk to pause/continue uploads to OS cache to avoid deadlocks.
// see notes.txt -> "Kernel locks page on read/cache store/..." for details.
pauseOSCacheUpload
bool
continueOSCacheUpload
chan
struct
{}
// uploadBlk signals to zwatcher that there are so many inflight OS cache uploads currently.
inflightOSCacheUploads
int32
// XXX move zconn's current transaction to Head here?
// XXX move zconn's current transaction to Head here?
}
}
...
@@ -478,14 +497,17 @@ type BigFile struct {
...
@@ -478,14 +497,17 @@ type BigFile struct {
// ZBigFile top-level object. Kept activated during lifetime of current transaction.
// ZBigFile top-level object. Kept activated during lifetime of current transaction.
zbf
*
ZBigFile
zbf
*
ZBigFile
// zbf.Size(). It is constant during liftime of current transaction.
// zbf.Size(). It is constant during lif
e
time of current transaction.
zbfSize
int64
zbfSize
int64
// tail change history of this file.
// tail change history of this file.
δFtail
*
ΔTailI64
// [](rev↑, []#blk)
δFtail
*
ΔTailI64
// [](rev↑, []#blk)
// inflight loadings of ZBigFile from ZODB.
// inflight loadings of ZBigFile from ZODB.
// successfull load results are kept here until blkdata is put into OS pagecache.
// successful load results are kept here until blkdata is put into OS pagecache.
//
// Being a staging area for data to enter OS cache, loading has to be
// consulted/invalidated whenever wcfs logic needs to consult/invalidate OS cache.
loadMu
sync
.
Mutex
loadMu
sync
.
Mutex
loading
map
[
int64
]
*
blkLoadState
// #blk -> {... blkdata}
loading
map
[
int64
]
*
blkLoadState
// #blk -> {... blkdata}
...
@@ -508,7 +530,7 @@ type blkLoadState struct {
...
@@ -508,7 +530,7 @@ type blkLoadState struct {
// zodbCacheControl implements zodb.LiveCacheControl to tune ZODB to never evict
// zodbCacheControl implements zodb.LiveCacheControl to tune ZODB to never evict
// LOBTree/LOBucket from live cache. We want to keep LOBTree/LOBucket always alive
// LOBTree/LOBucket from live cache. We want to keep LOBTree/LOBucket always alive
// becuse it is essentially the index where to find ZBigFile data.
// bec
a
use it is essentially the index where to find ZBigFile data.
//
//
// For the data itself - we put it to kernel pagecache and always deactivate
// For the data itself - we put it to kernel pagecache and always deactivate
// from ZODB right after that.
// from ZODB right after that.
...
@@ -531,14 +553,14 @@ func (cc *zodbCacheControl) WantEvict(obj zodb.IPersistent) bool {
...
@@ -531,14 +553,14 @@ func (cc *zodbCacheControl) WantEvict(obj zodb.IPersistent) bool {
// pointer to LOBTree object and, consequently, that LOBTree object,
// pointer to LOBTree object and, consequently, that LOBTree object,
// even if it was marked not to be released from cache will be GC'ed by
// even if it was marked not to be released from cache will be GC'ed by
// go runtime, and the cache will loose its weak reference to it.
// go runtime, and the cache will loose its weak reference to it.
// XXX however we cannot protect ZBigFile from releas
e
ing state - as
// XXX however we cannot protect ZBigFile from releasing state - as
// any object can be explicitly invalidated.
// any object can be explicitly invalidated.
// FIXME -> teach zodb.LiveCache to keep object by itself?
// FIXME -> teach zodb.LiveCache to keep object by itself?
//
//
// we also keep ZBigFile alive because we want to make sure .blksize
// we also keep ZBigFile alive because we want to make sure .blksize
// and (p. ref) .blktab do not change.
// and (p. ref) .blktab do not change.
//
//
// XXX on every resync
e
we deactivate/activate all bigfiles and restat them
// XXX on every resync we deactivate/activate all bigfiles and restat them
// -> for efficiency better keep ZBigFile in live cache.
// -> for efficiency better keep ZBigFile in live cache.
//case *ZBigFile:
//case *ZBigFile:
}
}
...
@@ -594,18 +616,44 @@ func (root *Root) zwatcher(ctx context.Context) (err error) {
...
@@ -594,18 +616,44 @@ func (root *Root) zwatcher(ctx context.Context) (err error) {
// zδhandle1 handles 1 change event from ZODB notification.
// zδhandle1 handles 1 change event from ZODB notification.
func
(
root
*
Root
)
zδhandle1
(
zevent
zodb
.
CommitEvent
)
{
func
(
root
*
Root
)
zδhandle1
(
zevent
zodb
.
CommitEvent
)
{
head
:=
root
.
head
// while we are invalidating OS cache, make sure that nothing, that
// while we are invalidating OS cache, make sure that nothing, that
// even reads /head/bigfile/*, is running (see 4.6).
// even reads /head/bigfile/*, is running (see 4.6).
root
.
head
.
zconnMu
.
Lock
()
//
defer
root
.
head
.
zconnMu
.
Unlock
()
// also make sure that cache uploaders we spawned (uploadBlk) are all
// paused, or else they could overwrite OS cache with stale data.
// see notes.txt -> "Kernel locks page on read/cache store/..." for
// details on how to do this without deadlocks.
continueOSCacheUpload
:=
make
(
chan
struct
{})
retry
:
for
{
head
.
zconnMu
.
Lock
()
head
.
pauseOSCacheUpload
=
true
head
.
continueOSCacheUpload
=
continueOSCacheUpload
if
head
.
inflightOSCacheUploads
!=
0
{
head
.
zconnMu
.
Unlock
()
continue
retry
}
break
}
defer
func
()
{
head
.
pauseOSCacheUpload
=
false
head
.
continueOSCacheUpload
=
nil
head
.
zconnMu
.
Unlock
()
close
(
continueOSCacheUpload
)
}()
zhead
:=
root
.
head
.
zconn
zhead
:=
head
.
zconn
bfdir
:=
root
.
head
.
bfdir
bfdir
:=
head
.
bfdir
// fileInvalidate describes invalidations for one file
// fileInvalidate describes invalidations for one file
type
fileInvalidate
struct
{
type
fileInvalidate
struct
{
blkmap
SetI64
// changed blocks
blkmap
SetI64
// changed blocks
size
bool
// whether to invalidate file si
s
e
size
bool
// whether to invalidate file si
z
e
}
}
toinvalidate
:=
map
[
*
BigFile
]
*
fileInvalidate
{}
// {} file -> set(#blk), sizeChanged
toinvalidate
:=
map
[
*
BigFile
]
*
fileInvalidate
{}
// {} file -> set(#blk), sizeChanged
btreeChangev
:=
[]
zodb
.
Oid
{}
// oids changing BTree|Bucket
btreeChangev
:=
[]
zodb
.
Oid
{}
// oids changing BTree|Bucket
...
@@ -705,22 +753,9 @@ func (root *Root) zδhandle1(zevent zodb.CommitEvent) {
...
@@ -705,22 +753,9 @@ func (root *Root) zδhandle1(zevent zodb.CommitEvent) {
}
}
// invalidate kernel cache for attributes
// invalidate kernel cache for attributes
// we need to do it only if we see topol
i
gy (i.e. btree) change
// we need to do it only if we see topol
o
gy (i.e. btree) change
//
//
// do it after completing data invalidations, else the kernel might get
// do it after completing data invalidations.
// stuck while we try to retrieve cache in invalidateBlk.
//
// XXX recheck ^^^ - we were stuck this way:
// wcfs: 19:42:23.335790 tx 0: NOTIFY_INVAL_INODE, {i8 [-1 +-1)}
// wcfs: 19:42:23.335800 Response: INODE_NOTIFY OK
// wcfs: 19:42:23.335817 tx 0: NOTIFY_RETRIEVE_CACHE, {> 0: i8 [2097152 +2097152)}
// wcfs: 19:42:23.335866 Response: NOTIFY_RETRIEVE_CACHE: OK
// ---- stuck here without kernel response ----
// wcfs: 19:42:28.588168 rx 58: INTERRUPT i0
// wcfs: 19:42:28.588232 Unimplemented opcode INTERRUPT
// wcfs: 19:42:28.588288 tx 58: 38=function not implemented
//
// XXX invalidateBlk gets stuck even without invalidateAttr.
wg
,
ctx
=
errgroup
.
WithContext
(
context
.
TODO
())
wg
,
ctx
=
errgroup
.
WithContext
(
context
.
TODO
())
for
file
,
finv
:=
range
toinvalidate
{
for
file
,
finv
:=
range
toinvalidate
{
if
!
finv
.
size
{
if
!
finv
.
size
{
...
@@ -739,7 +774,7 @@ func (root *Root) zδhandle1(zevent zodb.CommitEvent) {
...
@@ -739,7 +774,7 @@ func (root *Root) zδhandle1(zevent zodb.CommitEvent) {
// XXX -> Head.Resync() ?
// XXX -> Head.Resync() ?
// 1. deactivate changed ZBigFile (we keep all ZBigFiles activated during whole txn)
// 1. deactivate changed ZBigFile (we keep all ZBigFiles activated during whole txn)
// XXX dir.fileMu locking (not needed bcause zconnMu locked)
// XXX dir.fileMu locking (not needed b
e
cause zconnMu locked)
for
file
:=
range
toinvalidate
{
for
file
:=
range
toinvalidate
{
file
.
zbf
.
PDeactivate
()
file
.
zbf
.
PDeactivate
()
file
.
zbfSize
=
-
1
file
.
zbfSize
=
-
1
...
@@ -790,35 +825,58 @@ func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) error {
...
@@ -790,35 +825,58 @@ func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) error {
blksize
:=
f
.
zbf
.
blksize
blksize
:=
f
.
zbf
.
blksize
off
:=
blk
*
blksize
off
:=
blk
*
blksize
// try to retrieve cache of current head/data[blk]
var
blkdata
[]
byte
=
nil
// first try to retrieve f.loading[blk];
// make sure f.loading[blk] is invalidated.
//
//
// we are running with zconnMu wlocked - no need to lock f.loadMu
loading
,
ok
:=
f
.
loading
[
blk
]
if
ok
{
if
loading
.
err
==
nil
{
blkdata
=
loading
.
blkdata
}
delete
(
f
.
loading
,
blk
)
}
// try to retrieve cache of current head/data[blk], if we got nothing from f.loading
if
blkdata
==
nil
{
blkdata
=
make
([]
byte
,
blksize
)
n
,
st
:=
fsconn
.
FileRetrieveCache
(
f
.
Inode
(),
off
,
blkdata
)
if
st
!=
fuse
.
OK
{
// XXX warn?
}
blkdata
=
blkdata
[
:
n
]
}
// if less than blksize was cached - probably the kernel had to evict
// if less than blksize was cached - probably the kernel had to evict
// some data from its cache already. In such case we don't try to
// some data from its cache already. In such case we don't try to
// preserve the rest and drop what was read, to avoid keeping the
// preserve the rest and drop what was read, to avoid keeping the
// system overloaded.
// system overloaded.
//
if
int64
(
len
(
blkdata
))
==
blksize
{
// XXX st != OK -> warn?
// XXX -> go?
blkdata
:=
make
([]
byte
,
blksize
)
n
,
st
:=
fsconn
.
FileRetrieveCache
(
f
.
Inode
(),
off
,
blkdata
)
if
int64
(
n
)
==
blksize
{
// XXX -> go
// store retrieved data back to OS cache for file @<rev>/file[blk]
// store retrieved data back to OS cache for file @<rev>/file[blk]
blkrev
,
_
:=
f
.
δFtail
.
LastRevOf
(
blk
,
f
.
head
.
zconn
.
At
())
blkrev
,
_
:=
f
.
δFtail
.
LastRevOf
(
blk
,
f
.
head
.
zconn
.
At
())
frev
,
err
:=
groot
.
mkrevfile
(
blkrev
,
f
.
zbf
.
POid
())
frev
,
err
:=
groot
.
mkrevfile
(
blkrev
,
f
.
zbf
.
POid
())
if
err
!=
nil
{
if
err
!=
nil
{
// XXX
// XXX
panic
(
err
)
}
}
st
=
fsconn
.
FileNotifyStoreCache
(
frev
.
Inode
(),
off
,
blkdata
)
st
:
=
fsconn
.
FileNotifyStoreCache
(
frev
.
Inode
(),
off
,
blkdata
)
if
st
!=
fuse
.
OK
{
if
st
!=
fuse
.
OK
{
// XXX log - dup wrt readBlk -> common func.
// XXX log - dup wrt readBlk -> common func. XXX -> uploadBlk
panic
(
st
)
}
}
}
}
// invalidate file/head/data[blk] in OS file cache.
// invalidate file/head/data[blk] in OS file cache.
st
=
fsconn
.
FileNotify
(
f
.
Inode
(),
off
,
blksize
)
st
:=
fsconn
.
FileNotify
(
f
.
Inode
(),
off
,
blksize
)
// XXX st != ok (fatal here)
if
st
!=
fuse
.
OK
{
// XXX (fatal error here) -> return just error
panic
(
st
)
}
panic
(
"TODO"
)
panic
(
"TODO"
)
// XXX -> return nil
}
}
// invalidateAttr invalidates file attributes in kernel cache.
// invalidateAttr invalidates file attributes in kernel cache.
...
@@ -837,7 +895,7 @@ func (f *BigFile) invalidateAttr() (err error) {
...
@@ -837,7 +895,7 @@ func (f *BigFile) invalidateAttr() (err error) {
//
//
// We need node ID to be know to the kernel, when we need to store data into
// We need node ID to be know to the kernel, when we need to store data into
// file's kernel cache - if the kernel don't have the node ID for the file in
// file's kernel cache - if the kernel don't have the node ID for the file in
// question, FileNotifyStoreCche will just fail.
// question, FileNotifyStoreC
a
che will just fail.
//
//
// For kernel to know the inode mkrevfile issues regular filesystem lookup
// For kernel to know the inode mkrevfile issues regular filesystem lookup
// request which goes to kernel and should go back to wcfs. It is thus not safe
// request which goes to kernel and should go back to wcfs. It is thus not safe
...
@@ -924,7 +982,7 @@ func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context)
...
@@ -924,7 +982,7 @@ func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context)
}
}
// relock bfdir and either register f or, if the file was maybe
// relock bfdir and either register f or, if the file was maybe
// simultanously created while we were not holding bfdir.fileMu, return that.
// simultan
e
ously created while we were not holding bfdir.fileMu, return that.
bfdir
.
fileMu
.
Lock
()
bfdir
.
fileMu
.
Lock
()
f2
,
already
:=
bfdir
.
fileTab
[
oid
]
f2
,
already
:=
bfdir
.
fileTab
[
oid
]
if
already
{
if
already
{
...
@@ -987,7 +1045,7 @@ func (root *Root) mkdir(name string, fctx *fuse.Context) (_ *nodefs.Inode, err e
...
@@ -987,7 +1045,7 @@ func (root *Root) mkdir(name string, fctx *fuse.Context) (_ *nodefs.Inode, err e
}
}
// relock root and either mkdir or EEXIST if the directory was maybe
// relock root and either mkdir or EEXIST if the directory was maybe
// simultanously created while we were not holding revMu.
// simultan
e
ously created while we were not holding revMu.
root
.
revMu
.
Lock
()
root
.
revMu
.
Lock
()
_
,
already
=
root
.
revTab
[
rev
]
_
,
already
=
root
.
revTab
[
rev
]
if
already
{
if
already
{
...
@@ -1210,7 +1268,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) error {
...
@@ -1210,7 +1268,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) error {
}
}
}
}
// noone was loading - we became reponsible to load this block
// noone was loading - we became re
s
ponsible to load this block
zbf
:=
f
.
zbf
zbf
:=
f
.
zbf
blkdata
,
treepath
,
pathRevMax
,
err
:=
zbf
.
LoadBlk
(
ctx
,
blk
)
blkdata
,
treepath
,
pathRevMax
,
err
:=
zbf
.
LoadBlk
(
ctx
,
blk
)
...
@@ -1268,7 +1326,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) error {
...
@@ -1268,7 +1326,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) error {
// the DB, and instead will be served by kernel from its pagecache.
// the DB, and instead will be served by kernel from its pagecache.
//
//
// We cannot do this directly from reading goroutine - while reading
// We cannot do this directly from reading goroutine - while reading
// kernel FUSE is holding correspon
g
ing page in pagecache locked, and if
// kernel FUSE is holding correspon
d
ing page in pagecache locked, and if
// we would try to update that same page in pagecache it would result
// we would try to update that same page in pagecache it would result
// in deadlock inside kernel.
// in deadlock inside kernel.
//
//
...
@@ -1276,16 +1334,72 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) error {
...
@@ -1276,16 +1334,72 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) error {
// If we do it earlier - a simultaneous read covered by the same block could result
// If we do it earlier - a simultaneous read covered by the same block could result
// into missing both kernel pagecache (if not yet updated) and empty .loading[blk],
// into missing both kernel pagecache (if not yet updated) and empty .loading[blk],
// and thus would trigger DB access again.
// and thus would trigger DB access again.
go
func
()
{
//
// XXX locking - invalidation must make sure this workers are finished.
// XXX if direct-io: don't touch pagecache
// XXX if direct-io: don't touch pagecache
st
:=
gfsconn
.
FileNotifyStoreCache
(
f
.
Inode
(),
blk
*
zbf
.
blksize
,
blkdata
)
go
f
.
uploadBlk
(
blk
,
loading
)
return
nil
}
// uploadBlk complements readBlk and uploads loaded blkdata into OS cache.
func
(
f
*
BigFile
)
uploadBlk
(
blk
int64
,
loading
*
blkLoadState
)
{
head
:=
f
.
head
// rlock zconnMu and make sure zwatcher is not asking us to pause.
// if it does - wait for a safer time not to deadlock.
// see notes.txt -> "Kernel locks page on read/cache store/..." for details.
retry
:
for
{
head
.
zconnMu
.
RLock
()
// help zwatcher if it asks us to pause uploadings, so it can
// take zconnMu wlocked without deadlocks.
if
head
.
pauseOSCacheUpload
{
ready
:=
head
.
continueOSCacheUpload
head
.
zconnMu
.
RUnlock
()
<-
ready
continue
retry
}
break
}
// zwatcher is not currently trying to pause OS cache uploads.
// check if this block was already invalidated by zwatcher.
// if so don't upload the block into OS cache.
f
.
loadMu
.
Lock
()
loading_
:=
f
.
loading
[
blk
]
f
.
loadMu
.
Unlock
()
if
loading
!=
loading_
{
head
.
zconnMu
.
RUnlock
()
return
}
oid
:=
f
.
zbf
.
POid
()
blksize
:=
f
.
zbf
.
blksize
// signal to zwatcher not to run while we are performing the upload.
// upload with released zconnMu so that zwatcher can lock it even if to
// check inflightOSCacheUploads status.
atomic
.
AddInt32
(
&
head
.
inflightOSCacheUploads
,
+
1
)
head
.
zconnMu
.
RUnlock
()
st
:=
gfsconn
.
FileNotifyStoreCache
(
f
.
Inode
(),
blk
*
blksize
,
loading
.
blkdata
)
f
.
loadMu
.
Lock
()
f
.
loadMu
.
Lock
()
bug
:=
(
loading
!=
f
.
loading
[
blk
])
if
!
bug
{
delete
(
f
.
loading
,
blk
)
delete
(
f
.
loading
,
blk
)
}
f
.
loadMu
.
Unlock
()
f
.
loadMu
.
Unlock
()
// signal to zwatcher that we are done and it can continue.
atomic
.
AddInt32
(
&
head
.
inflightOSCacheUploads
,
-
1
)
if
bug
{
panic
(
fmt
.
Sprintf
(
"BUG: bigfile %s: blk %d: f.loading mutated while uploading data to pagecache"
,
oid
,
blk
))
}
if
st
==
fuse
.
OK
{
if
st
==
fuse
.
OK
{
return
return
}
}
...
@@ -1294,10 +1408,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) error {
...
@@ -1294,10 +1408,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) error {
// pagecache control is supported by kernel). We can correctly live on
// pagecache control is supported by kernel). We can correctly live on
// with the error, but data access will be likely very slow. Tell user
// with the error, but data access will be likely very slow. Tell user
// about the problem.
// about the problem.
log
.
Errorf
(
"BUG: bigfile %s: blk %d: -> pagecache: %s (ignoring, but reading from bigfile will be very slow)"
,
zbf
.
POid
(),
blk
,
st
)
log
.
Errorf
(
"BUG: bigfile %s: blk %d: -> pagecache: %s (ignoring, but reading from bigfile will be very slow)"
,
oid
,
blk
,
st
)
}()
return
nil
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment