Commit 02470041 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent f8704273
...@@ -43,9 +43,10 @@ from golang import sync, context ...@@ -43,9 +43,10 @@ from golang import sync, context
from golang.gcompat import qq from golang.gcompat import qq
import threading import threading
from persistent import Persistent
from ZODB.FileStorage import FileStorage from ZODB.FileStorage import FileStorage
from ZODB.utils import u64, p64 from ZODB.utils import u64, p64
from zodbtools.util import ashex from zodbtools.util import ashex as h # XXX -> use "ashex"
# WCFS represents filesystem-level connection to wcfs server. # WCFS represents filesystem-level connection to wcfs server.
...@@ -56,6 +57,8 @@ from zodbtools.util import ashex ...@@ -56,6 +57,8 @@ from zodbtools.util import ashex
# The primary way to access wcfs is to open logical connection viewing on-wcfs # The primary way to access wcfs is to open logical connection viewing on-wcfs
# data as of particular database state, and use that logical connection to # data as of particular database state, and use that logical connection to
# create base-layer mappings. See .open and Conn for details. # create base-layer mappings. See .open and Conn for details.
#
# Raw files on wcfs can be accessed with ._path/._read/._stat/._open .
class WCFS(object): class WCFS(object):
# .mountpoint path to wcfs mountpoint # .mountpoint path to wcfs mountpoint
# ._fwcfs /.wcfs/zurl opened to keep the server from going away (at least cleanly) # ._fwcfs /.wcfs/zurl opened to keep the server from going away (at least cleanly)
...@@ -84,7 +87,7 @@ class Conn(object): ...@@ -84,7 +87,7 @@ class Conn(object):
# WatchLink represents /head/watch link opened on wcfs. # WatchLink represents /head/watch link opened on wcfs.
# #
# .sendReq()/.recvReq() provides raw IO in terms of wcfs invalidation protocol messages. # .sendReq()/.recvReq() provides raw IO in terms of wcfs invalidation protocol messages.
class WatchLink: class WatchLink(object):
def __init__(wlink, wc): def __init__(wlink, wc):
wlink._wc = wc wlink._wc = wc
...@@ -97,10 +100,10 @@ class WatchLink: ...@@ -97,10 +100,10 @@ class WatchLink:
# #
# fdopen takes ownership of file descriptor and closes it when file # fdopen takes ownership of file descriptor and closes it when file
# object is closed -> dup fd so that each file object has its own fd. # object is closed -> dup fd so that each file object has its own fd.
wh = os.open(tdb._path("head/watch"), os.O_RDWR) wh = os.open(wc._path("head/watch"), os.O_RDWR)
wh2 = os.dup(wh) wh2 = os.dup(wh)
t._wrx = os.fdopen(wh, 'rb') wlink._wrx = os.fdopen(wh, 'rb')
t._wtx = os.fdopen(wh2, 'wb') wlink._wtx = os.fdopen(wh2, 'wb')
# XXX ... # XXX ...
...@@ -108,6 +111,45 @@ class WatchLink: ...@@ -108,6 +111,45 @@ class WatchLink:
# ---- WatchLink message IO ---- # ---- WatchLink message IO ----
# ---- WCFS raw file access ----
# _path returns path for object on wcfs.
# - str: wcfs root + obj;
# - Persistent: wcfs root + (head|@<at>)/bigfile/obj
@func(WCFS)
def _path(wc, obj, at=None):
if isinstance(obj, Persistent):
#assert type(obj) is ZBigFile XXX import cycle
objtypestr = type(obj).__module__ + "." + type(obj).__name__
assert objtypestr == "wendelin.bigfile.file_zodb.ZBigFile", objtypestr
head = "head/" if at is None else ("@%s/" % h(at))
obj = "%s/bigfile/%s" % (head, h(obj._p_oid))
at = None
assert isinstance(obj, str)
assert at is None # must not be used with str
return os.path.join(wc.mountpoint, obj)
# _read reads file corresponding to obj on wcfs.
@func(WCFS)
def _read(wc, obj, at=None):
path = wc._path(obj, at=at)
with open(path, 'rb') as f: # XXX -> readfile
return f.read()
# _stat stats file corresponding to obj on wcfs.
@func(WCFS)
def _stat(wc, obj, at=None):
path = wc._path(obj, at=at)
return os.stat(path)
# _open opens file corresponding to obj on wcfs.
@func(WCFS)
def _open(wc, obj, mode='rb', at=None):
path = wc._path(obj, at=at)
return open(path, mode, 0) # unbuffered
""" """
# open creates wcfs file handle, which can be mmaped to give data of ZBigFile. # open creates wcfs file handle, which can be mmaped to give data of ZBigFile.
# #
......
...@@ -207,8 +207,6 @@ class DFile: ...@@ -207,8 +207,6 @@ class DFile:
# #
# tDB must be explicitly closed once no longer used. # tDB must be explicitly closed once no longer used.
# #
# Raw files on wcfs can be accessed with ._path/._read/._stat/._open .
#
# XXX print -> t.trace/debug() + t.verbose depending on py.test -v -v ? # XXX print -> t.trace/debug() + t.verbose depending on py.test -v -v ?
class tDB: class tDB:
@func @func
...@@ -429,7 +427,7 @@ class tDB: ...@@ -429,7 +427,7 @@ class tDB:
t._wc_zheadv.append(wchead) t._wc_zheadv.append(wchead)
# head/at = last txn of whole db # head/at = last txn of whole db
assert t._read("head/at") == h(t.head) assert t.wc._read("head/at") == h(t.head)
# _blkheadaccess marks head/zf[blk] accessed. # _blkheadaccess marks head/zf[blk] accessed.
def _blkheadaccess(t, zf, blk): def _blkheadaccess(t, zf, blk):
...@@ -440,33 +438,6 @@ class tDB: ...@@ -440,33 +438,6 @@ class tDB:
def _blkaccessed(t, zf): # set(blk) def _blkaccessed(t, zf): # set(blk)
return t._blkaccessedViaHead.setdefault(zf, set()) return t._blkaccessedViaHead.setdefault(zf, set())
# _path returns path for object on wcfs.
# - str: wcfs root + obj;
# - Persistent: wcfs root + (head|@<at>)/bigfile/obj
def _path(t, obj, at=None):
if isinstance(obj, Persistent):
head = "head/" if at is None else ("@%s/" % h(at))
obj = "%s/bigfile/%s" % (head, h(obj._p_oid))
at = None
assert isinstance(obj, str)
assert at is None # must not be used with str
return os.path.join(t.wc.mountpoint, obj)
# _read reads file corresponding to obj on wcfs.
def _read(t, obj, at=None):
path = t._path(obj, at=at)
return readfile(path)
# _stat stats file corresponding to obj on wcfs.
def _stat(t, obj, at=None):
path = t._path(obj, at=at)
return os.stat(path)
# _open opens file corresponding to obj on wcfs.
def _open(t, obj, mode='rb', at=None):
path = t._path(obj, at=at)
return open(path, mode, 0) # unbuffered
# tFile provides testing environment for one bigfile opened on wcfs. # tFile provides testing environment for one bigfile opened on wcfs.
# #
...@@ -483,7 +454,7 @@ class tFile: ...@@ -483,7 +454,7 @@ class tFile:
t.tdb = tdb t.tdb = tdb
t.zf = zf t.zf = zf
t.at = at t.at = at
t.f = tdb._open(zf, at=at) t.f = tdb.wc._open(zf, at=at)
t.blksize = zf.blksize t.blksize = zf.blksize
# mmap the file past the end up to _max_tracked_pages and setup # mmap the file past the end up to _max_tracked_pages and setup
...@@ -753,7 +724,7 @@ class tWatch: ...@@ -753,7 +724,7 @@ class tWatch:
class tWatchLink(wcfs.WatchLink): class tWatchLink(wcfs.WatchLink):
def __init__(t, tdb): def __init__(t, tdb):
super(tWatchLink, t).__init__(t, tdb.wc) super(tWatchLink, t).__init__(tdb.wc)
t.tdb = tdb t.tdb = tdb
...@@ -765,7 +736,7 @@ class tWatchLink(wcfs.WatchLink): ...@@ -765,7 +736,7 @@ class tWatchLink(wcfs.WatchLink):
# #
# fdopen takes ownership of file descriptor and closes it when file # fdopen takes ownership of file descriptor and closes it when file
# object is closed -> dup fd so that each file object has its own fd. # object is closed -> dup fd so that each file object has its own fd.
wh = os.open(tdb._path("head/watch"), os.O_RDWR) wh = os.open(tdb.wc._path("head/watch"), os.O_RDWR)
wh2 = os.dup(wh) wh2 = os.dup(wh)
t._wrx = os.fdopen(wh, 'rb') t._wrx = os.fdopen(wh, 'rb')
t._wtx = os.fdopen(wh2, 'wb') t._wtx = os.fdopen(wh2, 'wb')
...@@ -1308,7 +1279,7 @@ def test_wcfs_basic(): ...@@ -1308,7 +1279,7 @@ def test_wcfs_basic():
# >>> lookup non-BigFile -> must be rejected # >>> lookup non-BigFile -> must be rejected
with raises(OSError) as exc: with raises(OSError) as exc:
t._stat(t.nonzfile) t.wc._stat("head/bigfile/%s" % h(t.nonzfile._p_oid))
assert exc.value.errno == EINVAL assert exc.value.errno == EINVAL
# >>> file initially empty # >>> file initially empty
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment