Commit a6874733 authored by Kirill Smelkov

.

parent 7126b78b
@@ -310,7 +310,7 @@ package main
 // 4.6) processing ZODB invalidations and serving file reads (see 7) are
 // organized to be mutually exclusive.
 //
-// (TODO head.zconnMu -> special mutex with Lock(ctx) so that Lock could be canceled)
+// (TODO head.zheadMu -> special mutex with Lock(ctx) so that Lock could be canceled)
 //
 // 5) after OS file cache was invalidated, we resync zhead to new database
 // view corresponding to tid.
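
Aside: the TODO in the hunk above asks for a special mutex with Lock(ctx) so that a pending Lock could be canceled. A minimal sketch of such a lock, using only the Go standard library; ctxMutex and newCtxMutex are hypothetical names, and this commit itself keeps sync.RWMutex:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // ctxMutex is a mutex whose Lock can be aborted via context cancellation.
    // It is a plain (non-RW) mutex, kept minimal for illustration.
    type ctxMutex struct{ ch chan struct{} }

    func newCtxMutex() *ctxMutex { return &ctxMutex{ch: make(chan struct{}, 1)} }

    // Lock blocks until the mutex is acquired or ctx is done.
    func (m *ctxMutex) Lock(ctx context.Context) error {
        select {
        case m.ch <- struct{}{}:
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    // Unlock releases the mutex; it must only be called by the current holder.
    func (m *ctxMutex) Unlock() { <-m.ch }

    func main() {
        mu := newCtxMutex()
        _ = mu.Lock(context.Background()) // first acquisition succeeds immediately

        // a second acquisition gives up after 100ms instead of blocking forever
        ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
        defer cancel()
        fmt.Println(mu.Lock(ctx)) // -> context deadline exceeded

        mu.Unlock()
    }

The point is that a 1-slot channel can be combined with ctx.Done() in a select, which sync.Mutex and sync.RWMutex do not allow; an RW variant would need extra bookkeeping, which is presumably why this stays a TODO in the commit.
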
@@ -419,7 +419,7 @@ package main
 // XXX describe locking
 //
-// head.zconnMu write by handleδZ; read by read
+// head.zheadMu write by handleδZ; read by read
 // -> rlockZHead() + lockZHead() ?
 // ...
@@ -489,7 +489,7 @@ type Head struct {
 // ZODB connection for everything under this head
-// protects access to zconn & live _objects_ associated with it.
+// zheadMu protects access to zconn & live _objects_ associated with it.
 // while it is rlocked zconn is guaranteed to stay viewing database at
 // particular view.
 //
@@ -499,7 +499,7 @@ type Head struct {
 // it is also kept rlocked by OS cache uploaders (see BigFile.uploadBlk)
 // with additional locking protocol to avoid deadlocks (see below for
 // pauseOSCacheUpload + ...).
-zconnMu sync.RWMutex // XXX -> zheadMu ?
+zheadMu sync.RWMutex
 zconn *ZConn // for head/ zwatcher resyncs head.zconn; others only read zconn objects.
 // zwatcher signals to uploadBlk to pause/continue uploads to OS cache to avoid deadlocks.
@@ -512,7 +512,7 @@ type Head struct {
 // XXX move zconn's current transaction to Head here?
 // head/watch opens
-// XXX protected by ... head.zconnMu ?
+// XXX protected by ... zheadMu ?
 wlinkTab map[*WatchLink]struct{}
 }
@@ -736,12 +736,12 @@ func (root *Root) handleδZ(δZ *zodb.EventCommit) {
 continueOSCacheUpload := make(chan struct{})
 retry:
 for {
-head.zconnMu.Lock()
+head.zheadMu.Lock()
 head.pauseOSCacheUpload = true
 head.continueOSCacheUpload = continueOSCacheUpload
 if head.inflightOSCacheUploads != 0 {
-head.zconnMu.Unlock()
+head.zheadMu.Unlock()
 continue retry
 }
@@ -751,11 +751,11 @@ retry:
 defer func() {
 head.pauseOSCacheUpload = false
 head.continueOSCacheUpload = nil
-head.zconnMu.Unlock()
+head.zheadMu.Unlock()
 close(continueOSCacheUpload)
 }()
-// head.zconnMu locked and all cache uploaders are paused
+// head.zheadMu locked and all cache uploaders are paused
 zhead := head.zconn
 bfdir := head.bfdir
@@ -785,8 +785,8 @@ retry:
 case zBlk: // ZBlk*
 // blkBoundTo locking: no other bindZFile are running,
-// since we write-locked head.zconnMu and bindZFile is
-// run when loading objects - thus when head.zconnMu is
+// since we write-locked head.zheadMu and bindZFile is
+// run when loading objects - thus when head.zheadMu is
 // read-locked.
 //
 // bfdir locking: similarly not needed, since we are
@@ -923,7 +923,7 @@ retry:
 //
 // see "4.4) for all file/blk to in invalidate we do"
 //
-// called with f.head.zconnMu wlocked.
+// called with f.head.zheadMu wlocked.
 func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) (err error) {
 defer xerr.Contextf(&err, "%s: invalidate blk #%d:", f.path(), blk)
@@ -936,7 +936,7 @@ func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) (err error) {
 // first try to retrieve f.loading[blk];
 // make sure f.loading[blk] is invalidated.
 //
-// we are running with zconnMu wlocked - no need to lock f.loadMu
+// we are running with zheadMu wlocked - no need to lock f.loadMu
 loading, ok := f.loading[blk]
 if ok {
 if loading.err == nil {
@@ -1045,8 +1045,8 @@ func (root *Root) mkrevfile(rev zodb.Tid, fid zodb.Oid) (_ *BigFile, release fun
 // /(head|<rev>)/bigfile/<bigfileX> -> Read serves reading bigfile data.
 func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context) (fuse.ReadResult, fuse.Status) {
-f.head.zconnMu.RLock()
-defer f.head.zconnMu.RUnlock()
+f.head.zheadMu.RLock()
+defer f.head.zheadMu.RUnlock()
 // cap read request to file size
 end := off + int64(len(dest)) // XXX overflow?
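
Aside: on the "XXX overflow?" marker above: end := off + int64(len(dest)) can overflow int64 for a huge offset. One way to clip the read window without ever forming the overflowing sum; readWindow and its parameters are illustrative names, not wcfs API:

    package main

    import "fmt"

    // readWindow clips the requested read [off, off+ndest) to a file of size
    // fsize, never computing off+ndest when that sum could overflow int64.
    func readWindow(off int64, ndest int, fsize int64) (start, end int64) {
        if off >= fsize {
            return fsize, fsize // read starts at or past EOF -> empty window
        }
        end = fsize
        if int64(ndest) <= fsize-off { // then off+ndest <= fsize, so no overflow
            end = off + int64(ndest)
        }
        return off, end
    }

    func main() {
        fmt.Println(readWindow(10, 4096, 100))            // 10 100: clipped to EOF
        fmt.Println(readWindow(1<<62, 4096, (1<<62)+100)) // clipped, no overflow
    }

The guard int64(ndest) <= fsize-off is itself overflow-free because both operands are already valid int64 values with off < fsize.
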
@@ -1097,7 +1097,7 @@ func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context
 // see "7) when we receive a FUSE read(#blk) request ..." in overview.
 //
 // len(dest) == blksize.
-// called with head.zconnMu rlocked.
+// called with head.zheadMu rlocked.
 func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err error) {
 defer xerr.Contextf(&err, "%s: readblk #%d", f.path(), blk)
@@ -1174,7 +1174,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
 //
 // See "7.2) for all registered client@at watchers ..."
 //
-// Called with f.head.zconnMu rlocked.
+// Called with f.head.zheadMu rlocked.
 //
 // XXX do we really need to use/propagate caller context here? ideally update
 // watchers should be synchronous, and in practice we just use 30s timeout.
@@ -1256,17 +1256,17 @@ func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btre
 func (f *BigFile) uploadBlk(blk int64, loading *blkLoadState) {
 head := f.head
-// rlock zconnMu and make sure zwatcher is not asking us to pause.
+// rlock zheadMu and make sure zwatcher is not asking us to pause.
 // if it does - wait for a safer time not to deadlock.
 // see notes.txt -> "Kernel locks page on read/cache store/..." for details.
 retry:
 for {
-head.zconnMu.RLock()
+head.zheadMu.RLock()
 // help zwatcher if it asks us to pause uploadings, so it can
-// take zconnMu wlocked without deadlocks.
+// take zheadMu wlocked without deadlocks.
 if head.pauseOSCacheUpload {
 ready := head.continueOSCacheUpload
-head.zconnMu.RUnlock()
+head.zheadMu.RUnlock()
 <-ready
 continue retry
 }
@@ -1282,17 +1282,17 @@ retry:
 loading_ := f.loading[blk]
 f.loadMu.Unlock()
 if loading != loading_ {
-head.zconnMu.RUnlock()
+head.zheadMu.RUnlock()
 return
 }
 oid := f.zfile.POid()
 // signal to zwatcher not to run while we are performing the upload.
-// upload with released zconnMu so that zwatcher can lock it even if to
+// upload with released zheadMu so that zwatcher can lock it even if to
 // check inflightOSCacheUploads status.
 atomic.AddInt32(&head.inflightOSCacheUploads, +1)
-head.zconnMu.RUnlock()
+head.zheadMu.RUnlock()
 st := gfsconn.FileNotifyStoreCache(f.Inode(), blk*f.blksize, loading.blkdata)
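
Aside: with this hunk both sides of the pause protocol are visible: handleδZ (the hunks at -736/-751 above) takes zheadMu write-locked, raises pauseOSCacheUpload and retries until inflightOSCacheUploads drains, while uploadBlk (this hunk and the previous one) takes the read lock, steps aside when asked to pause, and accounts the upload in inflightOSCacheUploads before storing to the OS cache with the lock released. A condensed, runnable sketch of that handshake using only the standard library; the head type and function bodies below are illustrative, not the wcfs code:

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
        "time"
    )

    // head carries just the fields involved in the pause/continue handshake.
    type head struct {
        zheadMu                sync.RWMutex
        pauseOSCacheUpload     bool          // set under zheadMu write lock
        continueOSCacheUpload  chan struct{} // closed when uploads may resume
        inflightOSCacheUploads int32         // uploads running outside zheadMu
    }

    // zwatcherInvalidate models handleδZ: it needs zheadMu write-locked with
    // no OS cache upload running concurrently.
    func (h *head) zwatcherInvalidate() {
        continueUpload := make(chan struct{})
    retry:
        for {
            h.zheadMu.Lock()
            h.pauseOSCacheUpload = true
            h.continueOSCacheUpload = continueUpload
            if atomic.LoadInt32(&h.inflightOSCacheUploads) != 0 {
                h.zheadMu.Unlock()
                continue retry // let in-flight uploads drain, then retake the lock
            }
            break
        }
        defer func() {
            h.pauseOSCacheUpload = false
            h.continueOSCacheUpload = nil
            h.zheadMu.Unlock()
            close(continueUpload) // wake uploaders parked on the pause channel
        }()

        fmt.Println("zwatcher: invalidating OS cache with uploads paused")
    }

    // uploadBlk models the uploader side: read-lock, step aside if the
    // zwatcher asked to pause, then upload with zheadMu released but counted
    // in inflightOSCacheUploads so the zwatcher knows to wait.
    func (h *head) uploadBlk(blk int) {
    retry:
        for {
            h.zheadMu.RLock()
            if h.pauseOSCacheUpload {
                ready := h.continueOSCacheUpload
                h.zheadMu.RUnlock()
                <-ready
                continue retry
            }
            break
        }
        atomic.AddInt32(&h.inflightOSCacheUploads, +1)
        h.zheadMu.RUnlock()

        fmt.Printf("uploader: storing blk %d to OS cache\n", blk)
        atomic.AddInt32(&h.inflightOSCacheUploads, -1)
    }

    func main() {
        h := &head{}
        var wg sync.WaitGroup
        for i := 0; i < 3; i++ {
            i := i
            wg.Add(1)
            go func() { defer wg.Done(); h.uploadBlk(i) }()
        }
        time.Sleep(10 * time.Millisecond) // let some uploads start
        h.zwatcherInvalidate()
        wg.Wait()
    }

Running it, each upload either finishes before the zwatcher acquires the write lock or parks on continueOSCacheUpload until it is closed; either way the invalidation line only prints while no upload is between its increment and decrement of inflightOSCacheUploads.
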
@@ -1390,7 +1390,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
 bfdir := head.bfdir
 // XXX locking
-// XXX head.zconnMu.RLock() + defer unlock (see vvv for unpin vs pin and locked head)
+// XXX head.zheadMu.RLock() + defer unlock (see vvv for unpin vs pin and locked head)
 // XXX if watch was already established - we need to update it
 w := wlink.byfile[foid]
@@ -1784,8 +1784,8 @@ func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context)
 return nil, eINVALf("not oid")
 }
-bfdir.head.zconnMu.RLock()
-defer bfdir.head.zconnMu.RUnlock()
+bfdir.head.zheadMu.RLock()
+defer bfdir.head.zheadMu.RUnlock()
 defer func() {
 if f != nil {
@@ -1997,8 +1997,8 @@ func (f *BigFile) Close() error {
 // /(head|<rev>)/at -> readAt serves read.
 func (h *Head) readAt() []byte {
-h.zconnMu.RLock()
-defer h.zconnMu.RUnlock()
+h.zheadMu.RLock()
+defer h.zheadMu.RUnlock()
 return []byte(h.zconn.At().String())
 }
@@ -2007,9 +2007,9 @@ func (h *Head) readAt() []byte {
 func (head *Head) GetAttr(out *fuse.Attr, _ nodefs.File, _ *fuse.Context) fuse.Status {
 at := head.rev
 if at == 0 {
-head.zconnMu.RLock()
+head.zheadMu.RLock()
 at = head.zconn.At()
-head.zconnMu.RUnlock()
+head.zheadMu.RUnlock()
 }
 t := at.Time().Time
@@ -2020,8 +2020,8 @@ func (head *Head) GetAttr(out *fuse.Attr, _ nodefs.File, _ *fuse.Context) fuse.S
 // /(head|<rev>)/bigfile/<bigfileX> -> Getattr serves stat.
 func (f *BigFile) GetAttr(out *fuse.Attr, _ nodefs.File, _ *fuse.Context) fuse.Status {
-f.head.zconnMu.RLock()
-defer f.head.zconnMu.RUnlock()
+f.head.zheadMu.RLock()
+defer f.head.zheadMu.RUnlock()
 f.getattr(out)
 return fuse.OK
@@ -2061,7 +2061,7 @@ var gmntpt string
 // debugging
 var gdebug = struct {
 // .wcfs/zhead opens
-// protected by groot.head.zconnMu
+// protected by groot.head.zheadMu
 zheadSockTab map[*FileSock]struct{}
 }{}
@@ -2079,8 +2079,8 @@ func (zh *_wcfs_Zhead) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse
 sk := NewFileSock()
 sk.CloseRead()
-groot.head.zconnMu.Lock()
-defer groot.head.zconnMu.Unlock()
+groot.head.zheadMu.Lock()
+defer groot.head.zheadMu.Unlock()
 // XXX del zheadSockTab[sk] on sk.File.Release (= client drops opened handle)
 gdebug.zheadSockTab[sk] = struct{}{}
...
@@ -1448,7 +1448,7 @@ def test_wcfs_pintimeout_kill():
 # watch with @at > head - must wait for head to become >= at
-# XXX too far ahead - error?
+# XXX too far ahead - reject?
 @func
 def test_wcfs_watch_setup_ahead():
 t = tDB(); zf = t.zfile