// Copyright (C) 2018-2021 Nexedi SA and Contributors. // Kirill Smelkov <kirr@nexedi.com> // // This program is free software: you can Use, Study, Modify and Redistribute // it under the terms of the GNU General Public License version 3, or (at your // option) any later version, as published by the Free Software Foundation. // // You can also Link and Combine this program with other software covered by // the terms of any of the Free Software licenses or any of the Open Source // Initiative approved licenses and Convey the resulting work. Corresponding // source of such a combination shall include the source code for all other // software used. // // This program is distributed WITHOUT ANY WARRANTY; without even the implied // warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // // See COPYING file for full licensing terms. // See https://www.nexedi.com/licensing for rationale and options. // Program wcfs provides filesystem server with file data backed by wendelin.core arrays. // // Intro // // Each wendelin.core array (ZBigArray) is actually a linear file (ZBigFile) // and array metadata like dtype, shape and strides associated with it. This // program exposes as files only ZBigFile data and leaves rest of // array-specific handling to clients. Every ZBigFile is exposed as one separate // file that represents whole ZBigFile's data. // // For a client, the primary way to access a bigfile should be to mmap // head/bigfile/<bigfileX> which represents always latest bigfile data. // Clients that want to get isolation guarantee should subscribe for // invalidations and re-mmap invalidated regions to file with pinned bigfile revision for // the duration of their transaction. See "Isolation protocol" for details(*). // // In the usual situation when bigfiles are big, and there are O(1)/δt updates, // there should be no need for any cache besides shared kernel cache of latest // bigfile data. // // -------- // // (*) wcfs servers comes accompanied by Python and C++ client packages that // take care about isolation protocol details and provide to clients simple // interface similar to regular files. // // // Filesystem organization // // Top-level structure of provided filesystem is as follows: // // head/ ; latest database view // ... // @<rev1>/ ; database view as of revision <revX> // ... // @<rev2>/ // ... // ... // // where head/ represents latest data as stored in upstream ZODB, and // @<revX>/ represents data as of database revision <revX>. // // head/ has the following structure: // // head/ // at ; data inside head/ is as of this ZODB transaction // watch ; channel for bigfile invalidations // bigfile/ ; bigfiles' data // <oid(ZBigFile1)> // <oid(ZBigFile2)> // ... // // where /bigfile/<bigfileX> represents latest bigfile data as stored in // upstream ZODB. As there can be some lag receiving updates from the database, // /at describes precisely ZODB state for which bigfile data is currently // exposed. Whenever bigfile data is changed in upstream ZODB, information // about the changes is first propagated to /watch, and only after that // /bigfile/<bigfileX> is updated. See "Isolation protocol" for details. // // @<revX>/ has the following structure: // // @<revX>/ // bigfile/ ; bigfiles' data as of revision <revX> // <oid(ZBigFile1)> // <oid(ZBigFile2)> // ... // // where /bigfile/<bigfileX> represent bigfile data as of revision <revX>. // // Unless accessed {head,@<revX>}/bigfile/<bigfileX> are not automatically visible in // wcfs filesystem. Similarly @<revX>/ become visible only after access. // // // Isolation protocol // // In order to support isolation, wcfs implements isolation protocol that // must be cooperatively followed by both wcfs and client. // // First, client mmaps latest bigfile, but does not access it: // // mmap(head/bigfile/<bigfileX>) // // Then client opens head/watch and tells wcfs through it for which ZODB state // it wants to get bigfile's view: // // C: 1 watch <bigfileX> @<at> // // The server then, after potentially sending initial pin and unpin messages // (see below), reports either success or failure: // // S: 1 ok // S: 1 error ... ; if <at> is too far away back from head/at // // The server sends "ok" reply only after head/at is ≥ requested <at>, and only // after all initial pin/unpin messages are fully acknowledged by the client. // The client can start to use mmapped data after it gets "ok". // The server sends "error" reply e.g. if requested <at> is too far away back // from head/at, or on any other error. // TODO specify watch state after error. // // Upon watch request, either initially, or after sending "ok", the server will be notifying the // client about file blocks that client needs to pin in order to observe file's // data as of <at> revision: // // The filesystem server itself receives information about changed data from // ZODB server through regular ZODB invalidation channel (as it is ZODB client // itself). Then, separately for each changed file block, before actually // updating head/bigfile/<bigfileX> content, it notifies through opened // head/watch links to clients, that had requested it (separately to each // client), about the changes: // // S: <2·k> pin <bigfileX> #<blk> @<rev_max> ; @head means unpin // // and waits until all clients confirm that changed file block can be updated // in global OS cache. // // The client in turn should now re-mmap requested to be pinned block to bigfile@<rev_max> // // # mmapped at address corresponding to #blk // mmap(@<rev_max>/bigfile/<bigfileX>, #blk, MAP_FIXED) // // or, if given @head as @<rev_max>, to bigfile@head // // mmap(head/bigfile/<bigfileX>, #blk, MAP_FIXED) // // and must send ack back to the server when it is done: // // C: <2·k> ack // // The server sends pin notifications only for file blocks, that are known to // be potentially changed after client's <at>, and <rev_max> describes the // upper bound for the block revision as of <at> database view: // // <rev_max> ≤ <at> ; block stays unchanged in (<rev_max>, <at>] range // // The server maintains short history tail of file changes to be able to // support openings with <at> being slightly in the past compared to current // head/at. The server might reject a watch request if <at> is too far away in // the past from head/at. The client is advised to restart its transaction with // more uptodate database view if it gets watch setup error. // // A later request from the client for the same <bigfileX> but with different // <at>, overrides previous watch request for that file. A client can use "-" // instead of "@<at>" to stop watching a file. // // A single client can send several watch requests through single head/watch // open, as well as it can use several head/watch opens simultaneously. // The server sends pin notifications for all files requested to be watched via // every opened head/watch link. // // Note: a client could use a single watch to manage its several views for the same // file but with different <at>. This could be achieved via watching with // @<at_min>, and then deciding internally which views needs to be adjusted and // which views need not. Wcfs does not oblige clients to do so though, and a // client is free to use as many head/watch openings as it needs to. // // When clients are done with @<revX>/bigfile/<bigfileX> (i.e. client's // transaction ends and array is unmapped), the server sees number of opened // files to @<revX>/bigfile/<bigfileX> drops to zero, and automatically // destroys @<revX>/bigfile/<bigfileX> after reasonable timeout. // // The client should send "bye" before closing head/watch file: // // C: <2·k+1> bye // // // Protection against slow or faulty clients // // If a client, on purpose or due to a bug or being stopped, is slow to respond // with ack to file invalidation notification, it creates a problem because the // server will become blocked waiting for pin acknowledgments, and thus all // other clients, that try to work with the same file, will get stuck. // // The problem could be avoided, if wcfs would reside inside OS kernel and this // way could be able to manipulate clients address space directly (then // isolation protocol won't be needed). It is also possible to imagine // mechanism, where wcfs would synchronously change clients' address space via // injecting trusted code and running it on client side via ptrace to adjust // file mappings. // // However ptrace does not work when client thread is blocked under pagefault, // and that is exactly what wcfs would need to do to process invalidations // lazily, because eager invalidation processing results in prohibitively slow // file opens. See internal wcfs overview for details about why ptrace // cannot be used and why lazy invalidation processing is required. // // Lacking OS primitives to change address space of another process and not // being able to work it around with ptrace in userspace, wcfs takes approach // to kill a slow client on 30 seconds timeout by default. // // // Writes // // As each bigfile is represented by 1 synthetic file, there can be several // write schemes: // // 1. mmap(MAP_PRIVATE) + writeout by client // // In this scheme bigfile data is mmapped in MAP_PRIVATE mode, so that local // user changes are not automatically propagated back to the file. When there // is a need to commit, client investigates via some OS mechanism, e.g. // /proc/self/pagemap or something similar, which pages of this mapping it // modified. Knowing this it knows which data it dirtied and so can write this // data back to ZODB itself, without filesystem server providing write support. // // 2. mmap(MAP_SHARED, PROT_READ) + write-tracking & writeout by client // // In this scheme bigfile data is mmaped in MAP_SHARED mode with read-only pages // protection. Then whenever write fault occurs, client allocates RAM from // shmfs, copies faulted page to it, and then mmaps RAM page with RW protection // in place of original bigfile page. Writeout implementation should be similar // to "1", only here client already knows the pages it dirtied, and this way // there is no need to consult /proc/self/pagemap. // // The advantage of this scheme over mmap(MAP_PRIVATE) is that in case // there are several in-process mappings of the same bigfile with overlapping // in-file ranges, changes in one mapping will be visible in another mapping. // Contrary: whenever a MAP_PRIVATE mapping is modified, the kernel COWs // faulted page into a page completely private to this mapping, so that other // MAP_PRIVATE mappings of this file, including ones created from the same // process, do not see changes made to the first mapping. // // Since wendelin.core needs to provide coherency in between different slices // of the same array, this is the mode wendelin.core actually uses. // // 3. write to wcfs // // TODO we later could implement "write-directly" mode where clients would write // data directly into the file. package main // Wcfs organization // // Wcfs is a ZODB client that translates ZODB objects into OS files as would // non-wcfs wendelin.core do for a ZBigFile. Contrary to non-wcfs wendelin.core, // it keeps bigfile data in shared OS cache efficiently. It is organized as follows: // // 1) 1 ZODB connection for "latest data" for whole filesystem (zhead). // 2) head/bigfile/* of all bigfiles represent state as of zhead.At . // 3) for head/bigfile/* the following invariant is maintained: // // #blk ∈ OS file cache => all BTree/Bucket/ZBlk that lead to blk are tracked(%) // // The invariant helps on invalidation: when δFtail (see below) sees a // changed oid, it is guaranteed that if the change affects block that was // ever provided to OS, δFtail will detect that this block has changed. // And if oid relates to a file block but is not in δFtail's tracking set - // we know that block is not cached and will trigger ZODB load on a future // file read. // // Currently we maintain this invariant by adding ZBlk/LOBTree/LOBucket // objects to δFtail on every access, and never shrinking that tracking set. // In the future we may want to try to synchronize to kernel freeing its // pagecache pages. // // 4) when we receive an invalidation message from ZODB - we process it and // propagate invalidations to OS file cache of head/bigfile/*: // // invalidation message: δZ = (tid↑, []oid) // // 4.1) δF = δFtail.Update(δZ) // // δFtail (see below) converts ZODB-level changes into information about // which blocks of which files were modified and need to be invalidated: // // δF = (tid↑, {} file -> []#blk) // // Note that δF might be not full and reflects only changes to files and // blocks that were requested to be tracked. However because of the invariant // δF covers in full what needs to be invalidated in the OS file cache. // // 4.2) for all file/blk to invalidate we do: // // - try to retrieve head/bigfile/file[blk] from OS file cache(*); // - if retrieved successfully -> store retrieved data back into OS file // cache for @<rev>/bigfile/file[blk], where // // rev = δFtail.BlkRevAt(file, #blk, zhead.at) // // - invalidate head/bigfile/file[blk] in OS file cache. // // This preserves previous data in OS file cache in case it will be needed // by not-yet-uptodate clients, and makes sure file read of head/bigfile/file[blk] // won't be served from OS file cache and instead will trigger a FUSE read // request to wcfs. // // 4.3) no invalidation messages are sent to wcfs clients at this point(+). // // 4.4) processing ZODB invalidations and serving file reads (see 7) are // organized to be mutually exclusive. // // 4.5) similarly, processing ZODB invalidations and setting up watches (see // 7.2) are organized to be mutually exclusive. // // 5) after OS file cache was invalidated, we resync zhead to new database // view corresponding to tid. // // 6) a ZBigFile-level history tail is maintained in δFtail. // // δFtail translates ZODB object-level changes into information about which // blocks of which ZBigFile were modified, and provides service to query // that information. // // It semantically consists of // // []δF // // where δF represents a change in files space // // δF: // .rev↑ // {} file -> {}blk // // min(rev) in δFtail is min(@at) at which head/bigfile/file is currently watched (see below). // // To support initial openings with @at being slightly in the past, we also // make sure that min(rev) is enough to cover last 1 minute of history // from head/at. // // Scalability of δFtail plays important role in scalability of WCFS because // δFtail, besides other places, is queried and potentially rebuilt at every // FUSE read request (see 7 below). // // See documentation in internal/zdata/δftail.go for more details on ΔFtail // and its scalability properties. // // 7) when we receive a FUSE read(#blk) request to a head/bigfile/file, we process it as follows: // // 7.1) load blkdata for head/bigfile/file[blk] @zhead.at . // // while loading this also gives upper bound estimate of when the block // was last changed: // // rev(blk) ≤ max(_.serial for _ in (ZBlk(#blk), all BTree/Bucket that lead to ZBlk)) // // it is not exact because BTree/Bucket can change (e.g. rebalance) // but still point to the same k->ZBlk. // // we also use δFtail to find either exact blk revision or another upper // bound if file[blk] has no change during δFtail coverage: // // rev(blk) = δFtail.BlkRevAt(file, #blk, zhead.at) // // below rev'(blk) is min(of the estimates found): // // rev(blk) ≤ rev'(blk) rev'(blk) = min(^^^) // // Note: we delay recomputing δFtail.BlkRevAt(file, #blk, head) because // using just cheap revmax estimate can frequently result in all watches // being skipped. // // 7.2) for all registered client@at watches of head/bigfile/file: // // - rev'(blk) ≤ at: -> do nothing // - rev'(blk) > at: // - if blk ∈ watch.pinned -> do nothing // - rev = δFtail.BlkRevAt(file, #blk, at) // - watch.pin(file, #blk, @rev) // - watch.pinned += blk // // where // // watch.pin(file, #blk, @rev) // // sends pin message according to "Isolation protocol", and is assumed // to cause // // remmap(file, #blk, @rev/bigfile/file) // // on client. // // ( one could imagine adjusting mappings synchronously via running // wcfs-trusted code via ptrace that wcfs injects into clients, but ptrace // won't work when client thread is blocked under pagefault or syscall(^) ) // // in order to support watching for each head/bigfile/file // // [] of watch{client@at↑, pinned} // // is maintained. // // 7.3) blkdata is returned to kernel. // // Thus a client that wants latest data on pagefault will get latest data, // and a client that wants @rev data will get @rev data, even if it was this // "old" client that triggered the pagefault(~). // // 8) serving FUSE reads from @<rev>/bigfile/file is organized similarly to // serving reads from head/bigfile/file, but with using dedicated per-<rev> // ZODB connection and without notifying any watches. // // 9) for every ZODB connection (zhead + one per @<rev>) a dedicated read-only // transaction is maintained. For zhead, every time it is resynced (see "5") // the transaction associated with zhead is renewed. // // TODO 10) gc @rev/ and @rev/bigfile/<bigfileX> automatically on atime timeout // // // (*) see notes.txt -> "Notes on OS pagecache control" // (+) see notes.txt -> "Invalidations to wcfs clients are delayed until block access" // (~) see notes.txt -> "Changing mmapping while under pagefault is possible" // (^) see notes.txt -> "Client cannot be ptraced while under pagefault" // (%) no need to keep track of ZData - ZBlk1 is always marked as changed on blk data change. // Wcfs locking organization // // As it was said processing ZODB invalidations (see "4") and serving file // reads (see "7") are organized to be mutually exclusive. To do so a major RW // lock - zheadMu - is used. Whenever ZODB invalidations are processed and // zhead.at is updated - zheadMu.W is taken. Contrary whenever file read is // served and in other situations - which needs zhead to remain viewing // database at the same state - zheadMu.R is taken. // // Several locks that protect internal data structures are minor to zheadMu - // they need to be taken only under zheadMu.R (to protect e.g. multiple readers // running simultaneously to each other), but do not need to be taken at all if // zheadMu.W is taken. In data structures such locks are noted as follows // // xMu sync.Mutex // zheadMu.W | zheadMu.R + xMu // // If a lock is not minor to zheadMu, it is still ok to lock it under zheadMu.R // as zheadMu, being the most major lock in wcfs, always comes locked first, if // it needs to be locked. // // For watches, similarly to zhead, watch.at is protected by major-for-watch // per-watch RW lock watch.atMu . When watch.at is updated during watch // setup/upgrade time - watch.atMu.W is taken. Contrary whenever watch is // notified with pin messages - watch.atMu.R is taken to make sure watch.at // stays unchanged while pins are prepared and processed. // // For watches, similarly to zheadMu, there are several minor-to-atMu locks // that protect internal data structures. Such locks are noted similarly to // zheadMu enslavement. // // In addition to what is written above there are other ordering rules that are // followed consistently to avoid hitting deadlock: // // BigFile.watchMu > Watch.atMu // WatchLink.byfileMu > BigFile.watchMu // WatchLink.byfileMu > BigFileDir.fileMu // WatchLink.byfileMu > Watch.atMu // Notation used // // δZ - change in ZODB space // δB - change in BTree*s* space // δT - change in BTree(1) space // δF - change in File*s* space // δfile - change in File(1) space // // f - BigFile // bfdir - BigFileDir // wlink - WatchLink // w - Watch import ( "bufio" "context" "flag" "fmt" "io" stdlog "log" "math" "os" "runtime" "sort" "strings" "sync" "sync/atomic" "syscall" "time" log "github.com/golang/glog" "lab.nexedi.com/kirr/go123/xcontext" "lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xio" "lab.nexedi.com/kirr/go123/xruntime/race" "lab.nexedi.com/kirr/go123/xsync" "lab.nexedi.com/kirr/neo/go/transaction" "lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb/btree" _ "lab.nexedi.com/kirr/neo/go/zodb/wks" "github.com/johncgriffin/overflow" "github.com/hanwen/go-fuse/v2/fuse" "github.com/hanwen/go-fuse/v2/fuse/nodefs" "github.com/pkg/errors" "lab.nexedi.com/nexedi/wendelin.core/wcfs/internal/xzodb" "lab.nexedi.com/nexedi/wendelin.core/wcfs/internal/zdata" ) // shorthands for ZBigFile and ZBlk* type ( ZBlk = zdata.ZBlk ZBlk0 = zdata.ZBlk0 ZBlk1 = zdata.ZBlk1 ZData = zdata.ZData ZBigFile = zdata.ZBigFile ) // Root represents root of wcfs filesystem. type Root struct { fsNode // ZODB storage we work with zstor zodb.IStorage // ZODB DB handle for zstor. // keeps cache of connections for @<rev>/ accesses. // only one connection is used for each @<rev>. zdb *zodb.DB // directory + ZODB connection for head/ // (zhead is Resync'ed and is kept outside zdb pool) head *Head // directories + ZODB connections for @<rev>/ revMu sync.Mutex revTab map[zodb.Tid]*Head } // /(head|<rev>)/ - served by Head. type Head struct { fsNode rev zodb.Tid // 0 for head/, !0 for @<rev>/ bfdir *BigFileDir // bigfile/ // at - served by .readAt // watch - implicitly linked to by fs // ZODB connection for everything under this head // zheadMu protects zconn.At & live _objects_ associated with it. // while it is rlocked zconn is guaranteed to stay viewing database at // particular view. // // zwatcher write-locks this and knows noone is using ZODB objects and // noone mutates OS file cache while zwatcher is running. // // it is also kept rlocked by OS cache uploaders (see BigFile.uploadBlk) // with additional locking protocol to avoid deadlocks (see below for // pauseOSCacheUpload + ...). // // TODO head.zheadMu -> special mutex with Lock(ctx) so that Lock wait could be canceled zheadMu sync.RWMutex zconn *xzodb.ZConn // for head/ zwatcher resyncs head.zconn; others only read zconn objects. // zwatcher signals to uploadBlk to pause/continue uploads to OS cache to avoid deadlocks. // see notes.txt -> "Kernel locks page on read/cache store/..." for details. pauseOSCacheUpload bool continueOSCacheUpload chan struct{} // uploadBlk signals to zwatcher that there are so many inflight OS cache uploads currently. inflightOSCacheUploads int32 // head/watch opens wlinkMu sync.Mutex wlinkTab map[*WatchLink]struct{} // waiters for zhead.At to become ≥ their at. hwaitMu sync.Mutex // zheadMu.W | zheadMu.R + hwaitMu hwait map[hwaiter]struct{} // set{(at, ready)} } // /(head|<rev>)/bigfile/ - served by BigFileDir. type BigFileDir struct { fsNode head *Head // parent head/ or @<rev>/ // {} oid -> <bigfileX> fileMu sync.Mutex // zheadMu.W | zheadMu.R + fileMu fileTab map[zodb.Oid]*BigFile // δ tail of tracked BTree nodes of all BigFiles + -> which file // (used only for head/, not revX/) δFtail *zdata.ΔFtail // read/write access protected by zheadMu.{R,W} } // /(head|<rev>)/bigfile/<bigfileX> - served by BigFile. type BigFile struct { fsNode // this BigFile is under .head/bigfile/; it views ZODB via .head.zconn // parent's BigFileDir.head is the same. head *Head // ZBigFile top-level object zfile *ZBigFile // things read/computed from .zfile; constant during lifetime of current transaction. // i.e. changed under zhead.W blksize int64 // zfile.blksize size int64 // zfile.Size() revApprox zodb.Tid // approx last revision that modified zfile data // ( we can't know rev fully as some later blocks could be learnt only // while populating δFtail lazily. For simplicity we don't delve into // updating revApprox during lifetime of current transaction ) // inflight loadings of ZBigFile from ZODB. // successful load results are kept here until blkdata is put into OS pagecache. // // Being a staging area for data to enter OS cache, loading has to be // consulted/invalidated whenever wcfs logic needs to consult/invalidate OS cache. loadMu sync.Mutex // zheadMu.W | zheadMu.R + loadMu loading map[int64]*blkLoadState // #blk -> {... blkdata} // watches attached to this file. // // both watches in already "established" state (i.e. initial watch // request was completed and answered with "ok"), and watches in // progress of being established are kept here. watchMu sync.RWMutex watchTab map[*Watch]struct{} } // blkLoadState represents a ZBlk load state/result. // // when !ready the loading is in progress. // when ready the loading has been completed. type blkLoadState struct { ready chan struct{} blkdata []byte err error } // /head/watch - served by WatchNode. type WatchNode struct { fsNode head *Head // parent head/ idNext int32 // ID for next opened WatchLink } // /head/watch open - served by WatchLink. type WatchLink struct { sk *FileSock // IO channel to client id int32 // ID of this /head/watch handle (for debug log) head *Head // watches associated with this watch link. // // both already established, and watches being initialized in-progress are registered here. // (see setupWatch) byfileMu sync.Mutex byfile map[zodb.Oid]*Watch // {} foid -> Watch // IO reqNext uint64 // stream ID for next wcfs-originated request; 0 is reserved for control messages txMu sync.Mutex rxMu sync.Mutex rxTab map[/*stream*/uint64]chan string // client replies go via here } // Watch represents watching for changes to 1 BigFile over particular watch link. type Watch struct { link *WatchLink // link to client file *BigFile // watching this file // atMu, similarly to zheadMu, protects watch.at and pins associated with Watch. // atMu.R guarantees that watch.at is not changing, but multiple // simultaneous pins could be running (used e.g. by readPinWatchers). // atMu.W guarantees that only one user has watch.at write access and // that no pins are running (used by setupWatch). atMu sync.RWMutex at zodb.Tid // requested to be watched @at pinnedMu sync.Mutex // atMu.W | atMu.R + pinnedMu pinned map[int64]*blkPinState // {} blk -> {... rev} blocks that are already pinned to be ≤ at } // blkPinState represents state/result of pinning one block. // // when !ready the pinning is in progress. // when ready the pinning has been completed. type blkPinState struct { rev zodb.Tid // revision to which the block is being or has been pinned ready chan struct{} err error } // -------- ZODB cache control -------- // zodbCacheControl implements zodb.LiveCacheControl to tune ZODB to never evict // LOBTree/LOBucket from live cache. We want to keep LOBTree/LOBucket always alive // because it is essentially the index where to find ZBigFile data. // // For the data itself - we put it to kernel pagecache and always deactivate // from ZODB right after that. type zodbCacheControl struct {} func (_ *zodbCacheControl) PCacheClassify(obj zodb.IPersistent) zodb.PCachePolicy { switch obj.(type) { // don't let ZBlk*/ZData to pollute the cache case *ZBlk0: return zodb.PCacheDropObject | zodb.PCacheDropState case *ZBlk1: return zodb.PCacheDropObject | zodb.PCacheDropState case *ZData: return zodb.PCacheDropObject | zodb.PCacheDropState // keep ZBigFile and its btree index in cache to speedup file data access. // // ZBigFile is top-level object that is used on every block load, and // it would be a waste to evict ZBigFile from cache. case *ZBigFile: return zodb.PCachePinObject | zodb.PCacheKeepState case *btree.LOBTree: return zodb.PCachePinObject | zodb.PCacheKeepState case *btree.LOBucket: return zodb.PCachePinObject | zodb.PCacheKeepState } return 0 } // -------- 4) ZODB invalidation -> OS cache -------- func traceZWatch(format string, argv ...interface{}) { if !log.V(1) { return } log.InfoDepth(1, fmt.Sprintf("zwatcher: " + format, argv...)) } func debugZWatch(format string, argv ...interface{}) { if !log.V(2) { return } log.InfoDepth(1, fmt.Sprintf("zwatcher: " + format, argv...)) } // zwatcher watches for ZODB changes. // // see "4) when we receive an invalidation message from ZODB ..." func (root *Root) zwatcher(ctx context.Context, zwatchq chan zodb.Event) (err error) { defer xerr.Contextf(&err, "zwatch %s", root.zstor.URL()) traceZWatch(">>>") var zevent zodb.Event var ok bool for { debugZWatch("select ...") select { case <-ctx.Done(): traceZWatch("cancel") return ctx.Err() case zevent, ok = <-zwatchq: if !ok { traceZWatch("zwatchq closed") return nil // closed } } traceZWatch("zevent: %s", zevent) switch zevent := zevent.(type) { default: return fmt.Errorf("unexpected event: %T", zevent) case *zodb.EventError: return zevent.Err case *zodb.EventCommit: err = root.handleδZ(ctx, zevent) if err != nil { return err } } } } // handleδZ handles 1 change event from ZODB notification. func (root *Root) handleδZ(ctx context.Context, δZ *zodb.EventCommit) (err error) { defer xerr.Contextf(&err, "handleδZ @%s", δZ.Tid) head := root.head // while we are invalidating OS cache, make sure that nothing, that // even reads /head/bigfile/*, is running (see 4.4). // // also make sure that cache uploaders we spawned (uploadBlk) are all // paused, or else they could overwrite OS cache with stale data. // see notes.txt -> "Kernel locks page on read/cache store/..." for // details on how to do this without deadlocks. continueOSCacheUpload := make(chan struct{}) retry: for { // TODO ctx cancel head.zheadMu.Lock() head.pauseOSCacheUpload = true head.continueOSCacheUpload = continueOSCacheUpload // NOTE need atomic load, since inflightOSCacheUploads // decrement is done not under zheadMu. if atomic.LoadInt32(&head.inflightOSCacheUploads) != 0 { head.zheadMu.Unlock() continue retry } break } defer func() { head.pauseOSCacheUpload = false head.continueOSCacheUpload = nil head.zheadMu.Unlock() close(continueOSCacheUpload) }() // zheadMu.W taken and all cache uploaders are paused zhead := head.zconn bfdir := head.bfdir // invalidate kernel cache for data in changed files δF, err := bfdir.δFtail.Update(δZ) // δF <- δZ |tracked if err != nil { return err } if log.V(2) { // debug dump δF log.Infof("\n\nS: handleδZ: δF (#%d):\n", len(δF.ByFile)) for foid, δfile := range δF.ByFile { blkv := δfile.Blocks.Elements() sort.Slice(blkv, func(i, j int) bool { return blkv[i] < blkv[j] }) flags := "" if δfile.Size { flags += "S" } if δfile.Epoch { flags += "E" } log.Infof("S: \t- %s\t%2s %v\n", foid, flags, blkv) } log.Infof("\n\n") } // invalidate kernel cache for file data wg := xsync.NewWorkGroup(ctx) for foid, δfile := range δF.ByFile { // file was requested to be tracked -> it must be present in fileTab file := bfdir.fileTab[foid] if δfile.Epoch { // XXX while invalidating whole file at epoch is easy, // it becomes not so easy to handle isolation if epochs // could be present. For this reason we forbid changes // to ZBigFile objects for now. return fmt.Errorf("ZBigFile<%s> changed @%s", foid, δF.Rev) // wg.Go(func(ctx context.Context) error { // return file.invalidateAll() // NOTE does not accept ctx // }) } else { for blk := range δfile.Blocks { blk := blk wg.Go(func(ctx context.Context) error { return file.invalidateBlk(ctx, blk) }) } } } err = wg.Wait() if err != nil { return err } // invalidate kernel cache for attributes // we need to do it only if we see topology (i.e. btree) change // // do it after completing data invalidations. wg = xsync.NewWorkGroup(ctx) for foid, δfile := range δF.ByFile { if !δfile.Size { continue } file := bfdir.fileTab[foid] // must be present wg.Go(func(ctx context.Context) error { return file.invalidateAttr() // NOTE does not accept ctx }) } err = wg.Wait() if err != nil { return err } // resync .zhead to δZ.tid // 1. abort old and resync to new txn/at transaction.Current(zhead.TxnCtx).Abort() _, ctx = transaction.New(context.Background()) err = zhead.Resync(ctx, δZ.Tid) if err != nil { return err } zhead.TxnCtx = ctx // 2. restat invalidated ZBigFile // NOTE no lock needed since .blksize and .size are constant during lifetime of one txn. // TODO -> parallel for foid, δfile := range δF.ByFile { file := bfdir.fileTab[foid] // must be present zfile := file.zfile if δfile.Size { size, sizePath, blkCov, err := zfile.Size(ctx) if err != nil { return err } file.size = size // see "3) for */head/data the following invariant is maintained..." bfdir.δFtail.Track(zfile, -1, sizePath, blkCov, nil) } // NOTE we can miss a change to file if δblk is not yet tracked // that's why revision is only approximated file.revApprox = zhead.At() } // notify .wcfs/zhead for sk := range gdebug.zheadSockTab { _, err := fmt.Fprintf(xio.BindCtxW(sk, ctx), "%s\n", δZ.Tid) if err != nil { log.Errorf("zhead: %s: write: %s (detaching reader)", sk.file, err) sk.Close() delete(gdebug.zheadSockTab, sk) } } // shrink δFtail not to grow indefinitely. // cover history for at least 1 minute, but including all watches. // No need to lock anything because we are holding zheadMu and // setupWatch too runs with zheadMu locked. // // TODO shrink δFtail only once in a while - there is no need to compute // revCut and cut δFtail on every transaction. revCut := zodb.TidFromTime(zhead.At().Time().Add(-1*time.Minute)) for wlink := range head.wlinkTab { for _, w := range wlink.byfile { if w.at < revCut { revCut = w.at } } } bfdir.δFtail.ForgetPast(revCut) // notify zhead.At waiters for hw := range head.hwait { if hw.at <= δZ.Tid { delete(head.hwait, hw) close(hw.ready) } } return nil } // hwaiter represents someone waiting for zhead to become ≥ at. type hwaiter struct { at zodb.Tid ready chan struct{} } // zheadWait waits till head.zconn.At becomes ≥ at. // // It returns error either if wcfs is down or ctx is canceled. func (head *Head) zheadWait(ctx context.Context, at zodb.Tid) (err error) { defer xerr.Contextf(&err, "wait zhead ≥ %s", at) if head.rev != 0 { panic("must be called only for head/, not @revX/") } // TODO check wcfs.down // check if zhead is already ≥ at head.zheadMu.RLock() if head.zconn.At() >= at { head.zheadMu.RUnlock() return nil } // no - we have to wait for it ready := make(chan struct{}) head.hwaitMu.Lock() head.hwait[hwaiter{at, ready}] = struct{}{} head.hwaitMu.Unlock() head.zheadMu.RUnlock() select { case <-ctx.Done(): return ctx.Err() case <-ready: return nil // ok - zhead.At went ≥ at } } // invalidateBlk invalidates 1 file block in kernel cache. // // see "4.2) for all file/blk to in invalidate we do" // called with zheadMu wlocked. func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) (err error) { defer xerr.Contextf(&err, "%s: invalidate blk #%d:", f.path(), blk) fsconn := gfsconn blksize := f.blksize off := blk*blksize var blkdata []byte = nil // first try to retrieve f.loading[blk]; // make sure f.loading[blk] is invalidated. // // we are running with zheadMu wlocked - no need to lock f.loadMu loading, ok := f.loading[blk] if ok { if loading.err == nil { blkdata = loading.blkdata } delete(f.loading, blk) } // TODO skip retrieve/store if len(f.watchTab) == 0 // try to retrieve cache of current head/data[blk], if we got nothing from f.loading if blkdata == nil { blkdata = make([]byte, blksize) n, st := fsconn.FileRetrieveCache(f.Inode(), off, blkdata) if st != fuse.OK { log.Errorf("%s: retrieve blk #%d from cache: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, st) } blkdata = blkdata[:n] } // if less than blksize was cached - probably the kernel had to evict // some data from its cache already. In such case we don't try to // preserve the rest and drop what was read, to avoid keeping the // system overloaded. // // if we have the data - preserve it under @revX/bigfile/file[blk]. if int64(len(blkdata)) == blksize { err := func() error { // store retrieved data back to OS cache for file @<rev>/file[blk] δFtail := f.head.bfdir.δFtail blkrev, _, err := δFtail.BlkRevAt(ctx, f.zfile, blk, f.head.zconn.At()) if err != nil { return err } frev, funlock, err := groot.lockRevFile(blkrev, f.zfile.POid()) if err != nil { return fmt.Errorf("BUG: %s", err) } defer funlock() st := fsconn.FileNotifyStoreCache(frev.Inode(), off, blkdata) if st != fuse.OK { return fmt.Errorf("BUG: %s: store cache: %s", frev.path(), st) } return nil }() if err != nil { log.Errorf("%s: invalidate blk #%d: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, err) } } // invalidate file/head/data[blk] in OS file cache. st := fsconn.FileNotify(f.Inode(), off, blksize) if st != fuse.OK { return syscall.Errno(st) } return nil } // invalidateAttr invalidates file attributes in kernel cache. // // complements invalidateBlk and is used to invalidate file size. // called with zheadMu wlocked. func (f *BigFile) invalidateAttr() (err error) { defer xerr.Contextf(&err, "%s: invalidate attr", f.path()) fsconn := gfsconn st := fsconn.FileNotify(f.Inode(), -1, -1) // metadata only if st != fuse.OK { return syscall.Errno(st) } return nil } // invalidateAll invalidates file attributes and all file data in kernel cache. // // complements invalidateAttr and invalidateBlk and is used to completely reset // kernel file cache on ΔFtail epoch. // called with zheadMu wlocked. func (f *BigFile) invalidateAll() (err error) { defer xerr.Contextf(&err, "%s: invalidate all", f.path()) fsconn := gfsconn st := fsconn.FileNotify(f.Inode(), 0, -1) // metadata + all data if st != fuse.OK { return syscall.Errno(st) } return nil } // lockRevFile makes sure inode ID of /@<rev>/bigfile/<fid> is known to kernel // and won't change until unlock. // // We need node ID to be know to the kernel, when we need to store data into // file's kernel cache - if the kernel don't have the node ID for the file in // question, FileNotifyStoreCache will just fail. // // For kernel to know the inode lockRevFile issues regular filesystem lookup // request which goes to kernel and should go back to wcfs. It is thus not safe // to use lockRevFile from under FUSE request handler as doing so might deadlock. // // Caller must call unlock when inode ID is no longer required to be present. // It is safe to simultaneously call multiple lockRevFile with the same arguments. func (root *Root) lockRevFile(rev zodb.Tid, fid zodb.Oid) (_ *BigFile, unlock func(), err error) { fsconn := gfsconn frevpath := fmt.Sprintf("@%s/bigfile/%s", rev, fid) // relative to fs root for now defer xerr.Contextf(&err, "/: lockRevFile %s", frevpath) // open through kernel frevospath := gmntpt + "/" + frevpath // now starting from OS / f, err := os.Open(frevospath) if err != nil { return nil, nil, err } xfrev := fsconn.LookupNode(root.Inode(), frevpath) // must be !nil as open succeeded return xfrev.Node().(*BigFile), func() { f.Close() }, nil } // -------- 7) FUSE read(#blk) -------- // /(head|<rev>)/bigfile/<bigfileX> -> Read serves reading bigfile data. func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context) (fuse.ReadResult, fuse.Status) { f.head.zheadMu.RLock() // TODO +fctx to cancel defer f.head.zheadMu.RUnlock() // cap read request to file size end, ok := overflow.Add64(off, int64(len(dest))) if !ok { end = math.MaxInt64 // cap read request till max possible file size } if end > f.size { end = f.size } if end <= off { // the kernel issues e.g. [0 +4K) read for f.size=0 and expects to get (0, ok) // POSIX also says to return 0 if off >= f.size return fuse.ReadResultData(nil), fuse.OK } // widen read request to be aligned with blksize granularity // (we can load only whole ZBlk* blocks) aoff := off - (off % f.blksize) aend := end if re := end % f.blksize; re != 0 { aend += f.blksize - re } // TODO use original dest if it can fit the data dest = make([]byte, aend - aoff) // ~> [aoff:aend) in file // TODO better ctx = transaction.PutIntoContext(ctx, txn) ctx, cancel := xcontext.Merge(fctx, f.head.zconn.TxnCtx) defer cancel() // read/load all block(s) in parallel wg := xsync.NewWorkGroup(ctx) for blkoff := aoff; blkoff < aend; blkoff += f.blksize { blkoff := blkoff blk := blkoff / f.blksize wg.Go(func(ctx context.Context) error { δ := blkoff-aoff // blk position in dest //log.Infof("readBlk #%d dest[%d:+%d]", blk, δ, f.blksize) return f.readBlk(ctx, blk, dest[δ:δ+f.blksize]) }) } err := wg.Wait() if err != nil { return nil, err2LogStatus(err) } return fuse.ReadResultData(dest[off-aoff:end-aoff]), fuse.OK } // readBlk serves Read to read 1 ZBlk #blk into destination buffer. // // see "7) when we receive a FUSE read(#blk) request ..." in overview. // // len(dest) == blksize. // called with head.zheadMu rlocked. func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err error) { defer xerr.Contextf(&err, "%s: readblk #%d", f.path(), blk) // check if someone else is already loading this block f.loadMu.Lock() loading, already := f.loading[blk] if !already { loading = &blkLoadState{ ready: make(chan struct{}), } f.loading[blk] = loading } f.loadMu.Unlock() // if it is already loading - just wait for it if already { select { case <-ctx.Done(): return ctx.Err() case <-loading.ready: if loading.err == nil { copy(dest, loading.blkdata) // TODO avoid copy } return loading.err } } // noone was loading - we became responsible to load this block blkdata, treepath, blkcov, zblk, blkrevMax, err := f.zfile.LoadBlk(ctx, blk) // head/ - update δFtail + pin watchers if f.head.rev == 0 && err == nil { // update δFtail index // see "3) for */head/data the following invariant is maintained..." δFtail := f.head.bfdir.δFtail δFtail.Track(f.zfile, blk, treepath, blkcov, zblk) // we have the data - it can be used after watchers are updated // XXX should we use ctx here? (see readPinWatchers comments) err = f.readPinWatchers(ctx, blk, blkrevMax) if err != nil { blkdata = nil } } loading.blkdata = blkdata loading.err = err // data loaded with error - cleanup .loading if loading.err != nil { close(loading.ready) f.loadMu.Lock() delete(f.loading, blk) f.loadMu.Unlock() return err } // data can be used now close(loading.ready) copy(dest, blkdata) // TODO avoid copy // store to kernel pagecache whole block that we've just loaded from database. // This way, even if the user currently requested to read only small portion from it, // it will prevent next e.g. consecutive user read request to again hit // the DB, and instead will be served by kernel from its pagecache. // // We cannot do this directly from reading goroutine - while reading // kernel FUSE is holding corresponding page in pagecache locked, and if // we would try to update that same page in pagecache it would result // in deadlock inside kernel. // // .loading cleanup is done once we are finished with putting the data into OS pagecache. // If we do it earlier - a simultaneous read covered by the same block could result // into missing both kernel pagecache (if not yet updated) and empty .loading[blk], // and thus would trigger DB access again. // // TODO if direct-io: don't touch pagecache // TODO upload parts only not covered by current read (not to e.g. wait for page lock) // TODO skip upload completely if read is wide to cover whole blksize go f.uploadBlk(blk, loading) return nil } // uploadBlk complements readBlk: it uploads loaded blkdata into OS cache. func (f *BigFile) uploadBlk(blk int64, loading *blkLoadState) { head := f.head // rlock zheadMu and make sure zwatcher is not asking us to pause. // if it does - wait for a safer time not to deadlock. // see notes.txt -> "Kernel locks page on read/cache store/..." for details. retry: for { head.zheadMu.RLock() // help zwatcher if it asks us to pause uploadings, so it can // take zheadMu wlocked without deadlocks. if head.pauseOSCacheUpload { ready := head.continueOSCacheUpload head.zheadMu.RUnlock() <-ready continue retry } break } // zheadMu rlocked. // zwatcher is not currently trying to pause OS cache uploads. // check if this block was already invalidated by zwatcher. // if so don't upload the block into OS cache. f.loadMu.Lock() loading_ := f.loading[blk] f.loadMu.Unlock() if loading != loading_ { head.zheadMu.RUnlock() return } oid := f.zfile.POid() // signal to zwatcher not to run while we are performing the upload. // upload with released zheadMu so that zwatcher can lock it even if to // check inflightOSCacheUploads status. atomic.AddInt32(&head.inflightOSCacheUploads, +1) head.zheadMu.RUnlock() st := gfsconn.FileNotifyStoreCache(f.Inode(), blk*f.blksize, loading.blkdata) f.loadMu.Lock() bug := (loading != f.loading[blk]) if !bug { delete(f.loading, blk) } f.loadMu.Unlock() // signal to zwatcher that we are done and it can continue. atomic.AddInt32(&head.inflightOSCacheUploads, -1) if bug { panicf("BUG: bigfile %s: blk %d: f.loading mutated while uploading data to pagecache", oid, blk) } if st == fuse.OK { return } // pagecache update failed, but it must not (we verified on startup that // pagecache control is supported by kernel). We can correctly live on // with the error, but data access will be likely very slow. Tell user // about the problem. log.Errorf("BUG: bigfile %s: blk %d: -> pagecache: %s (ignoring, but reading from bigfile will be very slow)", oid, blk, st) } // -------- isolation protocol notification/serving -------- // // (see "7.2) for all registered client@at watchers ...") const _traceIso = false func traceIso(format string, argv ...interface{}) { if !_traceIso { return } log.InfoDepth(1, fmt.Sprintf(format, argv...)) } // pin makes sure that file[blk] on client side is the same as of @rev state. // // rev = zodb.TidMax means @head; otherwise rev must be ≤ w.at and there must // be no rev_next changing file[blk]: rev < rev_next ≤ w.at. // // must be called with atMu rlocked. // // TODO close watch on any error func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) { defer xerr.Contextf(&err, "wlink%d: f<%s>", w.link.id, w.file.zfile.POid()) return w._pin(ctx, blk, rev) } func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) { foid := w.file.zfile.POid() revstr := rev.String() if rev == zodb.TidMax { revstr = "head" } defer xerr.Contextf(&err, "pin #%d @%s", blk, revstr) if !(rev == zodb.TidMax || rev <= w.at) { panicf("f<%s>: wlink%d: pin #%d @%s: watch.at (%s) < rev", foid, w.link.id, blk, rev, w.at) } w.pinnedMu.Lock() // check/wait for previous/simultaneous pin. // (pin could be called simultaneously e.g. by setupWatch and readPinWatchers) for { blkpin := w.pinned[blk] if blkpin == nil { break } w.pinnedMu.Unlock() <-blkpin.ready // TODO +ctx cancel if blkpin.rev == rev { // already pinned // (e.g. os cache for block was evicted and read called the second time) return blkpin.err } // relock the watch and check that w.pinned[blk] is the same. Retry if it is not. // ( w.pinned[blk] could have changed while w.mu was not held e.g. by // simultaneous setupWatch if we were called by readPinWatchers ) w.pinnedMu.Lock() if blkpin == w.pinned[blk] { if blkpin.rev == zodb.TidMax { w.pinnedMu.Unlock() panicf("f<%s>: wlink%d: pinned[#%d] = @head", foid, w.link.id, blk) } break } } // w.pinnedMu locked & previous pin is either nil or completed and its .rev != rev // -> setup new pin state blkpin := &blkPinState{rev: rev, ready: make(chan struct{})} w.pinned[blk] = blkpin // perform IO without w.pinnedMu w.pinnedMu.Unlock() ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, revstr)) w.pinnedMu.Lock() // check IO reply & verify/signal blkpin is ready defer func() { if rev == zodb.TidMax { delete(w.pinned, blk) } w.pinnedMu.Unlock() close(blkpin.ready) }() if err != nil { blkpin.err = err return err } if ack != "ack" { blkpin.err = fmt.Errorf("expect %q; got %q", "ack", ack) return blkpin.err } if blkpin != w.pinned[blk] { blkpin.err = fmt.Errorf("BUG: pinned[#%d] mutated while doing IO", blk) panicf("f<%s>: wlink%d: %s", foid, w.link.id, blkpin.err) } return nil } // readPinWatchers complements readBlk: it sends `pin blk` for watchers of the file // after a block was loaded from ZODB but before block data is returned to kernel. // // See "7.2) for all registered client@at watchers ..." // // Must be called only for f under head/ // Must be called with f.head.zheadMu rlocked. // // XXX do we really need to use/propagate caller context here? ideally update // watchers should be synchronous, and in practice we just use 30s timeout (TODO). // Should a READ interrupt cause watch update failure? -> probably no func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb.Tid) (err error) { defer xerr.Context(&err, "pin watchers") // f.path and blk is already put into context by readBlk // only head/ is being watched for if f.head.rev != 0 { panic("BUG: readPinWatchers: called for file under @revX/") } //fmt.Printf("S: read #%d -> pin watchers (#%d)\n", blk, len(f.watchTab)) // make sure that file[blk] on clients side stays as of @w.at state. // try to use blkrevMax only as the first cheap criteria to skip updating watchers. // This is likely to be the case, since most watchers should be usually close to head. // If using blkrevMax only turns out to be not sufficient, we'll // consult δFtail, which might involve recomputing it. δFtail := f.head.bfdir.δFtail blkrev := blkrevMax blkrevRough := true wg := xsync.NewWorkGroup(ctx) f.watchMu.RLock() for w := range f.watchTab { w := w // make sure w.at stays unchanged while we prepare and pin the block w.atMu.RLock() // the block is already covered by @w.at database view if blkrev <= w.at { w.atMu.RUnlock() continue } // if blkrev is rough estimation and that upper bound is > w.at // we have to recompute ~exact file[blk] revision @head. if blkrevRough { // unlock atMu while we are (re-)calculating blkrev // we'll relock atMu again and recheck blkrev vs w.at after. w.atMu.RUnlock() var err error blkrev, _, err = δFtail.BlkRevAt(ctx, f.zfile, blk, f.head.zconn.At()) if err != nil { return err } blkrevRough = false w.atMu.RLock() if blkrev <= w.at { w.atMu.RUnlock() continue } } // the block is newer - find out its revision as of @w.at and pin to that. // // We don't pin to w.at since if we would do so for several clients, // and most of them would be on different w.at - cache of the file will // be lost. Via pinning to particular block revision, we make sure the // revision to pin is the same on all clients, and so file cache is shared. wg.Go(func(ctx context.Context) error { defer w.atMu.RUnlock() pinrev, _, err := δFtail.BlkRevAt(ctx, f.zfile, blk, w.at) if err != nil { return err } //fmt.Printf("S: read #%d: watch @%s: pin -> @%s\n", blk, w.at, pinrev) // TODO close watcher on any error return w.pin(ctx, blk, pinrev) }) } f.watchMu.RUnlock() return wg.Wait() } // setupWatch sets up or updates a Watch when client sends `watch <file> @<at>` request. // // It sends "pin" notifications; final "ok" or "error" must be sent by caller. func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.Tid) (err error) { defer xerr.Contextf(&err, "setup watch f<%s> @%s", foid, at) head := wlink.head bfdir := head.bfdir // wait for zhead.At ≥ at if at != zodb.InvalidTid { err = head.zheadWait(ctx, at) if err != nil { return err } } // make sure zhead.At stays unchanged while we are preparing the watch // (see vvv e.g. about unpin to @head for why it is needed) head.zheadMu.RLock() defer head.zheadMu.RUnlock() headAt := head.zconn.At() if at != zodb.InvalidTid && at < bfdir.δFtail.Tail() { return fmt.Errorf("too far away back from head/at (@%s); δt = %s", headAt, headAt.Time().Sub(at.Time().Time)) } wlink.byfileMu.Lock() // if watch was already established - we need to update it w := wlink.byfile[foid] if w == nil { // watch was not previously established - set it up anew bfdir.fileMu.Lock() f := bfdir.fileTab[foid] bfdir.fileMu.Unlock() if f == nil { wlink.byfileMu.Unlock() // by "isolation protocol" watch is setup after data file was opened return fmt.Errorf("file not yet known to wcfs or is not a ZBigFile") } w = &Watch{ link: wlink, file: f, at: at, pinned: make(map[int64]*blkPinState), } } f := w.file f.watchMu.Lock() // at="-" (InvalidTid) means "remove the watch" if at == zodb.InvalidTid { delete(wlink.byfile, foid) delete(f.watchTab, w) f.watchMu.Unlock() wlink.byfileMu.Unlock() return nil } // request exclusive access to the watch to change .at and compute pins. // The lock will be downgraded from W to R after pins computation is done. // Pins will be executed with atMu.R only - with the idea not to block // other clients that read-access the file simultaneously to setupWatch. w.atMu.Lock() // check at >= w.at // TODO(?) we might want to allow going back in history if we need it. if !(at >= w.at) { w.atMu.Unlock() f.watchMu.Unlock() wlink.byfileMu.Unlock() return fmt.Errorf("going back in history is forbidden") } // register w to f early, so that READs going in parallel to us // preparing and processing initial pins, also send pins to w for read // blocks. If we don't, we can miss to send pin to w for a freshly read // block which could have revision > w.at: // // 1 3 2 4 // ─────.────x───o────x───x──────]────────── // ↑ ↑ // w.at head // // Here blocks #1, #2 and #4 were previously accessed, are thus tracked // by δFtail and are changed after w.at - they will be returned by vvv // δFtail query and pin-sent to w. Block #3 was not yet accessed but // was also changed after w.at . As head/file[#3] might be accessed // simultaneously to watch setup, and f.readBlk will be checking // f.watchTab; if w ∉ f.watchTab at that moment, w will miss to receive // pin for #3. // // NOTE for `unpin blk` to -> @head we can be sure there won't be // simultaneous `pin blk` request, because: // // - unpin means blk was previously pinned, // - blk was pinned means it is tracked by δFtail, // - if blk is tracked and δFtail says there is no δblk ∈ (at, head], // there is indeed no blk change in that region, // - which means that δblk with rev > w.at might be only > head, // - but such δblk are processed with zhead wlocked and we keep zhead // rlocked during pin setup. // // δ δ // ────x────.────────────]────x──── // ↑ ↑ // w.at head // // - also: there won't be simultaneous READs that would need to be // unpinned, because we update w.at to requested at early. w.at = at f.watchTab[w] = struct{}{} wlink.byfile[foid] = w f.watchMu.Unlock() wlink.byfileMu.Unlock() // TODO defer -> unregister watch if error // pin all tracked file blocks that were changed in (at, head] range. toPin := map[int64]zodb.Tid{} // blk -> @rev δFtail := bfdir.δFtail vδf, err := δFtail.SliceByFileRevEx(f.zfile, at, headAt, zdata.QueryOptions{ // blk might be in δFtail because it is adjacent in // ZBigFile.blktab to another blk that was explicitly tracked. // We do not want to get those to avoid unnecessarily pinning // potentially more blocks than needed. // // wcfs tests also verify that only blocks that were previously // explicitly accessed are included into watch setup pins. OnlyExplicitlyTracked: true, }) if err != nil { return err } for _, δfile := range vδf { if δfile.Epoch { // file epochs are currently forbidden (see watcher), so the only // case when we could see an epoch here is creation of // the file if w.at is before that time: // // create file // ────.────────x────────]──── // ↑ ↑ // w.at head // // but then the file should not be normally accessed in that case. // // -> reject such watches with an error return fmt.Errorf("file epoch detected @%s in between (at,head=@%s]", δfile.Rev, headAt) } for blk := range δfile.Blocks { _, already := toPin[blk] if already { continue } toPin[blk], _, err = δFtail.BlkRevAt(ctx, f.zfile, blk, at) if err != nil { return err } } } // if a block was previously pinned, but ∉ δ(at, head] -> unpin it to head. for blk, pinPrev := range w.pinned { // only 1 setupWatch can be run simultaneously for one file pinNew, pinning := toPin[blk] if !pinning { toPin[blk] = zodb.TidMax // @head } // TODO don't bother to spawn .pin goroutines if pin revision is the same ? // if pinNew == pinPrev.rev && ready(pinPrev.ready) && pinPrev.err == nil { // delete(toPin, blk) // } _ = pinPrev _ = pinNew } // downgrade atMu.W -> atMu.R to let other clients to access the file. // NOTE there is no primitive to do Wlock->Rlock atomically, but we are // ok with that since we prepared everything to handle simultaneous pins // from other reads. w.atMu.Unlock() w.atMu.RLock() defer w.atMu.RUnlock() wg := xsync.NewWorkGroup(ctx) for blk, rev := range toPin { blk := blk rev := rev wg.Go(func(ctx context.Context) error { return w._pin(ctx, blk, rev) }) } err = wg.Wait() if err != nil { return err } return nil } // Open serves /head/watch opens. func (wnode *WatchNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) { // TODO(?) check flags head := wnode.head wlink := &WatchLink{ sk: NewFileSock(), id: atomic.AddInt32(&wnode.idNext, +1), head: head, byfile: make(map[zodb.Oid]*Watch), rxTab: make(map[uint64]chan string), } head.wlinkMu.Lock() // XXX del wlinkTab[w] on w.sk.File.Release head.wlinkTab[wlink] = struct{}{} head.wlinkMu.Unlock() go wlink.serve() return wlink.sk.File(), fuse.OK } // serve serves client initiated watch requests and routes client replies to // wcfs initiated pin requests. func (wlink *WatchLink) serve() { err := wlink._serve() if err != nil { log.Error(err) } head := wlink.head head.wlinkMu.Lock() delete(head.wlinkTab, wlink) head.wlinkMu.Unlock() } func (wlink *WatchLink) _serve() (err error) { defer xerr.Contextf(&err, "wlink %d: serve rx", wlink.id) ctx0 := context.TODO() // TODO ctx = merge(ctx of wcfs running, ctx of wlink timeout) ctx, cancel := context.WithCancel(ctx0) wg := xsync.NewWorkGroup(ctx) r := bufio.NewReader(xio.BindCtxR(wlink.sk, ctx)) defer func() { // cancel all handlers on both error and ok return. // ( ok return is e.g. when we received "bye", so if client // sends "bye" and some pin handlers are in progress - they // anyway don't need to wait for client replies anymore ) cancel() err2 := wg.Wait() if err == nil { err = err2 } // unregister all watches created on this wlink wlink.byfileMu.Lock() for _, w := range wlink.byfile { w.file.watchMu.Lock() delete(w.file.watchTab, w) w.file.watchMu.Unlock() } wlink.byfile = nil wlink.byfileMu.Unlock() // write to peer if it was logical error on client side if err != nil { _ = wlink.send(ctx0, 0, fmt.Sprintf("error: %s", err)) } // close .sk.tx : this wakes up rx on client side. err2 = wlink.sk.CloseWrite() if err == nil { err = err2 } }() // close .sk.rx on error/wcfs stopping or return: this wakes up read(sk). retq := make(chan struct{}) defer close(retq) wg.Go(func(ctx context.Context) error { // monitor is always canceled - either at parent ctx cancel, or // upon return from serve (see "cancel all handlers ..." ^^^). // If it was return - report returned error to wg.Wait, not "canceled". <-ctx.Done() e := ctx.Err() select { default: case <-retq: e = err // returned error } e2 := wlink.sk.CloseRead() if e == nil { e = e2 } return e }) for { l, err := r.ReadString('\n') // TODO limit accepted line len to prevent DOS if err != nil { // r.Read is woken up by sk.CloseRead when serve decides to exit if err == io.ErrClosedPipe || err == io.EOF { err = nil } return err } traceIso("S: wlink%d: rx: %q\n", wlink.id, l) stream, msg, err := parseWatchFrame(l) if err != nil { return err } // reply from client to wcfs reply := (stream % 2 == 0) if reply { wlink.rxMu.Lock() rxq := wlink.rxTab[stream] delete(wlink.rxTab, stream) wlink.rxMu.Unlock() if rxq == nil { return fmt.Errorf("%d: reply on unexpected stream", stream) } rxq <- msg continue } // client-initiated request // bye if msg == "bye" { return nil // deferred sk.Close will wake-up rx on client side } // watch ... wg.Go(func(ctx context.Context) error { return wlink.handleWatch(ctx, stream, msg) }) } } // handleWatch handles watch request from client. // // returned error comes without full error prefix. func (wlink *WatchLink) handleWatch(ctx context.Context, stream uint64, msg string) (err error) { defer xerr.Contextf(&err, "%d", stream) err = wlink._handleWatch(ctx, msg) reply := "ok" if err != nil { // logical error is reported back to client, but watch link remains live reply = fmt.Sprintf("error %s", err) err = nil } err = wlink.send(ctx, stream, reply) return err } func (wlink *WatchLink) _handleWatch(ctx context.Context, msg string) error { foid, at, err := parseWatch(msg) if err != nil { return err } err = wlink.setupWatch(ctx, foid, at) return err } // sendReq sends wcfs-originated request to client and returns client response. func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string, err error) { defer xerr.Context(&err, "sendReq") // wlink is already put into ctx by caller var stream uint64 for stream == 0 { stream = atomic.AddUint64(&wlink.reqNext, +2) } rxq := make(chan string, 1) wlink.rxMu.Lock() _, already := wlink.rxTab[stream] if !already { wlink.rxTab[stream] = rxq } wlink.rxMu.Unlock() if already { panic("BUG: to-be-sent stream is present in rxtab") } defer func() { if err != nil { // remove rxq from rxTab // ( _serve could have already deleted it if unexpected // reply came to the stream, but no other rxq should // have registered on the [stream] slot ) wlink.rxMu.Lock() delete(wlink.rxTab, stream) wlink.rxMu.Unlock() // no need to drain rxq - it was created with cap=1 } }() err = wlink.send(ctx, stream, req) if err != nil { return "", err } select { case <-ctx.Done(): return "", ctx.Err() case reply = <-rxq: return reply, nil } } // send sends a message to client over specified stream ID. // // Multiple send can be called simultaneously; send serializes writes. func (wlink *WatchLink) send(ctx context.Context, stream uint64, msg string) (err error) { defer xerr.Contextf(&err, "send .%d", stream) // wlink is already put into ctx by caller // assert '\n' not in msg if strings.ContainsRune(msg, '\n') { panicf("BUG: msg contains \\n ; msg: %q", msg) } wlink.txMu.Lock() defer wlink.txMu.Unlock() pkt := []byte(fmt.Sprintf("%d %s\n", stream, msg)) traceIso("S: wlink%d: tx: %q\n", wlink.id, pkt) _, err = wlink.sk.Write(ctx, pkt) if err != nil { return err } return nil } // ---- Lookup ---- // /(head|<rev>)/bigfile/ -> Lookup receives client request to create /(head|<rev>)/bigfile/<bigfileX>. func (bfdir *BigFileDir) Lookup(out *fuse.Attr, name string, fctx *fuse.Context) (*nodefs.Inode, fuse.Status) { f, err := bfdir.lookup(out, name, fctx) var inode *nodefs.Inode if f != nil { inode = f.Inode() } return inode, err2LogStatus(err) } func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context) (f *BigFile, err error) { defer xerr.Contextf(&err, "%s: lookup %q", bfdir.path(), name) oid, err := zodb.ParseOid(name) if err != nil { return nil, eINVALf("not oid") } bfdir.head.zheadMu.RLock() // TODO +fctx -> cancel defer bfdir.head.zheadMu.RUnlock() defer func() { if f != nil { f.getattr(out) } }() // check to see if dir(oid) is already there bfdir.fileMu.Lock() f, already := bfdir.fileTab[oid] bfdir.fileMu.Unlock() if already { return f, nil } // not there - without bfdir lock proceed to open BigFile from ZODB f, err = bfdir.head.bigfopen(fctx, oid) if err != nil { return nil, err } // relock bfdir and either register f or, if the file was maybe // simultaneously created while we were not holding bfdir.fileMu, return that. bfdir.fileMu.Lock() f2, already := bfdir.fileTab[oid] if already { bfdir.fileMu.Unlock() // f.Close() not needed - BigFile is all just garbage-collected return f2, nil } bfdir.fileTab[oid] = f bfdir.fileMu.Unlock() // mkfile takes filesystem treeLock - do it outside bfdir.fileMu mkfile(bfdir, name, f) return f, nil } // / -> Lookup receives client request to create @<rev>/. func (root *Root) Lookup(out *fuse.Attr, name string, fctx *fuse.Context) (*nodefs.Inode, fuse.Status) { revd, err := root.lookup(name, fctx) var inode *nodefs.Inode if revd != nil { inode = revd.Inode() _ = revd.GetAttr(out, nil, fctx) // always ok } return inode, err2LogStatus(err) } func (root *Root) lookup(name string, fctx *fuse.Context) (_ *Head, err error) { defer xerr.Contextf(&err, "/: lookup %q", name) var rev zodb.Tid ok := false if strings.HasPrefix(name, "@") { rev, err = zodb.ParseTid(name[1:]) ok = (err == nil) } if !ok { return nil, eINVALf("not @rev") } // check to see if dir(rev) is already there root.revMu.Lock() revDir, already := root.revTab[rev] root.revMu.Unlock() if already { // XXX race wrt simultaneous "FORGET @<rev>" ? return revDir, nil } // not there - without revMu lock proceed to open @rev view of ZODB zconnRev, err := xzodb.ZOpen(fctx, root.zdb, &zodb.ConnOptions{At: rev}) if err != nil { return nil, err } // relock root and either register new revX/ directory or, if the // directory was maybe simultaneously created while we were not holding // revMu, return that. root.revMu.Lock() revDir, already = root.revTab[rev] if already { root.revMu.Unlock() // zconnRev.Release() transaction.Current(zconnRev.TxnCtx).Abort() return revDir, nil } revDir = &Head{ // TODO how to test forgets: // echo 2 >/proc/sys/vm/drop_caches (root) // mount -i -oremount $mntpt (root ?) (shrinks dcache) // notify invalidate dentry from inside fs fsNode: newFSNode(&fsOptions{Sticky: false}), // TODO + Head.OnForget() -> del root.revTab[] rev: rev, zconn: zconnRev, // TODO + Head.OnForget() -> release zconn (= abort zconn.TxnCtx) } bfdir := &BigFileDir{ fsNode: newFSNode(&fsOptions{Sticky: false}), // TODO + BigFileDir.OnForget() head: revDir, fileTab: make(map[zodb.Oid]*BigFile), δFtail: nil, // δFtail not needed/used for @revX/ } revDir.bfdir = bfdir root.revTab[rev] = revDir root.revMu.Unlock() // mkdir takes filesystem treeLock - do it outside revMu. mkdir(root, name, revDir) mkdir(revDir, "bigfile", bfdir) return revDir, nil } // bigfopen opens BigFile corresponding to oid on head.zconn. // // A ZBigFile corresponding to oid is activated and statted. // // head.zheadMu must be locked. func (head *Head) bigfopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err error) { zconn := head.zconn defer xerr.Contextf(&err, "bigfopen %s @%s", oid, zconn.At()) // TODO better ctx = transaction.PutIntoContext(ctx, txn) ctx, cancel := xcontext.Merge(ctx, zconn.TxnCtx) defer cancel() xzfile, err := zconn.Get(ctx, oid) if err != nil { switch errors.Cause(err).(type) { case *zodb.NoObjectError: return nil, eINVAL(err) case *zodb.NoDataError: return nil, eINVAL(err) default: return nil, err } } zfile, ok := xzfile.(*ZBigFile) if !ok { return nil, eINVALf("%s is not a ZBigFile", xzodb.TypeOf(xzfile)) } // extract blksize, size and initial approximation for file revision err = zfile.PActivate(ctx) if err != nil { return nil, err } blksize := zfile.BlkSize() // NOTE file revision should be revision of both ZBigFile and its data. But we // cannot get data revision without expensive scan of all ZBigFile's objects. // -> approximate mtime initially with ZBigFile object mtime. revApprox := zfile.PSerial() zfile.PDeactivate() size, sizePath, blkCov, err := zfile.Size(ctx) if err != nil { return nil, err } f := &BigFile{ fsNode: newFSNode(&fsOptions{Sticky: false}), // TODO + BigFile.OnForget -> del .head.bfdir.fileTab[] head: head, zfile: zfile, blksize: blksize, size: size, revApprox: revApprox, loading: make(map[int64]*blkLoadState), } // only head/ needs δFtail and watches. if head.rev == 0 { // see "3) for */head/data the following invariant is maintained..." head.bfdir.δFtail.Track(f.zfile, -1, sizePath, blkCov, nil) f.watchTab = make(map[*Watch]struct{}) } return f, nil } // ---- misc --- // /(head|<rev>)/at -> readAt serves read. func (h *Head) readAt(fctx *fuse.Context) ([]byte, error) { h.zheadMu.RLock() // TODO +fctx -> cancel defer h.zheadMu.RUnlock() return []byte(h.zconn.At().String()), nil } // /(head|<rev>)/ -> Getattr serves stat. func (head *Head) GetAttr(out *fuse.Attr, _ nodefs.File, fctx *fuse.Context) fuse.Status { at := head.rev if at == 0 { head.zheadMu.RLock() // TODO +fctx -> cancel at = head.zconn.At() head.zheadMu.RUnlock() } t := at.Time().Time out.Mode = fuse.S_IFDIR | 0555 out.SetTimes(/*atime=*/nil, /*mtime=*/&t, /*ctime=*/&t) return fuse.OK } // /(head|<rev>)/bigfile/<bigfileX> -> Getattr serves stat. func (f *BigFile) GetAttr(out *fuse.Attr, _ nodefs.File, fctx *fuse.Context) fuse.Status { f.head.zheadMu.RLock() // TODO +fctx -> cancel defer f.head.zheadMu.RUnlock() f.getattr(out) return fuse.OK } func (f *BigFile) getattr(out *fuse.Attr) { out.Mode = fuse.S_IFREG | 0444 out.Size = uint64(f.size) out.Blksize = uint32(f.blksize) // NOTE truncating 64 -> 32 // .Blocks mtime := f.revApprox.Time().Time out.SetTimes(/*atime=*/nil, /*mtime=*/&mtime, /*ctime=*/&mtime) } // FIXME groot/gfsconn is tmp workaround for lack of way to retrieve FileSystemConnector from nodefs.Inode // TODO: // - Inode += .Mount() -> nodefs.Mount // - Mount: // .Root() -> root Inode of the fs // .Connector() -> FileSystemConnector through which fs is mounted var groot *Root var gfsconn *nodefs.FileSystemConnector // root of the filesystem is mounted here. // // we need to talk to kernel and lookup @<rev>/bigfile/<fid> before uploading // data to kernel cache there. Referencing root of the filesystem via path is // vulnerable to bugs wrt e.g. `mount --move` and/or mounting something else // over wcfs. However keeping opened root fd will prevent wcfs to be unmounted, // so we still have to reference the root via path. var gmntpt string // debugging (protected by zhead.W) var gdebug = struct { // .wcfs/zhead opens // protected by groot.head.zheadMu zheadSockTab map[*FileSock]struct{} }{} func init() { gdebug.zheadSockTab = make(map[*FileSock]struct{}) } // _wcfs_Zhead serves .wcfs/zhead opens. type _wcfs_Zhead struct { fsNode } func (zh *_wcfs_Zhead) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) { // TODO(?) check flags sk := NewFileSock() sk.CloseRead() groot.head.zheadMu.Lock() // TODO +fctx -> cancel defer groot.head.zheadMu.Unlock() // TODO del zheadSockTab[sk] on sk.File.Release (= client drops opened handle) gdebug.zheadSockTab[sk] = struct{}{} return sk.File(), fuse.OK } // TODO -> enable/disable fuse debugging dynamically (by write to .wcfs/debug ?) func main() { //stdlog.SetPrefix("wcfs: ") NOTE conflicts with log.CopyStandardLogTo log.CopyStandardLogTo("WARNING") defer log.Flush() err := _main() if err != nil { log.Fatal(err) } } func _main() (err error) { debug := flag.Bool("d", false, "debug") autoexit := flag.Bool("autoexit", false, "automatically stop service when there is no client activity") flag.Parse() if len(flag.Args()) != 2 { fmt.Fprintf(os.Stderr, "Usage: %s [OPTIONS] zurl mntpt\n", os.Args[0]) os.Exit(2) } zurl := flag.Args()[0] mntpt := flag.Args()[1] xclose := func(c io.Closer) { err = xerr.First(err, c.Close()) } // debug -> precise t, no dates (TODO(?) -> always precise t?) if *debug { stdlog.SetFlags(stdlog.Lmicroseconds) } log.Infof("start %q %q", mntpt, zurl) gover := "(built with " + runtime.Version() if race.Enabled { gover += " -race" } gover += ")" log.Info(gover) // open zodb storage/watch/db/connection ctx := context.Background() // TODO(?) + timeout? zstor, err := zodb.Open(ctx, zurl, &zodb.OpenOptions{ ReadOnly: true, }) if err != nil { return err } defer xclose(zstor) zwatchq := make(chan zodb.Event) at0 := zstor.AddWatch(zwatchq) defer zstor.DelWatch(zwatchq) // TODO consider using zodbCacheControl for all connections // ( in addition to zhead, historic connections - that are used to access @rev - // also need to traverse BigFile.blktab btree ) zdb := zodb.NewDB(zstor, &zodb.DBOptions{}) defer xclose(zdb) zhead, err := xzodb.ZOpen(ctx, zdb, &zodb.ConnOptions{ At: at0, // preserve zhead.cache across several transactions. // see "ZODB cache control" NoPool: true, }) if err != nil { return err } zhead.Cache().Lock() zhead.Cache().SetControl(&zodbCacheControl{}) zhead.Cache().Unlock() // mount root + head/ head := &Head{ fsNode: newFSNode(fSticky), rev: 0, zconn: zhead, wlinkTab: make(map[*WatchLink]struct{}), hwait: make(map[hwaiter]struct{}), } wnode := &WatchNode{ fsNode: newFSNode(fSticky), head: head, } bfdir := &BigFileDir{ fsNode: newFSNode(fSticky), head: head, fileTab: make(map[zodb.Oid]*BigFile), δFtail: zdata.NewΔFtail(zhead.At(), zdb), } head.bfdir = bfdir root := &Root{ fsNode: newFSNode(fSticky), zstor: zstor, zdb: zdb, head: head, revTab: make(map[zodb.Tid]*Head), } opts := &fuse.MountOptions{ FsName: zurl, Name: "wcfs", // We retrieve kernel cache in ZBlk.blksize chunks, which are 2MB in size. // XXX currently go-fuse caps MaxWrite to 128KB. // TODO -> teach go-fuse to handle Init.MaxPages (Linux 4.20+). MaxWrite: 2*1024*1024, // TODO(?) tune MaxReadAhead? MaxBackground? // OS cache that we populate with bigfile data is precious; // we explicitly propagate ZODB invalidations into file invalidations. ExplicitDataCacheControl: true, DisableXAttrs: true, // we don't use Debug: *debug, } fssrv, fsconn, err := mount(mntpt, root, opts) if err != nil { return err } groot = root // FIXME temp workaround (see ^^^) gfsconn = fsconn // FIXME ----//---- gmntpt = mntpt // we require proper pagecache control (added to Linux 2.6.36 in 2010) kinit := fssrv.KernelSettings() kfuse := fmt.Sprintf("kernel FUSE (API %d.%d)", kinit.Major, kinit.Minor) supports := kinit.SupportsNotify if !(supports(fuse.NOTIFY_STORE_CACHE) && supports(fuse.NOTIFY_RETRIEVE_CACHE)) { return fmt.Errorf("%s does not support pagecache control", kfuse) } // make a bold warning if kernel does not support explicit cache invalidation // (patch is in Linux 5.2+; see notes.txt -> "Notes on OS pagecache control") if kinit.Flags & fuse.CAP_EXPLICIT_INVAL_DATA == 0 { w1 := fmt.Sprintf("%s does not support explicit data cache invalidation", kfuse) w2 := "-> performance will be AWFUL." w3 := "-> you need kernel which includes git.kernel.org/linus/ad2ba64dd489." w4 := "-> (Linux 5.2+, or nxd-fuse-dkms package installed from navytux.spb.ru/pkg)" log.Error(w1); log.Error(w2); log.Error(w3); log.Error(w4) fmt.Fprintf(os.Stderr, "W: wcfs: %s\nW: wcfs: %s\nW: wcfs: %s\nW: wcfs: %s\n", w1, w2, w3, w4) } // add entries to / mkdir(root, "head", head) mkdir(head, "bigfile", bfdir) mkfile(head, "at", NewSmallFile(head.readAt)) // TODO mtime(at) = tidtime(at) mkfile(head, "watch", wnode) // for debugging/testing _wcfs := newFSNode(fSticky) mkdir(root, ".wcfs", &_wcfs) mkfile(&_wcfs, "zurl", NewStaticFile([]byte(zurl))) // .wcfs/zhead - special file channel that sends zhead.at. // // If a user opens it, it will start to get tids of through which // zhead.at was, starting from the time when .wcfs/zhead was opened. // There can be multiple openers. Once opened, the file must be read, // as wcfs blocks waiting for data to be read when processing // invalidations. mkfile(&_wcfs, "zhead", &_wcfs_Zhead{ fsNode: newFSNode(fSticky), }) // TODO handle autoexit // (exit when kernel forgets all our inodes - wcfs.py keeps .wcfs/zurl // opened, so when all inodes has been forgotten - we know all wcfs.py clients exited) _ = autoexit defer xerr.Contextf(&err, "serve %s %s", mntpt, zurl) // spawn filesystem server. // // use `go serve` + `waitMount` not just `serve` - because waitMount // cares to disable OS calling poll on us. // ( if we don't disable polling - fs serving can get stuck - see // https://github.com/hanwen/go-fuse/commit/4f10e248eb for details ) serveCtx, serveCancel := context.WithCancel(context.Background()) go func () { defer serveCancel() fssrv.Serve() }() err = fssrv.WaitMount() if err != nil { return err } // filesystem server is serving requests. // run zwatcher and wait for it to complete. // zwatcher completes either normally - due to filesystem unmount, or fails. // if zwatcher fails - switch filesystem to return EIO instead of stale data. err = root.zwatcher(serveCtx, zwatchq) if errors.Cause(err) != context.Canceled { log.Error(err) log.Errorf("zwatcher failed -> switching filesystem to EIO mode (TODO)") // TODO: switch fs to EIO mode } // wait for unmount // NOTE the kernel does not send FORGETs on unmount - but we don't need // to release left node resources ourselves, because it is just memory. <-serveCtx.Done() log.Infof("stop %q %q", mntpt, zurl) return nil }