wcfs.go 54.6 KB
Newer Older
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1 2
// Copyright (C) 2018-2019  Nexedi SA and Contributors.
//                          Kirill Smelkov <kirr@nexedi.com>
Kirill Smelkov's avatar
Kirill Smelkov committed
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
20
// Program wcfs provides filesystem server with file data backed by wendelin.core arrays.
Kirill Smelkov's avatar
Kirill Smelkov committed
21 22 23 24 25
//
// Intro
//
// Each wendelin.core array (ZBigArray) is actually a linear file (ZBigFile)
// and array metadata like dtype, shape and strides associated with it. This
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
26
// program exposes as files only ZBigFile data and leaves rest of
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
27
// array-specific handling to clients. Every ZBigFile is exposed as one separate
Kirill Smelkov's avatar
Kirill Smelkov committed
28 29 30
// file that represents whole ZBigFile's data.
//
// For a client, the primary way to access a bigfile should be to mmap
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
31
// head/bigfile/<bigfileX>, which always represents the latest bigfile data.
Kirill Smelkov's avatar
Kirill Smelkov committed
32 33 34 35 36 37 38 39 40 41 42 43 44
// Clients that want to get isolation guarantee should subscribe for
// invalidations and re-mmap invalidated regions to file with pinned bigfile revision for
// the duration of their transaction. See "Invalidation protocol" for details.
//
// In the usual situation when bigfiles are big, and there are O(1)/δt updates,
// there should be no need for any cache besides shared kernel cache of latest
// bigfile data.
//
//
// Filesystem organization
//
// Top-level structure of provided filesystem is as follows:
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
45
//	head/			; latest database view
Kirill Smelkov's avatar
Kirill Smelkov committed
46
//		...
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
47 48 49
//	@<rev1>/		; database view as of revision <revX>
//		...
//	@<rev2>/
Kirill Smelkov's avatar
Kirill Smelkov committed
50
//		...
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
51
//	...
Kirill Smelkov's avatar
Kirill Smelkov committed
52
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
53
// where head/ represents latest data as stored in upstream ZODB, and
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
54
// @<revX>/ represents data as of database revision <revX>.
Kirill Smelkov's avatar
Kirill Smelkov committed
55 56 57
//
// head/ has the following structure:
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
58 59 60 61
//	head/
//		at			; data inside head/ is as of this ZODB transaction
//		watch			; channel for bigfile invalidations
//		bigfile/		; bigfiles' data
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
62 63
//			<oid(ZBigFile1)>
//			<oid(ZBigFile2)>
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
64
//			...
Kirill Smelkov's avatar
Kirill Smelkov committed
65
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
66 67 68 69 70 71
// where /bigfile/<bigfileX> represents latest bigfile data as stored in
// upstream ZODB. As there can be some lag receiving updates from the database,
// /at describes precisely ZODB state for which bigfile data is currently
// exposed. Whenever bigfile data is changed in upstream ZODB, information
// about the changes is first propagated to /watch, and only after that
// /bigfile/<bigfileX> is updated. See "Invalidation protocol" for details.
Kirill Smelkov's avatar
Kirill Smelkov committed
72
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
73
// @<revX>/ has the following structure:
Kirill Smelkov's avatar
Kirill Smelkov committed
74
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
75 76 77
//	@<revX>/
//		at
//		bigfile/		; bigfiles' data as of revision <revX>
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
78 79
//			<oid(ZBigFile1)>
//			<oid(ZBigFile2)>
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
80
//			...
Kirill Smelkov's avatar
Kirill Smelkov committed
81
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
82
// where /bigfile/<bigfileX> represents bigfile data as of revision <revX>.
Kirill Smelkov's avatar
Kirill Smelkov committed
83
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
84
// Unless accessed, {head,@<revX>}/bigfile/<bigfileX> are not automatically visible in
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
85
// the wcfs filesystem. Similarly, @<revX>/ directories become visible only after access.
Kirill Smelkov's avatar
Kirill Smelkov committed
86 87 88 89
//
//
// Invalidation protocol
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
90
// In order to support isolation, wcfs implements invalidation protocol that
Kirill Smelkov's avatar
Kirill Smelkov committed
91 92
// must be cooperatively followed by both wcfs and client.
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
93
// First, client mmaps latest bigfile, but does not access it
Kirill Smelkov's avatar
Kirill Smelkov committed
94
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
95
//	mmap(head/bigfile/<bigfileX>)
Kirill Smelkov's avatar
Kirill Smelkov committed
96
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
97 98
// Then client opens head/watch and tells wcfs through it for which ZODB state
// it wants to get bigfile's view.
Kirill Smelkov's avatar
Kirill Smelkov committed
99
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
100
//	C: 1 watch <bigfileX> @<at>
Kirill Smelkov's avatar
Kirill Smelkov committed
101
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
102 103
// The server then, after potentially sending initial pin messages (see below),
// reports either success or failure:
Kirill Smelkov's avatar
Kirill Smelkov committed
104
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
105 106
//	S: 1 ok
//	S: 1 error ...		; if <at> is too far away back from head/at
Kirill Smelkov's avatar
Kirill Smelkov committed
107
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
108 109 110 111 112
// The server sends "ok" reply only after head/at is ≥ requested <at>, and
// only after all initial pin messages are fully acknowledged by the client.
// The client can start to use mmapped data after it gets "ok".
// The server sends "error" reply if requested <at> is too far away back from
// head/at.
Kirill Smelkov's avatar
Kirill Smelkov committed
113
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
114 115 116
// Upon watch request, either initially, or after sending "ok", the server will be notifying the
// client about file blocks that client needs to pin in order to observe file's
// data as of <at> revision:
Kirill Smelkov's avatar
Kirill Smelkov committed
117
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
118 119 120 121 122 123
// The filesystem server itself receives information about changed data from
// ZODB server through regular ZODB invalidation channel (as it is ZODB client
// itself). Then, separately for each changed file block, before actually
// updating head/bigfile/<bigfileX> content, it notifies through head/watch to
// clients, that had requested it (separately to each client), about the
// changes:
Kirill Smelkov's avatar
Kirill Smelkov committed
124
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
125
//	S: 2 pin <bigfileX> #<blk> @<rev_max>	XXX 2-> 2*k (multiple pins in parallel)
Kirill Smelkov's avatar
Kirill Smelkov committed
126
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
127 128
// and waits until all clients confirm that changed file block can be updated
// in global OS cache.
Kirill Smelkov's avatar
Kirill Smelkov committed
129
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
130
// The client in turn should now re-mmap the block requested to be pinned to bigfile@<rev_max>:
Kirill Smelkov's avatar
Kirill Smelkov committed
131
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
132 133
//	# mmapped at address corresponding to #blk
//	mmap(@<rev_max>/bigfile/<bigfileX>, #blk, MAP_FIXED)
Kirill Smelkov's avatar
Kirill Smelkov committed
134 135 136
//
// and must send ack back to the server when it is done:
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
137 138 139 140
//	C: 2 ack
//
// The server sends pin notifications only for file blocks, that are known to
// be potentially changed after client's <at>, and <rev_max> describes the
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
141
// upper bound for the block revision as of <at> database view:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
142
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
143
//	<rev_max> ≤ <at>
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
144 145 146 147 148 149 150 151 152
//
// The server maintains short history tail of file changes to be able to
// support openings with <at> being slightly in the past compared to current
// head/at. The server might reject a watch request if <at> is too far away in
// the past from head/at. The client is advised to restart its transaction with
// a more up-to-date database view if it gets a watch setup error.
//
// A later request from the client for the same <bigfileX> but with different
// <at>, overrides previous watch request for that file. A client can use "-"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
153
// instead of "@<at>" to stop watching a file.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
154 155 156 157 158
//
// A single client can send several watch requests through single head/watch
// open, as well as it can use several head/watch opens simultaneously.
// The server sends pin notifications for all files requested to be watched via
// every head/watch open.
Kirill Smelkov's avatar
Kirill Smelkov committed
159
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
160 161 162 163 164 165
// Note: a client could use a single watch to manage its several views for the same
// file but with different <at>. This could be achieved via watching with
// @<at_min>, and then deciding internally which views need to be adjusted and
// which views need not. Wcfs does not oblige clients to do so though, and a
// client is free to use as many head/watch openings as it needs to.
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
166
// When clients are done with @<revX>/bigfile/<bigfileX> (i.e. client's
Kirill Smelkov's avatar
Kirill Smelkov committed
167
// transaction ends and array is unmapped), the server sees number of opened
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
168 169
// files to @<revX>/bigfile/<bigfileX> drops to zero, and automatically
// destroys @<revX>/bigfile/<bigfileX> after reasonable timeout.
Kirill Smelkov's avatar
Kirill Smelkov committed
170 171 172 173
//
//
// Protection against slow or faulty clients
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
174 175 176 177
// If a client, on purpose or due to a bug or being stopped, is slow to respond
// with ack to file invalidation notification, it creates a problem because the
// server will become blocked waiting for pin acknowledgments, and thus all
// other clients, that try to work with the same file, will get stuck.
Kirill Smelkov's avatar
Kirill Smelkov committed
178
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
179 180
// The problem could be avoided if wcfs resided inside the OS kernel and this
// way could manipulate clients' address space directly (then
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
181 182 183 184
// invalidation protocol would not be needed). It is also possible to imagine
// mechanism, where wcfs would synchronously change clients' address space via
// injecting trusted code and running it on client side via ptrace to adjust
// file mappings.
Kirill Smelkov's avatar
Kirill Smelkov committed
185
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
186 187 188 189 190
// However ptrace does not work when client thread is blocked under pagefault,
// and that is exactly what wcfs would need to do to process invalidations
// lazily, because eager invalidation processing results in prohibitively slow
// file opens. See internal wcfs overview for details about why ptrace
// cannot be used and why lazy invalidation processing is required.
Kirill Smelkov's avatar
Kirill Smelkov committed
191
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
192 193 194
// Lacking OS primitives to change address space of another process and not
// being able to work around it with ptrace in userspace, wcfs takes the approach
// of killing a slow client after a 30-second timeout by default.
Kirill Smelkov's avatar
Kirill Smelkov committed
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
//
//
// Writes
//
// As each bigfile is represented by 1 synthetic file, there can be several
// write schemes:
//
// 1. mmap(MAP_PRIVATE) + writeout by client
//
// In this scheme bigfile data is mmapped in MAP_PRIVATE mode, so that local
// user changes are not automatically propagated back to the file. When there
// is a need to commit, client investigates via some OS mechanism, e.g.
// /proc/self/pagemap or something similar, which pages of this mapping it
// modified. Knowing this it knows which data it dirtied and so can write this
// data back to ZODB itself, without filesystem server providing write support.
//
// 2. mmap(MAP_SHARED, PROT_READ) + write-tracking & writeout by client
//
// In this scheme bigfile data is mmapped in MAP_SHARED mode with read-only pages
// protection. Then whenever write fault occurs, client allocates RAM from
// shmfs, copies faulted page to it, and then mmaps RAM page with RW protection
// in place of original bigfile page. Writeout implementation should be similar
// to "1", only here client already knows the pages it dirtied, and this way
// there is no need to consult /proc/self/pagemap.
//
// The advantage of this scheme over mmap(MAP_PRIVATE) is that in case
// there are several in-process mappings of the same bigfile with overlapping
// in-file ranges, changes in one mapping will be visible in another mapping.
// Contrary: whenever a MAP_PRIVATE mapping is modified, the kernel COWs
// faulted page into a page completely private to this mapping, so that other
// MAP_PRIVATE mappings of this file, including ones created from the same
// process, do not see changes made to the first mapping.
//
// Since wendelin.core needs to provide coherency in between different slices
// of the same array, this is the mode wendelin.core actually uses.
//
// 3. write to wcfs
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
233
// TODO we later could implement "write-directly" mode where clients would write
Kirill Smelkov's avatar
Kirill Smelkov committed
234
// data directly into the file.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
235
package main
Kirill Smelkov's avatar
Kirill Smelkov committed
236

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
237
// Wcfs organization
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
238
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
239
// Wcfs is a ZODB client that translates ZODB objects into OS files as would
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
240
// non-wcfs wendelin.core does for a ZBigFile. Contrary to non-wcfs wendelin.core,
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
241
// it keeps bigfile data in shared OS cache efficiently. It is organized as follows:
242
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
243
// 1) 1 ZODB connection for "latest data" for whole filesystem (zhead).
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
244 245
// 2) head/bigfile/* of all bigfiles represent state as of zhead.At .
// 3) for head/bigfile/* the following invariant is maintained:
246
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
247
//	#blk ∈ OS file cache    =>    ZBlk(#blk) + all BTree/Bucket that lead to it  ∈ zhead cache(%)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
248
//	                              (ZBlk* in ghost state)
249
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
250
//    The invariant helps on invalidation: if we see a changed oid, and
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
251
//    zhead.cache.lookup(oid) = ø -> we know we don't have to invalidate OS
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
252 253
//    cache for any part of any file (even if oid relates to a file block - that
//    block is not cached and will trigger ZODB load on file read).
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
254
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
255 256 257
//    Currently we maintain this invariant by simply never evicting ZBlk/LOBTree/LOBucket
//    objects from ZODB Connection cache. In the future we may want to try to
//    synchronize to kernel freeing its pagecache pages.
258
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
259
// 4) when we receive an invalidation message from ZODB - we process it and
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
260
//    propagate invalidations to OS file cache of head/bigfile/*:
261
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
262
//	invalidation message: (tid↑, []oid)
263
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
264
//    4.1) zhead.cache.lookup(oid)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
265 266
//    4.2) ø: nothing to do - see invariant ^^^.
//    4.3) obj found:
267 268
//
//	- ZBlk*		-> file/#blk
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
269
//	- BTree/Bucket	-> δ(BTree)  -> file/[]#blk
270
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
271
//	in the end after processing all []oid from invalidation message we have
272 273 274
//
//	  [] of file/[]#blk
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
275
//	that describes which parts of which file(s) need to be invalidated.
276
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
277
//    4.4) for all file/blk to invalidate we do:
278
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
279
//	- try to retrieve head/bigfile/file[blk] from OS file cache(*);
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
280
//	- if retrieved successfully -> store retrieved data back into OS file
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
281
//	  cache for @<rev>/bigfile/file[blk], where
Kirill Smelkov's avatar
Kirill Smelkov committed
282
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
283
//	    rev = max(δFtail.by(#blk)) || min(rev ∈ δFtail) || zhead.at	; see below about δFtail
Kirill Smelkov's avatar
Kirill Smelkov committed
284
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
285
//	- invalidate head/bigfile/file[blk] in OS file cache.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
286 287
//
//	This preserves previous data in OS file cache in case it will be needed
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
288
//	by not-yet-uptodate clients, and makes sure file read of head/bigfile/file[blk]
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
289 290 291
//	won't be served from OS file cache and instead will trigger a FUSE read
//	request to wcfs.
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
292
//    4.5) no invalidation messages are sent to wcfs clients at this point(+).
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
293
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
294 295
//    4.6) processing ZODB invalidations and serving file reads (see 7) are
//      organized to be mutually exclusive.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
296
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
297 298
//	(TODO head.zconnMu -> special mutex with Lock(ctx) so that Lock could be canceled)
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
299 300 301 302
// 5) after OS file cache was invalidated, we resync zhead to new database
//    view corresponding to tid.
//
// 6) for every file δFtail invalidation info about head/data is maintained:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
303
//
Kirill Smelkov's avatar
Kirill Smelkov committed
304 305
//	- tailv: [](rev↑, []#blk)
//	- by:    {} #blk -> []rev↑ in tail
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
306
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
307 308
//    δFtail.tail describes invalidations to file we learned from ZODB invalidation.
//    δFtail.by   allows to quickly lookup information by #blk.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
309
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
310
//    min(rev) in δFtail is min(@at) at which head/bigfile/file is currently mmapped (see below).
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
311
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
312 313 314
//    to support initial openings with @at being slightly in the past, we also
//    make sure that min(rev) is enough to cover last 10 minutes of history
//    from head/at.
315
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
316 317 318
// 7) when we receive a FUSE read(#blk) request to a head/bigfile/file we process it as follows:
//
//   7.1) load blkdata for head/bigfile/file[blk] @zhead.at .
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
319 320 321
//
//	while loading this also gives upper bound estimate of when the block
//	was last changed:
322
//
Kirill Smelkov's avatar
Kirill Smelkov committed
323 324 325
//	  rev(blk) ≤ max(_.serial for _ in (ZBlk(#blk), all BTree/Bucket that lead to ZBlk))
//
//	it is not exact because BTree/Bucket can change (e.g. rebalance)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
326
//	but still point to the same k->ZBlk.
Kirill Smelkov's avatar
Kirill Smelkov committed
327 328 329 330 331 332 333
//
//	we also use file.δFtail to find either exact blk revision:
//
//	  rev(blk) = max(file.δFtail.by(#blk) -> []rev↑)
//
//	or another upper bound if #blk ∉ δFtail:
//
Kirill Smelkov's avatar
Kirill Smelkov committed
334
//	  rev(blk) ≤ min(rev ∈ δFtail)		; #blk ∉ δFtail
Kirill Smelkov's avatar
Kirill Smelkov committed
335 336
//
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
337
//	below rev'(blk) is min(of the estimates found):
Kirill Smelkov's avatar
Kirill Smelkov committed
338 339 340
//
//	  rev(blk) ≤ rev'(blk)		rev'(blk) = min(^^^)
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
341
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
342
//   7.2) for all registered client@at watchers of head/bigfile/file:
343
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
344
//	- rev'(blk) ≤ at: -> do nothing
Kirill Smelkov's avatar
Kirill Smelkov committed
345
//	- rev'(blk) > at:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
346
//	  - if blk ∈ watcher.pinned -> do nothing
Kirill Smelkov's avatar
Kirill Smelkov committed
347
//	  - rev = max(δFtail.by(#blk) : _ ≤ at)	|| min(rev ∈ δFtail : rev ≤ at)	|| at
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
348 349 350 351 352 353 354 355 356 357 358 359 360
//	  - watcher.pin(file, #blk, @rev)
//	  - watcher.pinned += blk
//
//	where
//
//	  watcher.pin(file, #blk, @rev)
//
//	sends pin message according to "Invalidation protocol", and is assumed
//	to cause
//
//	  remmap(file, #blk, @rev/bigfile/file)
//
//	on client.
361
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
362 363
//	( one could imagine adjusting mappings synchronously via running
//	  wcfs-trusted code via ptrace that wcfs injects into clients, but ptrace
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
364
//	  won't work when client thread is blocked under pagefault or syscall(^) )
365
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
366
//	in order to support watching for each head/bigfile/file
367
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
368
//	  [] of watch{client@at↑, pinned}
369
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
370 371
//	is maintained.
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
372
//   7.3) blkdata is returned to kernel.
373 374 375
//
//   Thus a client that wants latest data on pagefault will get latest data,
//   and a client that wants @rev data will get @rev data, even if it was this
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
376
//   "old" client that triggered the pagefault(~).
377
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
378
// XXX 8) serving read from @<rev>/data + zconn(s) for historical state
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
379
// XXX 9) gc @rev/ and @rev/bigfile/<bigfileX> automatically on atime timeout
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
380 381
//
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
382 383 384 385
// (*) see notes.txt -> "Notes on OS pagecache control"
// (+) see notes.txt -> "Invalidations to wcfs clients are delayed until block access"
// (~) see notes.txt -> "Changing mmapping while under pagefault is possible"
// (^) see notes.txt -> "Client cannot be ptraced while under pagefault"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
386
// (%) no need to keep track of ZData - ZBlk1 is always marked as changed on blk data change.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
387
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
388
// XXX For every ZODB connection a dedicated read-only transaction is maintained.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
389

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
390
import (
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
391
	"bufio"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
392
	"context"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
393
	"flag"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
394
	"fmt"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
395
	stdlog "log"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
396
	"os"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
397
	"runtime"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
398
	"strings"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
399
	"sync"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
400
	"sync/atomic"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
401
	"syscall"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
402

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
403
	log "github.com/golang/glog"
Kirill Smelkov's avatar
Kirill Smelkov committed
404 405
	"golang.org/x/sync/errgroup"

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
406
	"lab.nexedi.com/kirr/go123/xcontext"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
407
	"lab.nexedi.com/kirr/go123/xerr"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
408

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
409
	"lab.nexedi.com/kirr/neo/go/transaction"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
410
	"lab.nexedi.com/kirr/neo/go/zodb"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
411
	"lab.nexedi.com/kirr/neo/go/zodb/btree"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
412
	_ "lab.nexedi.com/kirr/neo/go/zodb/wks"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
413 414 415

	"github.com/hanwen/go-fuse/fuse"
	"github.com/hanwen/go-fuse/fuse/nodefs"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
416
	"github.com/pkg/errors"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
417 418

	"./internal/δbtree"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
419 420
)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
421 422
// Root represents root of wcfs filesystem.
//
// It holds the ZODB storage and DB handle used by the whole filesystem,
// the head/ directory, and the table of @<rev>/ directories (revTab).
type Root struct {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
423
	fsNode
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
424 425 426 427 428

	// ZODB storage we work with
	zstor zodb.IStorage

	// ZODB DB handle for zstor.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
429
	// It keeps a cache of connections for @<rev>/ accesses;
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
430
	// only one connection is used for each @<rev>.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
431 432
	zdb *zodb.DB

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
433
	// directory + ZODB connection for head/
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
434
	// (zhead is Resync'ed and is kept outside zdb pool)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
435
	head *Head
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
436

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
437 438 439
	// directories + ZODB connections for @<rev>/, keyed by revision.
	// NOTE(review): revMu presumably guards revTab (the usual Go convention
	// of declaring a mutex right above the fields it protects) — TODO confirm.
	revMu  sync.Mutex
	revTab map[zodb.Tid]*Head
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
440 441
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
442
// /(head|<rev>)/			- served by Head.
//
// Head is the directory of one database view: head/ when rev == 0, or
// @<rev>/ otherwise. Each Head has its own ZODB connection (zconn).
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
443
type Head struct {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
444
	fsNode
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
445

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
446
	rev   zodb.Tid    // 0 for head/, !0 for @<rev>/
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
447 448 449
	bfdir *BigFileDir // bigfile/
	// at    - served by .readAt
	// watch - implicitly linked to by fs
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
450 451

	// ZODB connection for everything under this head
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
452 453 454 455 456 457 458 459 460 461 462 463

	// zconnMu protects access to zconn & live _objects_ associated with it.
	// While it is read-locked, zconn is guaranteed to keep viewing the
	// database at the same state.
	//
	// zwatcher write-locks this and thus knows that no one is using ZODB
	// objects and no one mutates OS file cache while zwatcher is running.
	//
	// It is also kept read-locked by OS cache uploaders (see BigFile.uploadBlk)
	// with an additional locking protocol to avoid deadlocks (see
	// pauseOSCacheUpload / continueOSCacheUpload / inflightOSCacheUploads below).
	zconnMu sync.RWMutex
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
464
	zconn   *ZConn       // for head/ zwatcher resyncs head.zconn; others only read zconn objects.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
465

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
466 467 468 469 470 471 472
	// zwatcher signals to uploadBlk to pause/continue uploads to OS cache to avoid deadlocks.
	// see notes.txt -> "Kernel locks page on read/cache store/..." for details.
	pauseOSCacheUpload    bool
	continueOSCacheUpload chan struct{}
	// uploadBlk reports to zwatcher how many OS cache uploads are currently in flight.
	// NOTE(review): the int32 type together with the file's sync/atomic import
	// suggests this counter is updated atomically — TODO confirm at use sites.
	inflightOSCacheUploads int32

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
473
	// XXX move zconn's current transaction to Head here?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
474

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
475
	// NOTE(review): watchTab (below) already lives in Head, so the earlier XXX about moving it here looks resolved.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
476 477 478 479
	// head/watch opens: set of Watcher handles opened on this head's watch.
	// XXX protected by ... head.zconnMu ?
	watchTab map[*Watcher]struct{}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
480
}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
481

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
482 483
// /head/watch				- served by Watch.
type Watch struct {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
484
	fsNode
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
485

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
486 487
	head   *Head // parent head/
	idNext int32 // ID for next opened Watcher
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
488 489 490 491
}

// /head/watch handle			- served by Watcher.
type Watcher struct {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
492 493 494 495
	sk   *FileSock
	id   int32     // ID of this /head/watch handle (for debug log)
	head *Head

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
496

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
497 498 499 500
	// established file watchers.
	// XXX in-progress - where?
	// XXX locking?
	fileTab map[*FileWatch]struct{}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
501 502

	// IO
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
503
//	acceptq chan string // (stream, msg)   // client-initiated messages go here
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
504
	rxMu    sync.Mutex
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
505
	rxTab   map[uint64]chan string // client replies go via here
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
506 507 508 509 510 511 512 513 514 515 516 517 518
}

// FileWatch represents watching for 1 BigFile.
type FileWatch struct {
	link *Watcher // link to client
	file *BigFile	// XXX needed?

	// XXX locking

	// requested to be watched @at
	at zodb.Tid

	// XXX pinned
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
519 520
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
521 522
// /(head|<rev>)/bigfile/		- served by BigFileDir.
type BigFileDir struct {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
523
	fsNode
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
524
	head *Head // parent head/ or @<rev>/
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
525 526

	// {} oid -> <bigfileX>
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
527
	fileMu  sync.Mutex
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
528
	fileTab map[zodb.Oid]*BigFile
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
529

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
530 531
	// visited BTree nodes of all BigFiles
	// -> which file + ordering for toposort on δbtree
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
532 533
	//
	// (used only for head/, not revX/)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
534 535
	indexMu     sync.Mutex
	indexLooked *δbtree.PathSet	// XXX naming
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
536 537
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
538
// /(head|<rev>)/bigfile/<bigfileX>	- served by BigFile.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
539
type BigFile struct {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
540
	fsNode
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
541

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
542
	// this BigFile is under .head/bigfile/; it views ZODB via .head.zconn
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
543 544
	// parent's BigFileDir.head is the same.
	head	*Head
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
545

546
	// ZBigFile top-level object
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
547
	zfile	*ZBigFile
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
548

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
549 550 551 552
	// things read/computed from .zfile; constant during lifetime of current transaction.
	blksize int64    // zfile.blksize
	size    int64    // zfile.Size()
	rev     zodb.Tid // last revision that modified zfile data
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
553

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
554 555
	// tail change history of this file.
	δFtail *ΔTailI64 // [](rev↑, []#blk)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
556

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
557
	// inflight loadings of ZBigFile from ZODB.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
558 559 560 561
	// successful load results are kept here until blkdata is put into OS pagecache.
	//
	// Being a staging area for data to enter OS cache, loading has to be
	// consulted/invalidated whenever wcfs logic needs to consult/invalidate OS cache.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
562
	loadMu  sync.Mutex
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
563
	loading map[int64]*blkLoadState // #blk -> {... blkdata}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
564

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
565
	// watchers attached to this file
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
566 567 568
	// XXX already in "established" state (i.e. initial watch request was answered with "ok")
	// XXX locking -> watchMu?
	watchers map[*FileWatch]struct{}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
569 570 571 572 573 574 575 576 577
}

// blkLoadState holds the state and result of loading one ZBlk.
//
// The loader closes ready once loading is complete. Until then the load is
// in progress. After ready is closed, blkdata and err hold the outcome.
type blkLoadState struct {
	ready chan struct{}

	blkdata []byte
	err     error
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
582
// -------- 3) Cache invariant --------
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
583

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
584 585
// zodbCacheControl implements zodb.LiveCacheControl to tune ZODB to never evict
// LOBTree/LOBucket from live cache. We want to keep LOBTree/LOBucket always alive
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
586
// because it is essentially the index where to find ZBigFile data.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
587 588 589 590 591 592 593
//
// For the data itself - we put it to kernel pagecache and always deactivate
// from ZODB right after that.
//
// See "3) for */head/data the following invariant is maintained..."
type zodbCacheControl struct {}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
594
func (_ *zodbCacheControl) PCacheClassify(obj zodb.IPersistent) zodb.PCachePolicy {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
595
	switch obj.(type) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
596
	// ZBlk* should be in cache but without data
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
597
	case *ZBlk0:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
598
		return zodb.PCachePinObject | zodb.PCacheDropState
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
599
	case *ZBlk1:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
600
		return zodb.PCachePinObject | zodb.PCacheDropState
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
601 602

	// ZBigFile btree index should be in cache with data
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
603
	case *btree.LOBTree:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
604
		return zodb.PCachePinObject | zodb.PCacheKeepState
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
605
	case *btree.LOBucket:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
606 607 608 609
		return zodb.PCachePinObject | zodb.PCacheKeepState

	// don't let ZData to pollute the cache
	case *ZData:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
610
		return zodb.PCacheDropObject | zodb.PCacheDropState
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
611

612 613 614
	// for performance reason we also keep ZBigFile in cache.
	//
	// ZBigFile is top-level object that is used on every block load, and
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
615
	// it would be a waste to evict ZBigFile from cache.
616
	case *ZBigFile:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
617
		return zodb.PCachePinObject | zodb.PCacheKeepState
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
618 619
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
620
	return 0
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
621 622
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
623 624
// -------- 4) ZODB invalidation -> OS cache --------

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
625
func traceZWatch(format string, argv ...interface{}) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
626 627 628 629
	if !log.V(1) {	// XXX -> 2?
		return
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
630
	log.Infof("zwatcher: " + format, argv...)	// XXX InfoDepthf
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
631 632
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
633
// zwatcher watches for ZODB changes.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
634
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
635
// see "4) when we receive an invalidation message from ZODB ..."
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
636
func (root *Root) zwatcher(ctx context.Context) (err error) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
637
	defer xerr.Contextf(&err, "zwatch")	// XXX more in context?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
638 639
	// XXX unmount on error? -> always EIO?

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
640
	traceZWatch(">>>")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
641

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
642
	zwatchq := make(chan zodb.Event)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
643
	at0 := root.zstor.AddWatch(zwatchq)	// XXX -> to main thread to avoid race
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
644
	defer root.zstor.DelWatch(zwatchq)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
645
	_ = at0 // XXX XXX
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
646

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
647
	var zevent zodb.Event
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
648
	var ok bool
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
649 650

	for {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
651
		traceZWatch("select ...")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
652 653
		select {
		case <-ctx.Done():
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
654
			traceZWatch("cancel")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
655 656 657 658
			return ctx.Err()

		case zevent, ok = <-zwatchq:
			if !ok {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
659
				traceZWatch("zwatchq closed")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
660 661
				return nil // closed	XXX ok?
			}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
662

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
663 664
		}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
665
		traceZWatch("zevent: %s", zevent)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
666 667 668 669 670 671 672 673 674 675 676 677 678 679

		var  *zodb.EventCommit
		switch zevent := zevent.(type) {
		default:
			return fmt.Errorf("unexpected event: %T", zevent)

		case *zodb.EventError:
			return zevent.Err

		case *zodb.EventCommit:
			 = zevent
		}

		root.zδhandle1()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
680 681 682
	}
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
683
// zδhandle1 handles 1 change event from ZODB notification.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
684
func (root *Root) zδhandle1( *zodb.EventCommit) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
685 686
	head := root.head

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
687 688
	// while we are invalidating OS cache, make sure that nothing, that
	// even reads /head/bigfile/*, is running (see 4.6).
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714
	//
	// also make sure that cache uploaders we spawned (uploadBlk) are all
	// paused, or else they could overwrite OS cache with stale data.
	// see notes.txt -> "Kernel locks page on read/cache store/..." for
	// details on how to do this without deadlocks.
	continueOSCacheUpload := make(chan struct{})
retry:
	for {
		head.zconnMu.Lock()
		head.pauseOSCacheUpload = true
		head.continueOSCacheUpload = continueOSCacheUpload

		if head.inflightOSCacheUploads != 0 {
			head.zconnMu.Unlock()
			continue retry
		}

		break
	}

	defer func() {
		head.pauseOSCacheUpload = false
		head.continueOSCacheUpload = nil
		head.zconnMu.Unlock()
		close(continueOSCacheUpload)
	}()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
715

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
716 717
	// head.zconnMu locked and not cache uploaders are running

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
718 719
	zhead := head.zconn
	bfdir := head.bfdir
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
720

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
721 722 723
	// fileInvalidate describes invalidations for one file
	type fileInvalidate struct {
		blkmap SetI64 // changed blocks
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
724
		size   bool   // whether to invalidate file size
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
725 726 727
	}
	toinvalidate := map[*BigFile]*fileInvalidate{} // {} file -> set(#blk), sizeChanged
	btreeChangev := []zodb.Oid{}                   // oids changing BTree|Bucket
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
728

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
729
	//fmt.Printf("\n\n\n")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
730

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
731 732
	// zδ = (tid↑, []oid)
	for _, oid := range .Changev {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
733
		// XXX zhead.Cache() lock/unlock
734
		obj := zhead.Cache().Get(oid)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
735
		if obj == nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
736
			//fmt.Printf("%s: not in cache\n", oid)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
737 738 739
			continue // nothing to do - see invariant
		}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
740
		//fmt.Printf("%s:     in cache (%s)\n", oid, typeOf(obj))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
741

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
742 743
		switch obj := obj.(type) {
		default:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
744
			continue // object not related to any bigfile
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
745

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
746
		case *btree.LOBTree:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
747
			btreeChangev = append(btreeChangev, obj.POid())
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
748

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
749
		case *btree.LOBucket:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
750
			btreeChangev = append(btreeChangev, obj.POid())
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
751

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
752
		case zBlk:	// ZBlk*
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
753 754 755 756
			// blkBoundTo locking: no other bindZFile are running,
			// since we write-locked head.zconnMu and bindZFile is
			// run when loading objects - thus when head.zconnMu is
			// read-locked.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
757 758 759
			//
			// bfdir locking: similarly not needed, since we are
			// exclusively holding head lock.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
760
			for zfile, objBlk := range obj.blkBoundTo() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
761
				file, ok := bfdir.fileTab[zfile.POid()]
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
762 763 764 765 766 767
				if !ok {
					// even though zfile is in ZODB cache, the
					// filesystem already forgot about this file.
					continue
				}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
768
				finv, ok := toinvalidate[file]
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
769
				if !ok {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
770 771
					finv = &fileInvalidate{blkmap: SetI64{}}
					toinvalidate[file] = finv
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
772
				}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
773
				finv.blkmap.Update(objBlk)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
774
			}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
775

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
776 777 778 779 780
		case *ZBigFile:
			// XXX check that .blksize and .blktab (it is only
			// persistent reference) do not change.

			// XXX shutdown fs with ^^^ message.
781
			panic("ZBigFile changed")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
782
		}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
783 784 785

		// make sure obj won't be garbage-collected until we finish handling it.
		runtime.KeepAlive(obj)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
786 787
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
788 789
	// find out which files need to be invalidated due to index change
	// XXX no indexMu lock needed because head is Locked
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
790
	// XXX stub -> TODO full δbtree | update indexLooked itself
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
791
	//fmt.Printf("\nbtreeChangev: %v\n", btreeChangev)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
792
	xfiles := bfdir.indexLooked.Invalidates(btreeChangev)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
793
	//fmt.Printf("xfiles: %v\n", xfiles)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
794 795 796 797 798 799 800 801 802 803
	for xfile := range xfiles {
		file := xfile.(*BigFile)
		finv, ok := toinvalidate[file]
		if !ok {
			finv = &fileInvalidate{} // XXX init blkmap?
			toinvalidate[file] = finv
		}
		finv.size = true
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
804 805 806 807
	//fmt.Printf("\n\nzδhandle: toinvalidate (#%d):\n", len(toinvalidate))
	//for file := range toinvalidate {
	//	fmt.Printf("\t- %s\n", file.zfile.POid())
	//}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
808

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
809
	wg, ctx := errgroup.WithContext(context.TODO())
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
810
	for file, finv := range toinvalidate {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
811
		file := file
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
812
		for blk := range finv.blkmap {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
813
			blk := blk
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
814 815 816
			wg.Go(func() error {
				return file.invalidateBlk(ctx, blk)
			})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
817
		}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
818 819 820 821 822
	}
	err := wg.Wait()
	if err != nil {
		panic(err)	// XXX
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
823

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
824
	// invalidate kernel cache for attributes
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
825
	// we need to do it only if we see topology (i.e. btree) change
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
826
	//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
827
	// do it after completing data invalidations.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
828 829 830 831
	wg, ctx = errgroup.WithContext(context.TODO())
	for file, finv := range toinvalidate {
		if !finv.size {
			continue
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
832
		}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
833
		wg.Go(func() error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
834
			return file.invalidateAttr()	// XXX pass ctx?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
835
		})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
836
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
837
	err = wg.Wait()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
838 839 840
	if err != nil {
		panic(err)	// XXX
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
841

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
842
	// resync .zhead to zδ.tid
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
843 844
	// XXX -> Head.Resync() ?

845
	// 1. abort old and resync to new txn/at
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
846
	transaction.Current(zhead.txnCtx).Abort()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
847 848 849 850 851
	_, ctx = transaction.New(context.Background()) // XXX bg ok?
	err = zhead.Resync(ctx, .Tid)
	if err != nil {
		panic(err)	// XXX
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
852
	zhead.txnCtx = ctx
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
853

854
	// 2. restat invalidated ZBigFile
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
855
	// XXX -> parallel
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
856
	// XXX locking
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
857
	for file := range toinvalidate {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
858
		size, treePath, err := file.zfile.Size(ctx)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
859 860 861 862
		if err != nil {
			panic(err)	// XXX
		}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
863
		file.size = size
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
864
		bfdir.indexLooked.Add(file, treePath)
865 866

		file.rev = zhead.At()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
867 868
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
869
	// notify .wcfs/zhead
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
870
	for sk := range gdebug.zheadSockTab {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
871
		_, err := fmt.Fprintf(sk, "%s\n", .Tid)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
872 873 874 875 876
		if err != nil {
			log.Error(err)	// XXX errctx, -> warning?
			sk.Close()
			delete(gdebug.zheadSockTab, sk)
		}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
877
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
878 879
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
880
// invalidateBlk invalidates 1 file block in kernel cache.
881
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
882
// see "4.4) for all file/blk to in invalidate we do"
883
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
884
// called with f.head.zconnMu wlocked.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
885 886 887
func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) (err error) {
	defer xerr.Contextf(&err, "%s: invalidate blk #%d:", f.path(), blk)

888
	fsconn := gfsconn
889
	blksize := f.blksize
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
890 891
	off := blk*blksize

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
892 893 894 895
	var blkdata []byte = nil

	// first try to retrieve f.loading[blk];
	// make sure f.loading[blk] is invalidated.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
896
	//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
897 898 899 900 901 902 903 904 905 906 907 908 909 910
	// we are running with zconnMu wlocked - no need to lock f.loadMu
	loading, ok := f.loading[blk]
	if ok {
		if loading.err == nil {
			blkdata = loading.blkdata
		}
		delete(f.loading, blk)
	}

	// try to retrieve cache of current head/data[blk], if we got nothing from f.loading
	if blkdata == nil {
		blkdata = make([]byte, blksize)
		n, st := fsconn.FileRetrieveCache(f.Inode(), off, blkdata)
		if st != fuse.OK {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
911
			// XXX warn
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
912 913 914 915
		}
		blkdata = blkdata[:n]
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
916 917 918 919
	// if less than blksize was cached - probably the kernel had to evict
	// some data from its cache already. In such case we don't try to
	// preserve the rest and drop what was read, to avoid keeping the
	// system overloaded.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
920 921
	//
	// if we have the data - preserve it under @revX/bigfile/file[blk].
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
922
	if int64(len(blkdata)) == blksize {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
923 924 925
		func() {
			// store retrieved data back to OS cache for file @<rev>/file[blk]
			blkrev, _ := f.δFtail.LastRevOf(blk, f.head.zconn.At())
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
926
			frev, frelease, err := groot.mkrevfile(blkrev, f.zfile.POid())
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
927 928 929 930 931 932 933
			if err != nil {
				log.Errorf("BUG: %s: invalidate blk #%d: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, err)
			}
			defer frelease()

			st := fsconn.FileNotifyStoreCache(frev.Inode(), off, blkdata)
			if st != fuse.OK {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
934
				log.Errorf("BUG: %s: invalidate blk #%d: %s: store cache: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, frev.path(), st)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
935 936
			}
		}()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
937
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
938

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
939
	// invalidate file/head/data[blk] in OS file cache.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
940 941
	st := fsconn.FileNotify(f.Inode(), off, blksize)
	if st != fuse.OK {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
942
		return syscall.Errno(st)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
943
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
944

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
945
	return nil
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
946
}
947

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
948
// invalidateAttr invalidates file attributes in kernel cache.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
949 950
//
// Complements invalidateBlk and is used to invalidate file size.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
951
func (f *BigFile) invalidateAttr() (err error) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
952
	defer xerr.Contextf(&err, "%s: invalidate attr", f.path())
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
953 954
	fsconn := gfsconn
	st := fsconn.FileNotify(f.Inode(), -1, -1) // metadata only
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
955 956 957 958
	if st != fuse.OK {
		return syscall.Errno(st)
	}
	return nil
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
959 960
}

961

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
962
// mkrevfile makes sure inode ID of /@<rev>/bigfile/<fid> is known to kernel.
963 964
//
// We need node ID to be know to the kernel, when we need to store data into
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
965
// file's kernel cache - if the kernel don't have the node ID for the file in
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
966
// question, FileNotifyStoreCache will just fail.
967
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
968 969 970
// For kernel to know the inode mkrevfile issues regular filesystem lookup
// request which goes to kernel and should go back to wcfs. It is thus not safe
// to use mkrevfile from under FUSE request handler as doing so might deadlock.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
971 972 973
//
// Caller must call release when inode ID is no longer required to be present.
func (root *Root) mkrevfile(rev zodb.Tid, fid zodb.Oid) (_ *BigFile, release func(), err error) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
974 975 976 977 978 979 980 981 982
	fsconn := gfsconn

	frevpath := fmt.Sprintf("@%s/bigfile/%s", rev, fid) // relative to fs root for now
	defer xerr.Contextf(&err, "/: mkrevfile %s", frevpath)

	// first check without going through kernel, whether the inode maybe know already
	xfrev := fsconn.LookupNode(root.Inode(), frevpath)
	if xfrev != nil {
		// FIXME checking for "node{0}" is fragile, but currently no other way
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
983 984
		// XXX the node could be still forgotten since we are not holding open on it
		// XXX -> always os.open unconditionally? or it is ok since it is just a cache?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
985
		if xfrev.String() != "node{0}" {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
986
			return xfrev.Node().(*BigFile), func(){}, nil
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
987 988 989 990 991 992 993
		}
	}

	// we have to ping the kernel
	frevospath := gmntpt + "/" + frevpath // now starting from OS /
	f, err := os.Open(frevospath)
	if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
994
		return nil, nil, err
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
995 996 997
	}

	xfrev = fsconn.LookupNode(root.Inode(), frevpath)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
998
	// must be !nil as open succeeded	XXX better recheck
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
999
	return xfrev.Node().(*BigFile), func() { f.Close() }, nil
1000
}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1001

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025
// -------- 7) FUSE read(#blk) --------

// /(head|<rev>)/bigfile/<bigfileX> -> Read serves reading bigfile data.
func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context) (fuse.ReadResult, fuse.Status) {
	f.head.zconnMu.RLock()
	defer f.head.zconnMu.RUnlock()

	// cap read request to file size
	end := off + int64(len(dest))		// XXX overflow?
	if end > f.size {
		end = f.size
	}
	if end <= off {
		// XXX off >= size -> EINVAL? (but when size=0 kernel issues e.g. [0 +4K) read)
		return fuse.ReadResultData(nil), fuse.OK
	}

	// widen read request to be aligned with blksize granularity
	// (we can load only whole ZBlk* blocks)
	aoff := off - (off % f.blksize)
	aend := end
	if re := end % f.blksize; re != 0 {
		aend += f.blksize - re
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1026
	// XXX use original dest if it can fit the data
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046
	dest = make([]byte, aend - aoff) // ~> [aoff:aend) in file

	// XXX better ctx = transaction.PutIntoContext(ctx, txn)
	ctx, cancel := xcontext.Merge(asctx(fctx), f.head.zconn.txnCtx)
	defer cancel()

	// read/load all block(s) in parallel
	wg, ctx := errgroup.WithContext(ctx)
	for blkoff := aoff; blkoff < aend; blkoff += f.blksize {
		blkoff := blkoff
		blk := blkoff / f.blksize
		wg.Go(func() error {
			δ := blkoff-aoff // blk position in dest
			//log.Infof("readBlk #%d dest[%d:+%d]", blk, δ, f.blksize)
			return f.readBlk(ctx, blk, dest[δ:δ+f.blksize])
		})
	}

	err := wg.Wait()
	if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1047
		return nil, err2LogStatus(err)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1048 1049 1050 1051 1052 1053 1054 1055 1056 1057
	}

	return fuse.ReadResultData(dest[off-aoff:end-aoff]), fuse.OK
}

// readBlk serves Read to read 1 ZBlk #blk into destination buffer.
//
// see "7) when we receive a FUSE read(#blk) request ..." in overview.
//
// len(dest) == blksize.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1058 1059 1060
// called with head.zconnMu rlocked.
func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err error) {
	defer xerr.Contextf(&err, "%s: readblk #%d", f.path(), blk)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080

	// check if someone else is already loading this block
	f.loadMu.Lock()
	loading, already := f.loading[blk]
	if !already {
		loading = &blkLoadState{
			ready:   make(chan struct{}),
		}
		f.loading[blk] = loading
	}
	f.loadMu.Unlock()

	// if it is already loading - just wait for it
	if already {
		select {
		case <-ctx.Done():
			return ctx.Err()

		case <-loading.ready:
			if loading.err == nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1081
				copy(dest, loading.blkdata)	// XXX copy
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1082 1083 1084 1085 1086 1087
			}
			return loading.err
		}
	}

	// noone was loading - we became responsible to load this block
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1088
	blkdata, treepath, pathRevMax, err := f.zfile.LoadBlk(ctx, blk)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1089 1090 1091 1092 1093
	loading.blkdata = blkdata
	loading.err = err

	// data loaded with error - cleanup .loading
	if loading.err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1094
		close(loading.ready)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1095 1096 1097 1098 1099 1100
		f.loadMu.Lock()
		delete(f.loading, blk)
		f.loadMu.Unlock()
		return err
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1101 1102 1103 1104 1105 1106
	// we have the data - it can be used after watchers are updated
	f.updateWatchers(blk, treepath, pathRevMax)

	// data can be used now
	close(loading.ready)
	copy(dest, blkdata)	// XXX copy
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128

	// store to kernel pagecache whole block that we've just loaded from database.
	// This way, even if the user currently requested to read only small portion from it,
	// it will prevent next e.g. consecutive user read request to again hit
	// the DB, and instead will be served by kernel from its pagecache.
	//
	// We cannot do this directly from reading goroutine - while reading
	// kernel FUSE is holding corresponding page in pagecache locked, and if
	// we would try to update that same page in pagecache it would result
	// in deadlock inside kernel.
	//
	// .loading cleanup is done once we are finished with putting the data into OS pagecache.
	// If we do it earlier - a simultaneous read covered by the same block could result
	// into missing both kernel pagecache (if not yet updated) and empty .loading[blk],
	// and thus would trigger DB access again.
	//
	// XXX if direct-io: don't touch pagecache
	go f.uploadBlk(blk, loading)

	return nil
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1129
// updateWatchers complements readBlk: it updates watchers of the file after a
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1130 1131
// block was loaded from ZODB and before block data is returned to kernel.
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1132 1133 1134
// See "7.2) for all registered client@at watchers ..."
//
// Called with f.head.zconnMu rlocked.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149
func (f *BigFile) updateWatchers(blk int64, treepath []zodb.IPersistent, pathRevMax zodb.Tid) {
	// only head/ is being watched for
	if f.head.rev != 0 {
		return
	}

	// update δbtree index
	bfdir := f.head.bfdir
	bfdir.indexMu.Lock()		// XXX locking correct?
	bfdir.indexLooked.Add(f, treepath)
	bfdir.indexMu.Unlock()

	blkrevmax, _ := f.δFtail.LastRevOf(blk, f.zfile.PJar().At())	// XXX = f.head.zconn.At()
	blkrevmax = tidmin(blkrevmax, pathRevMax)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1150 1151 1152
	for w := range f.watchers {
		_ = w
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174
/*
	// XXX locking
	for _, mapping := range f.mappings {
		if revmax <= mapping.at || !mapping.blkrange.in(blk) {
			continue // do nothing
		}

		if mapping.pinned.Contains(blk) {
			continue // do nothing
		}

		rev = max(δFtail.by(blk) : _ <= mapping.at)

		// XXX vvv -> go
		client.remmap(mapping.addr[blk], file/@<rev>/data)
		mapping.pinned.Add(blk)


	}
*/
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1175
// uploadBlk complements readBlk: it uploads loaded blkdata into OS cache.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243
func (f *BigFile) uploadBlk(blk int64, loading *blkLoadState) {
	head := f.head

	// rlock zconnMu and make sure zwatcher is not asking us to pause.
	// if it does - wait for a safer time not to deadlock.
	// see notes.txt -> "Kernel locks page on read/cache store/..." for details.
retry:
	for {
		head.zconnMu.RLock()
		// help zwatcher if it asks us to pause uploadings, so it can
		// take zconnMu wlocked without deadlocks.
		if head.pauseOSCacheUpload {
			ready := head.continueOSCacheUpload
			head.zconnMu.RUnlock()
			<-ready
			continue retry
		}

		break
	}

	// zwatcher is not currently trying to pause OS cache uploads.

	// check if this block was already invalidated by zwatcher.
	// if so don't upload the block into OS cache.
	f.loadMu.Lock()
	loading_ := f.loading[blk]
	f.loadMu.Unlock()
	if loading != loading_ {
		head.zconnMu.RUnlock()
		return
	}

	oid := f.zfile.POid()

	// signal to zwatcher not to run while we are performing the upload.
	// upload with released zconnMu so that zwatcher can lock it even if to
	// check inflightOSCacheUploads status.
	atomic.AddInt32(&head.inflightOSCacheUploads, +1)
	head.zconnMu.RUnlock()

	st := gfsconn.FileNotifyStoreCache(f.Inode(), blk*f.blksize, loading.blkdata)

	f.loadMu.Lock()
	bug := (loading != f.loading[blk])
	if !bug {
		delete(f.loading, blk)
	}
	f.loadMu.Unlock()

	// signal to zwatcher that we are done and it can continue.
	atomic.AddInt32(&head.inflightOSCacheUploads, -1)

	if bug {
		panic(fmt.Sprintf("BUG: bigfile %s: blk %d: f.loading mutated while uploading data to pagecache", oid, blk))
	}

	if st == fuse.OK {
		return
	}

	// pagecache update failed, but it must not (we verified on startup that
	// pagecache control is supported by kernel). We can correctly live on
	// with the error, but data access will be likely very slow. Tell user
	// about the problem.
	log.Errorf("BUG: bigfile %s: blk %d: -> pagecache: %s  (ignoring, but reading from bigfile will be very slow)", oid, blk, st)
}

// -------- notifications to Watcher --------

// XXX WatchFile.Pin(blk, at)


// ---- Watch server ----

func (watch *Watch) GetAttr(out *fuse.Attr, f nodefs.File, fctx *fuse.Context) fuse.Status {
	st := watch.fsNode.GetAttr(out, f, fctx)
	// represent ourself as XXX (FileSock requirement)
	// XXX S_IFSOCK does not work (LOOKUP returns inode, open gives: "No such device or address")
	// XXX S_IFIFO  does not work (the kernel shows the file, but it being
	//     FIFO makes the data go through kernel pipe, not via FUSE filesystem)
	// XXX S_IFLNK  - the kernel wants to follow the link
	// XXX S_IFDIR  - os.open complains "is a directory" (maybe could workaround)
	// XXX S_IFCHR	- fusermount always adds nodev mount option -> the device cannot be accessed
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1260
	//out.Mode = syscall.S_IFSOCK | 0644
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1261 1262 1263
	return st
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1264 1265 1266 1267 1268
// Open serves /head/watch opens.
func (watch *Watch) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
	// XXX check flags?
	w := &Watcher{
		sk:      NewFileSock(),
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1269
		id:      atomic.AddInt32(&watch.idNext, +1),
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1270
		head:    watch.head,
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1271 1272 1273 1274
		fileTab: make(map[*FileWatch]struct{}),
	}

	// XXX locking
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1275
	// XXX del watchTab[w] on w.sk.File.Release
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1276 1277 1278 1279 1280 1281 1282
	watch.head.watchTab[w] = struct{}{}

	go w.serve()
	return w.sk.File(), fuse.OK
}

// serve serves client originated watch requests.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1283
// XXX serves rx?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1284
func (w *Watcher) serve() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1285
	err := w._serve()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1286
	_ = err
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1287
	// XXX log error if !close
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1288
	// XXX close if was not closed?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304
	// XXX locking
	delete(w.head.watchTab, w)
}

func (w *Watcher) _serve() (err error) {
	defer xerr.Contextf(&err, "watcher %d: serve", w.id)
	r := bufio.NewReader(w.sk)

	for {
		l, err := r.ReadString('\n')	// XXX limit accepted line len not to DOS
		if err != nil {
			return err	// XXX err ctx?
		}

		fmt.Printf("watch: rx: %q\n", l)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1305
		stream, msg, err := parseWatchFrame(l)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1306
		if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1307 1308
			// XXX write to peer too? (on which stream? -1?)
			return fmt.Errorf("rx: %s", err)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1309 1310
		}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1311
		// reply from client to to wcfs
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1312 1313 1314 1315
		reply := (stream % 2 == 0)
		if reply {
			w.rxMu.Lock()
			rxq := w.rxTab[stream]
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1316
			delete(w.rxTab, stream)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1317 1318 1319
			w.rxMu.Unlock()

			if rxq == nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1320
				return fmt.Errorf("rx %d: reply on unexpected streamd", stream)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1321
			}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1322 1323 1324
			rxq <- msg
			continue
		}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1325

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1326
		// client-initiated request
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1327
		oid, at, err := parseWatch(msg)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1328
		if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1329 1330
			// XXX write to peer too
			return fmt.Errorf("rx %d: %s", err)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1331
		}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1332

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1333 1334
		_ = oid
		_ = at
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1335

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1336 1337 1338 1339
		_, err = fmt.Fprintf(w.sk, "%d error TODO\n", stream)
		if err != nil {
			return err
		}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1340
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1341
}