Commit 9f32a779 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a0323d05
......@@ -88,181 +88,6 @@ class WCFS(_WCFS):
pass
"""
# Conn represents logical connection that provides view of data on wcfs
# filesystem as of particular database state.
#
# It uses /head/bigfile/* and notifications received from /head/watch to
# maintain isolated database view while at the same time sharing most of data
# cache in OS pagecache of /head/bigfile/*.
#
# Use WCFS.connect(at) to create Conn.
# Use .mmap to create new Mappings.
# Use .resync to resync Conn onto different database view.
#
# Conn logically mirrors ZODB.Connection .
class Conn(object):
# ._wc WCFS
# .at Tid
# ._wlink WatchLink watch/receive pins for created mappings
#
# ._filemu sync.Mutex
# ._filetab {} foid -> _File
#
# ._pinWG
# ._pinCancel
pass
# _File represent isolated file view under Conn.
class _File(object):
# .wconn Conn
# .foid hex of ZBigFile root object ID
# .blksize block size of this file
# .headf file object of head/file
# .headfsize head/file size is known to be at least headfsize (size ↑=)
# .pinned {} blk -> rev that wcfs already sent us for this file
# .mmaps []_Mapping ↑blk_start mappings of this file
pass
# _Mapping represents one mapping of _File.
class _Mapping(object):
# .file _File
# .blk_start offset of this mapping in file
# .mem mmaped memory
@property
def blk_stop(mmap):
assert len(mmap.mem) % mmap.file.blksize == 0
return mmap.blk_start + len(mmap.mem) // mmap.file.blksize
# connect creates new Conn viewing WCFS state as of @at.
@func(WCFS)
def connect(wc, at): # -> Conn
wconn = Conn()
# TODO support !isolated mode
wconn._wc = wc
wconn.at = at
wconn._wlink = WatchLink(wc)
wconn._filemu = sync.Mutex()
wconn._filetab = {}
pinCtx, wconn._pinCancel = context.with_cancel(context.background())
wconn._pinWG = sync.WorkGroup(pinCtx)
wconn._pinWG.go(wconn._pinner)
return wconn
# close releases resources associated with wconn.
# XXX what happens to file mmappings?
@func(Conn)
def close(wconn):
wconn._wlink.close()
wconn._pinCancel()
try:
wconn._pinWG.wait()
except Exception as e: # canceled - ok
if e is not context.canceled:
raise
# close all files - both that have no mappings and that still have opened mappings.
# XXX after file is closed mappings continue to survive, but we can no
# longer maintain consistent view.
with wconn._filemu:
for f in wconn._filetab.values():
f.headf.close()
f.headf = None
# XXX stop watching f
wconn._filetab = None
# _pinner receives pin messages from wcfs and adjusts wconn mappings.
#
# XXX must be running without GIL: access to wcfs mmaped memory could come from
# python code (thus with GIL taken).
@func(Conn)
def _pinner(wconn, ctx):
# if pinner fails, wcfs will kill us.
# log pinner exception so the error is not hidden.
# print to stderr as well as by default log does not print to there.
def _():
exc = sys.exc_info()[1]
if exc in (None, context.canceled): # canceled = .close asks pinner to stop
return
log.critical('pinner failed:', exc_info=1)
print('CRITICAL: pinner failed:', file=sys.stderr)
traceback.print_exc(file=sys.stderr)
print('\nCRITICAL: wcfs server will likely kill us soon.', file=sys.stderr)
defer(_)
def trace(msg):
return
print('pinner: %s' % msg)
trace('start')
while 1:
trace('recvReq ...')
req = wconn._wlink.recvReq(ctx)
trace('-> %r' % req)
if req is None:
return # XXX ok? (EOF - when wcfs closes wlink)
# we received request to pin/unpin file block. perform it
wconn._pin1(req)
# pin1 handles one pin request received from wcfs
@func(Conn)
def _pin1(wconn, req):
# reply either ack or nak on error
def _():
ack = "ack"
exc = sys.exc_info()[1]
if exc is not None:
ack = "nak: %s" % exc
#req.reply(ack)
ctx = context.background() # XXX ok?
wconn._wlink.replyReq(ctx, req, ack)
defer(_)
def trace(msg):
return
print('pin1: %s' % msg)
trace('lock _filemu ...')
with wconn._filemu:
trace('_filemu locked')
f = wconn._filetab.get(req.foid)
if f is None:
1/0 # XXX we are not watching the file - why wcfs sent us this update?
trace(f)
# XXX relock wconn -> f ?
for mmap in f.mmaps: # XXX use ↑blk_start for binary search
trace('\t%s' % mmap)
if not (mmap.blk_start <= req.blk < mmap.blk_stop):
continue # blk ∉ mmap
trace('\tremmapblk %d @%s' % (req.blk, (h(req.at) if req.at else "head")))
# FIXME check if virtmem did not dirtied page corresponding to this block already
#virt_lock()
#if not fileh_blk_isdirty(mmap.fileh, req.blk):
if 1:
mmap._remmapblk(req.blk, req.at)
#virt_unlock()
trace('\t-> remmaped')
# update f.pinned
# XXX do it before ^^^ remmapblk (so that e.g. concurrent
# discard/writeout see correct f.pinned) ?
if req.at is None:
f.pinned.pop(req.blk, None) # = delete(f.pinned, req.blk) -- unpin to @head
else:
f.pinned[req.blk] = req.at
# mmap creates file mapping representing file[blk_start +blk_len) data as of wconn.at database state.
@func(Conn)
def mmap(wconn, foid, blk_start, blk_len): # -> Mapping
......
......@@ -107,7 +107,6 @@ pair<Conn, error> WCFS::connect(zodb::Tid at) {
wconn->at = at;
wconn->_wlink = wlink;
context::Context pinCtx;
tie(pinCtx, wconn->_pinCancel) = context::with_cancel(context::background());
wconn->_pinWG = sync::NewWorkGroup(pinCtx);
......@@ -120,7 +119,7 @@ pair<Conn, error> WCFS::connect(zodb::Tid at) {
// close releases resources associated with wconn.
// XXX what happens to file mmappings?
error _Conn::close() { // XXX error -> void?
error _Conn::close() {
_Conn& wconn = *this;
// XXX err ctx
error err;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment