Commit a4d63fbb authored by Kirill Smelkov

.

parent bc041be8
 # -*- coding: utf-8 -*-
 # Wendelin.bigfile | BigFile ZODB backend
-# Copyright (C) 2014-2015  Nexedi SA and Contributors.
+# Copyright (C) 2014-2019  Nexedi SA and Contributors.
 #                          Kirill Smelkov <kirr@nexedi.com>
 #
 # This program is free software: you can Use, Study, Modify and Redistribute
@@ -79,6 +79,7 @@ natural to also use "2" here.
 """
 from wendelin.bigfile import BigFile, WRITEOUT_STORE, WRITEOUT_MARKSTORED
+from wendelin import wcfs              # XXX -> wendelin.bigfile.wcfs ?
 from wendelin.lib.mem import bzero, memcpy
 from wendelin.lib.zodb import deactivate_btree
@@ -489,6 +490,7 @@ class ZBigFile(LivePersistent):
     # load data   ZODB obj -> page
     def loadblk(self, blk, buf):
+        # XXX overlay: BUG if called  (XXX here, in ZBigFile?)
         zblk = self.blktab.get(blk)

         # no data written yet - "hole" reads as all zeros
         if zblk is None:
@@ -538,12 +540,36 @@ class ZBigFile(LivePersistent):

     # bigfile-like
-    def fileh_open(self):
-        fileh = _ZBigFileH(self)
+    #
+    # _use_wcfs is internal option and controls whether to use wcfs to access
+    # ZBigFile data:
+    #
+    #   - True      -> use wcfs
+    #   - False     -> don't use wcfs
+    #   - not set   -> behave according to global default
+    def fileh_open(self, _use_wcfs=None):
+        if _use_wcfs is None:
+            _use_wcfs = self._default_use_wcfs()
+
+        wcfileh = None
+        if _use_wcfs:
+            zstor = self._p_jar.db().storage
+            zurl  = wcfs.zstor_2zurl(zstor)
+            wc    = wcfs.join(zurl, shared=True)
+            wcfileh = wc.open(self)
+
+        fileh = _ZBigFileH(self, wcfileh)
         self._v_filehset.add(fileh)
         return fileh
+
+    # _default_use_wcfs returns whether default virtmem setting is to use wcfs or not.
+    @staticmethod
+    def _default_use_wcfs():
+        virtmem = os.environ.get("WENDELIN_CORE_VIRTMEM", "rw:uvmm")    # unset -> !wcfs
+        virtmem = virtmem.lower()
return {"r:wcfs+w:uvmm": True, "rw:uvmm": True}[virtmem]

 # patch for ZODB.Connection to support callback on .open()
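Illustration of the new option (a minimal sketch, not from the commit; root['zfile'] is an assumed, already-committed ZBigFile):

    import os

    # "rw:uvmm" (or unset)  -> fileh_open() keeps using plain user-space virtmem
    # "r:wcfs+w:uvmm"       -> fileh_open() goes through wcfs by default
    os.environ["WENDELIN_CORE_VIRTMEM"] = "r:wcfs+w:uvmm"

    f = root['zfile']                           # assumed ZBigFile
    fh_wcfs = f.fileh_open()                    # follows the global default -> wcfs
    fh_uvmm = f.fileh_open(_use_wcfs=False)     # explicit override -> !wcfs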
@@ -616,11 +642,18 @@ Connection.open = Connection_open
 @implementer(ISynchronizer)
 class _ZBigFileH(object):
     # .zfile        ZBigFile we were opened for
-    # .zfileh       handle for ^^^
+    # .wcfileh      handle for ZBigFile in wcfs | None
+    # .zfileh       handle for ZBigFile (overlayed over .wcfileh if .wcfileh != ø)

-    def __init__(self, zfile):
-        self.zfile  = zfile
-        self.zfileh = zfile._v_file.fileh_open()
+    def __init__(self, zfile, wcfileh):
+        self.zfile   = zfile
+        self.wcfileh = wcfileh
+        # FIXME for now we use only wcfs handle
+        # TODO  setup overlaying
+        if wcfileh is not None:
+            self.zfileh = wcfileh
+        else:
+            self.zfileh = zfile._v_file.fileh_open()

         # FIXME zfile._p_jar could be None (ex. ZBigFile is newly created
         # before first commit)
...
@@ -742,7 +742,7 @@ VMFaultResult vma_on_pagefault(VMA *vma, uintptr_t addr, int write)
         return VM_RETRY;
     }

-inject_page:
+//inject_page:
     /* (6) page data ready. Mmap it atomically into vma address space, or mprotect
      * appropriately if it was already mmaped. */
     int prot = PROT_READ;
...
 # wendelin.core | tox setup
 [tox]
-envlist = py27-{ZODB3,ZODB4,ZODB5}-{zblk0,zblk1}-{fs,zeo,neo}-{numpy113,numpy114}, {py35,py36}-{ZODB4,ZODB5}-{zblk0,zblk1}-{fs,zeo}-{numpy113,numpy114}
+envlist = py27-{ZODB3,ZODB4,ZODB5}-{zblk0,zblk1}-{fs,zeo,neo}-{numpy113,numpy114}-{!wcfs,wcfs}, {py35,py36}-{ZODB4,ZODB5}-{zblk0,zblk1}-{fs,zeo}-{numpy113,numpy114}-{!wcfs,wcfs}

 # (NOTE ZODB3 does not work on python3)
 # (NOTE NEO does not work on python3 at all)
 # (XXX ZODB5-*-neo are currently failing)

@@ -44,6 +44,9 @@ setenv =
     zblk0: WENDELIN_CORE_ZBLK_FMT=ZBlk0
     zblk1: WENDELIN_CORE_ZBLK_FMT=ZBlk1

+    !wcfs: WENDELIN_CORE_VIRTMEM=rw:uvmm
+    wcfs:  WENDELIN_CORE_VIRTMEM=r:wcfs+w:uvmm

 commands= {envpython} setup.py test
 # XXX setenv = TMPDIR = ... ?  (so that /tmp is not on tmpfs and we don't run out of memory on bench)
 #    + {envpython} setup.py bench  (?)
@@ -26,6 +26,7 @@ It will also be automatically started by default unless
 $WENDELIN_CORE_WCFS_AUTOSTART=no is specified in environment.

 Conn represents connection to wcfs server obtained by join.
+FileH ... XXX

 XXX
 """

@@ -38,6 +39,8 @@ from golang import go, chan, select, default
 from golang.gcompat import qq

 from ZODB.FileStorage import FileStorage
+from ZODB.utils import u64, p64
+from zodbtools.util import ashex

 # Conn represents connection to wcfs server.
@@ -52,6 +55,51 @@ class Conn(object):
     def close(self):
         self._fwcfs.close()

+    # open creates wcfs file handle, which can be mmaped to give data of ZBigFile.
+    #
+    # XXX more text
+    #
+    # All mmapings of one FileH share changes.
+    # There can be several different FileH for the same (at, oid), and those
+    # FileH do not share changes.
+    def open(self, zfile):  # -> FileH
+        #assert isinstance(zfile, ZBigFile)    # XXX import cycle
+        zconn = zfile._p_jar
+        # XXX ._start is probably ZODB5 only -> check ZODB4 and ZODB3
+        zat = p64(u64(zconn._start)-1)      # before -> at
+
+        # XXX pinned to @revX/... for now -> TODO /head/bigfile/...
+        path = '%s/@%s/bigfile/%s' % (self.mountpoint, ashex(zat), ashex(zfile._p_oid))
+        fd = os.open(path, os.O_RDONLY)
+        return FileH(fd)
+
+
+# FileH is handle to opened bigfile/X.
+#
+# XXX it mimics BigFileH and should be integrated into virtmem (see fileh_open_overlay)
+#
+# XXX it should implement wcfs invalidation protocol and remmap head/... parts
+# to pinned as requested.
+import mmap
+from bigarray.array_ram import _VMA, pagesize   # XXX hack
+class FileH(object):
+    # .fd
+
+    def __init__(self, fd):
+        self.fd = fd
+
+    def __del__(self):
+        os.close(self.fd)
+
+    def mmap(self, pgoffset, pglen):
+        return _VMA(self.fd, pgoffset, pglen, pagesize, mmap.PROT_READ)
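Taken together, a rough usage sketch of the two pieces above (illustrative only; wc and zfile are assumed to come from wcfs.join and a ZODB connection respectively):

    fileh = wc.open(zfile)      # wcfs handle for zfile at the connection's view
    vma   = fileh.mmap(0, 4)    # read-only mapping of 4 pages starting at page 0
    # all mappings created from this fileh share changes; a second wc.open(zfile)
    # would give an independent FileH, as described above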
+# ---- join/run wcfs ----

 # serve starts and runs wcfs server for ZODB @ zurl.
 #

@@ -103,8 +151,13 @@ def _default_autostart():
 # If wcfs for that zurl was already started, join connects to it.
 # Otherwise it starts wcfs for zurl if autostart is True.
 #
+# If shared is True - a shared connection is returned - one that will also be
+# returned for following join(shared=True) requests with the same zurl.
+#
 # join(zurl) -> Conn.
-def join(zurl, autostart=_default_autostart()):
+def join(zurl, autostart=_default_autostart(), shared=False):
+    # XXX implement shared
+
     mntpt = _mntpt_4zurl(zurl)

     # try opening .wcfs - if we succeed - it is already mounted.
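Illustration of the intended shared semantics (a sketch only; per the XXX above, shared is not implemented yet at this point):

    wc1 = wcfs.join(zurl, shared=True)
    wc2 = wcfs.join(zurl, shared=True)
    # wc1 and wc2 refer to the same shared Conn for this zurl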
@@ -255,7 +308,7 @@ def _wcfs_exe():
 # it also makes sure the mountpoint exists.
 def _mntpt_4zurl(zurl):
     # XXX what if zurl is zconfig://... ?  -> then we have to look inside?
-    #     -> _zstor_2zurl extracts zurl in canonical form and zconfig:// is not possible there.
+    #     -> zstor_2zurl extracts zurl in canonical form and zconfig:// is not possible there.
     m = hashlib.sha1()
     m.update(zurl)
     mntpt = "%s/wcfs/%s" % (tempfile.gettempdir(), m.hexdigest())
@@ -263,8 +316,9 @@ def _mntpt_4zurl(zurl):
     return mntpt

-# _zstor_2zurl converts a ZODB storage to URL to access it.
-def _zstor_2zurl(zstor):
+# zstor_2zurl converts a ZODB storage to URL to access it.
+# XXX -> unexport?
+def zstor_2zurl(zstor):
     # There is, sadly, no unified way to do it, as even if storages are created via
     # zodburi, after creation its uri is lost. And storages could be created not
     # only through URI but e.g. via ZConfig and manually. We want to support all
...
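For orientation, a minimal sketch of how the renamed helper ties into join (FileStorage case; /tmp/test.fs is an assumed path):

    from ZODB.FileStorage import FileStorage

    zstor = FileStorage("/tmp/test.fs")
    zurl  = wcfs.zstor_2zurl(zstor)     # canonical URL of the storage
    wc    = wcfs.join(zurl)             # connect to / autostart wcfs serving it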
@@ -45,7 +45,7 @@ def setup_module():
     testdb.setup()
     zstor = testdb.getZODBStorage()
-    testzurl = wcfs._zstor_2zurl(zstor)
+    testzurl = wcfs.zstor_2zurl(zstor)
     zstor.close()
     testmntpt = wcfs._mntpt_4zurl(testzurl)
     os.rmdir(testmntpt)

@@ -86,7 +86,7 @@ def tidtime(tid):
 def test_zurlstable():
     for i in range(10):
         zstor = testdb.getZODBStorage()
-        zurl = wcfs._zstor_2zurl(zstor)
+        zurl = wcfs.zstor_2zurl(zstor)
         zstor.close()
         assert zurl == testzurl

@@ -122,7 +122,6 @@ def test_join_autostart():
 # XXX parametrize zblk0, zblk1
-# XXX select !wcfs mode so that we prepare data through !wcfs path.
 @func
 def test_wcfs():
     root = testdb.dbopen()

@@ -197,9 +196,10 @@ def test_wcfs():
     # commit data to f and make sure we can see it on wcfs
+    # use !wcfs mode so that we prepare data independently of wcfs code paths.
     #hole = 10 XXX reenable
     hole = 1
-    fh = f.fileh_open()
+    fh = f.fileh_open(_use_wcfs=False)
     vma = fh.mmap(hole, 1)      # 1 page at offset=10
     s = b"hello world"
     memcpy(vma, s)

@@ -229,7 +229,7 @@ def test_wcfs():
     # commit data again and make sure we can see both latest and snapshotted states.
     tcommit1 = Z.head
-    fh = f.fileh_open()
+    fh = f.fileh_open(_use_wcfs=False)
     vma1 = fh.mmap(hole, 1)
     vma2 = fh.mmap(hole+1, 1)
     s1 = b"hello 123"

...