file_zodb.py 35.7 KB
Newer Older
1 2
# -*- coding: utf-8 -*-
# Wendelin.bigfile | BigFile ZODB backend
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
3
# Copyright (C) 2014-2019  Nexedi SA and Contributors.
4 5 6 7 8 9 10
#                          Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
11 12 13 14
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
15 16 17 18 19
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
20
# See https://www.nexedi.com/licensing for rationale and options.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
21 22 23 24 25 26 27 28
"""Package file_zodb provides BigFile backed by ZODB.

ZBigFile provides BigFile backend with data stored in ZODB. Files data are
stored in ZODB in blocks with each block stored as separate ZODB object(s).
This way only regular ZODB objects - not blobs - are used, and whenever file
data is changed, δ(ZODB) is proportional to δ(data).

Being BigFile ZBigFile can be memory-mapped. Created mappings provide lazy
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
29
on-read block loading and on-write dirtying. This way ZBigFile larger than RAM
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
30 31 32 33
can be accessed transparently as if it was a regular data in program memory.
Changes made to ZBigFile data will be either saved or discarded depending on
current transaction completion - commit or abort. The amount of ZBigFile
changes in one transaction is limited by available RAM.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
34 35 36 37 38 39 40

ZBigFile does not weaken ZODB ACID properties, in particular:

  - (A) either none or all changes to file data will be committed;
  - (I) file view in current transaction is isolated from simultaneous
        changes to this file done by other database users.

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
41

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
42 43 44
API for clients
---------------

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
45 46 47 48 49 50
API for clients is ZBigFile class and its .fileh_open() method:

    .fileh_open()   -> opens new BigFileH-like object which can be mmaped

The primary user of ZBigFile is ZBigArray (see bigarray/__init__.py and
bigarray/array_zodb.py), but ZBigFile itself can be used directly too.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
51 52 53 54 55 56 57


Operating mode
--------------

Two operating modes are provided: "local-cache" and "shared-cache".

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
58
Local-cache is the mode wendelin.core was originally implemented with in 2015.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
59
In this mode ZBigFile data is loaded from ZODB directly via current ZODB connection.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
60
It was relatively straight-forward to implement, but cached file data become
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
61
duplicated in between ZODB connections of current process and in between
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
62
several client processes that use ZODB.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
63 64 65

In shared-cache mode file's data is accessed through special filesystem for
which data cache is centrally maintained by OS kernel. This mode was added in
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
66 67 68
2020 and reduces wendelin.core RAM consumption dramatically. Note that even
though the cache is shared, isolation property is still fully provided. Please
see wcfs/wcfs.go which describes the filesystem and shared-cache mode in detail.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
69 70 71 72

The mode of operation can be selected via environment variable::

  $WENDELIN_CORE_VIRTMEM
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
73 74
      rw:uvmm           local-cache     (i.e. !wcfs)    (default)
      r:wcfs+w:uvmm     shared-cache    (i.e.  wcfs)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
75 76 77 78 79


Data format
-----------

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
80 81
Due to weakness of current ZODB storage servers, wendelin.core cannot provide
at the same time both fast reads and small database size growth on small data
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
82 83 84 85 86 87 88
changes. "Small" here means something like 1-10000 bytes per transaction as
larger changes become comparable to 2M block size and are handled efficiently
out of the box. Until the problem is fixed on ZODB server side, wendelin.core
provides on-client workaround in the form of specialized block format, and
users have to explicitly indicate via environment variable that their workload
is "small changes" if they prefer to prioritize database size over access
speed::
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
89 90 91 92

  $WENDELIN_CORE_ZBLK_FMT
      ZBlk0             fast reads      (default)
      ZBlk1             small changes
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
93

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
94 95
Description of block formats follow:

96 97 98 99
To represent BigFile as ZODB objects, each file block is represented separately
either as

    1) one ZODB object, or          (ZBlk0)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
100
    2) group of ZODB objects        (ZBlk1)     XXX wcfs loads in parallel
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124

with top-level BTree directory #blk -> objects representing block.

For "1" we have

    - low-overhead access time (only 1 object loaded from DB), but
    - high-overhead in terms of ZODB size (with FileStorage / ZEO, every change
      to a block causes it to be written into DB in full again)

For "2" we have

    - low-overhead in terms of ZODB size (only part of a block is overwritten
      in DB on single change), but
    - high-overhead in terms of access time
      (several objects need to be loaded for 1 block)

In general it is not possible to have low-overhead for both i) access-time, and
ii) DB size, with approach where we do block objects representation /
management on *client* side.

On the other hand, if object management is moved to DB *server* side, it is
possible to deduplicate them there and this way have low-overhead for both
access-time and DB size with just client storing 1 object per file block. This
will be our future approach after we teach NEO about object deduplication.
125 126
"""

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
127
# ZBigFile organization
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
128
#
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
129 130
# TODO add top-level overview
#
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
#   zfile   (ZBigFile)
#       .blksize
#       .blktab     LOBTree #blk -> ZBlk*
#
#       ._v_file        _ZBigFile
#       ._v_filehset    weakset(_ZBigFileH) created for zfile
#
#   zfileh  (_ZBigFileH)
#
# ZBigFile is kept as Live persistent because XXX
#
#
#   DB -> ZBlk.blkdata  (-> memory-page)
#   (DB <- )  ZBlk.blkdata <- memory-page
#
#   (DB -> invalidate ZBlk.blkdata -> invalidate memory-page)
#   + FIXME topology changes are not handled correctly
#   + FIXME ZBlk is ghostified
#
#
#
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
# As file pages are changed in RAM with changes being managed by virtmem
# subsystem, we need to propagate the changes to ZODB objects back at some time.
#
# Two approaches exist:
#
#     1) on every RAM page dirty, in a callback invoked by virtmem, mark
#        corresponding ZODB object as dirty, and at commit time, in
#        obj.__getstate__ retrieve memory content.
#
#     2) hook into commit process, and before committing, synchronize RAM page
#        state to ZODB objects state, propagating all dirtied pages to ZODB objects
#        and then do the commit process as usual.
#
# "1" is more natural to how ZODB works, but requires tight integration between
# virtmem subsystem and ZODB (to be able to receive callback on a page dirtying).
#
# "2" is less natural to how ZODB works, but requires less-tight integration
# between virtmem subsystem and ZODB, and virtmem->ZODB propagation happens only
# at commit time.
#
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
172
# Since, for performance reasons, virtmem subsystem is going away and BigFiles      XXX kill
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
173 174 175 176
# will be represented by real FUSE-based filesystem with virtual memory being
# done by kernel, where we cannot get callback on a page-dirtying, it is more
# natural to also use "2" here.

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
177

178
from wendelin.bigfile import BigFile, WRITEOUT_STORE, WRITEOUT_MARKSTORED
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
179
from wendelin import wcfs
180
from wendelin.lib.mem import bzero, memcpy
181
from wendelin.lib.zodb import deactivate_btree
182 183

from transaction.interfaces import IDataManager, ISynchronizer
184
from persistent import Persistent, PickleCache, GHOST
185
from BTrees.LOBTree import LOBTree
186
from BTrees.IOBTree import IOBTree
187
from zope.interface import implementer
188
from ZODB.Connection import Connection
189
from weakref import WeakSet
190
import os
191 192 193 194 195 196

# TODO document that first data access must be either after commit or Connection.add

# FIXME peak 2·Ndirty memory consumption on write (should be 1·NDirty)


197 198
# Base class for data of 1 file block as stored in ZODB
class ZBlkBase(Persistent):
199 200
    # ._v_zfile     - ZBigFile | None
    # ._v_blk       - offset of this blk in ._v_zfile | None
201
    __slots__ = ('_v_zfile', '_v_blk')
202 203 204
    # NOTE _v_ - so that we can alter it without Persistent noticing -- we'll
    #      manage ZBlk states by ourselves explicitly.

205 206 207 208 209
    def __init__(self):
        self._v_zfile = None
        self._v_blk   = None


210
    # client requests us to load blkdata from DB, which will then go to memory
211 212 213 214 215 216
    # DB -> .blkdata  (-> memory-page)
    def loadblkdata(self):
        raise NotImplementedError()

    # client requests us to set blkdata to be later saved to DB
    # (DB <- )  .blkdata <- memory-page
217 218 219 220 221 222
    #
    # return: blkchanged=(True|False) - whether blk should be considered changed
    #         after setting its data.
    #
    #         False - when we know the data set was the same
    #         True  - data was not the same or we don't know
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243
    def setblkdata(self, buf):
        raise NotImplementedError()


    # make this ZBlk know it represents zfile[blk]
    # NOTE this has to be called by master every time ZBlk object potentially
    #      goes from GHOST to Live state
    # NOTE it is ok to keep reference to zfile (yes it creates
    #      ZBigFile->ZBlk->ZBigFile cycle but that does not hurt).
    def bindzfile(self, zfile, blk):
        # bind; if already bound, should be the same
        if self._v_zfile is None:
            self._v_zfile   = zfile
            self._v_blk     = blk
        else:
            assert self._v_zfile    is zfile
            assert self._v_blk      == blk


    # DB notifies this object has to be invalidated
    # (DB -> invalidate .blkdata -> invalidate memory-page)
244 245 246 247 248
    #
    # FIXME this assumes that ZBlk always stays associated with #blk, not moved
    #       and never e.g. deleted from bigfile. However it is NOT correct: e.g
    #       ZBigFile.storeblk() can change type of stored zblk - i.e. it rewrites
    #       ZBigFile.blktab[blk] with another ZBlk created anew.
249
    #
250
    #       another example: a block may initially have no ZBlk attached (and
251
    #       thus it reads as 0). However when data in that block is changed - a
252 253 254
    #       new ZBlk is created, but information that block data has to be
    #       invalidated is NOT correctly received by peers.
    #
255
    # FIXME this assumes that ghostified ZBlk will never be removed from live
256
    #       objects cache (cPickleCache). However, since ZBlk is not doing
257 258 259 260 261 262 263
    #       anything special, and it actually becomes a ghost all the time (e.g.
    #       ZBlk0.loadblkdata() ghostifies itself not to waste memory), this
    #       assumption is NOT correct. Thus, if a ghost ZBlk will be removed from
    #       live cache, corresponding block will MISS to invalidate its data.
    #       This practically can happen if LOBucket, that is part of
    #       ZBigFile.blktab and that was holding reference to this ZBlk, gets
    #       ghostified under live cache pressure.
264 265 266 267 268 269 270 271 272 273 274 275
    def _p_invalidate(self):
        # do real invalidation only once - else we already lost ._v_zfile last time
        if self._p_state is GHOST:
            return
        # on invalidation we must be already bound
        # (to know which ZBigFileH to propagate invalidation to)
        assert self._v_zfile    is not None
        assert self._v_blk      is not None
        self._v_zfile.invalidateblk(self._v_blk)
        Persistent._p_invalidate(self)


276 277 278 279 280 281 282

# ZBlk storage formats
# NOTE once established formats do not change on disk


# ZBlk format 0: raw bytes
class ZBlk0(ZBlkBase):
283 284 285
    # ._v_blkdata   - bytes
    __slots__ = ('_v_blkdata',)

286
    # DB -> ._v_blkdata  (-> memory-page)
287 288 289 290 291 292 293
    def loadblkdata(self):
        # ensure ._v_blkdata is loaded
        # (it could be not, if e.g. loadblk is called second time for the same
        #  blk - we loaded it first time and thrown ._v_blkdata away as unneeded
        #  intermediate copy not to waste memory - see below)
        if self._v_blkdata is None:
            # ZODB says: further accesses will cause object data to be reloaded.
294 295 296
            # NOTE directly Persistent._p_invalidate() to avoid
            # virtmem->loadblk->invalidate callback
            Persistent._p_invalidate(self)
297 298 299 300 301 302 303 304 305

        blkdata = self._v_blkdata
        assert blkdata is not None
        # do not waste memory - ._v_blkdata is used only as intermediate copy on path
        # from DB to fileh mapping. See also counterpart action in __getstate__().
        self._v_blkdata = None

        return blkdata

306 307
    # (DB <- )  ._v_blkdata <- memory-page
    def setblkdata(self, buf):
308 309 310
        blkdata = bytes(buf)                    # FIXME does memcpy
        # trim trailing \0
        self._v_blkdata = blkdata.rstrip(b'\0') # FIXME copy
311

312 313 314 315
        # buf always considered to be "changed", as we do not keep old data to
        # compare.
        return True

316 317

    # DB (through pickle) requests us to emit state to save
318
    # DB <- ._v_blkdata  (<- memory-page)
319 320 321 322 323
    def __getstate__(self):
        # request to pickle should go in only when zblk was set changed (by
        # storeblk), and only once.
        assert self._v_blkdata is not None
        # NOTE self._p_changed is not necessarily true here - e.g. it is not
Kirill Smelkov's avatar
Kirill Smelkov committed
324
        # for newly created objects irregardless of initial ._p_changed=True
325 326 327

        blkdata = self._v_blkdata
        # do not waste memory for duplicated data - as soon as blkdata lands
Kirill Smelkov's avatar
Kirill Smelkov committed
328
        # into DB we can drop it here because ._v_blkdata was only an
329 330 331
        # intermediate copy from fileh memory to database.
        #
        # FIXME this works, but transaction first prepares all objects for
Kirill Smelkov's avatar
Kirill Smelkov committed
332
        #       commit and only then saves them. For use it means __getstate__
333 334 335 336 337 338 339 340 341 342
        #       gets called very late and for all objects and thus we'll be
        #       keeping ._v_blkdata for all of them before final phase =
        #       2·NDirty peak memory consumption.
        self._v_blkdata = None

        # XXX change .p_state to GHOST ?
        return blkdata


    # DB (through pickle) loads data to memory
343
    # DB -> ._v_blkdata  (-> memory-page)
344
    def __setstate__(self, state):
345
        super(ZBlk0, self).__init__()
346 347
        self._v_blkdata = state

348 349
    # ZBlk as initially created (empty placeholder)
    def __init__(self):
350
        self.__setstate__(None)
351 352


353 354 355 356 357 358 359 360 361 362 363 364
# ZBlk format 1: block splitted into chunks of fixed size in BTree
#
# NOTE zeros are not stored -> either no chunk at all, or trailing zeros
#      are stripped.

# data as Persistent object
class ZData(Persistent):
    __slots__ = ('data')
    def __init__(self, data):
        self.data = data

    def __getstate__(self):
365 366 367 368 369 370 371 372 373 374 375 376 377 378
        # request to pickle should go in only when zblk was set changed (by
        # storeblk), and only once.
        assert self._p_state is not GHOST

        data = self.data
        # do not waste memory for duplicated data - it was extracted from
        # memory page, so as soon as it lands to DB, we do not need to keep
        # data here. (see ZBlk0.__getstate__() for details)
        #
        # release .data and thus it will free after return will be processed
        # (invalidate because .data was changed - so deactivate won't work)
        self._p_invalidate()

        return data
379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415

    def __setstate__(self, state):
        self.data = state


class ZBlk1(ZBlkBase):
    # .chunktab {} offset -> ZData(chunk)
    __slots__ = ('chunktab')

    # NOTE the reader does not assume chunks are of this size - it decodes
    # .chunktab as it was originally encoded - only we write new chunks with
    # this size -> so CHUNKSIZE can be changed over time.
    CHUNKSIZE = 4096    # XXX ad-hoc ?  (but is a good number = OS pagesize)


    # DB -> .chunktab  (-> memory-page)
    def loadblkdata(self):
        # empty?
        if not self.chunktab:
            return b''

        # find out whole blk len via inspecting tail chunk
        tail_start = self.chunktab.maxKey()
        tail_chunk = self.chunktab[tail_start]
        blklen = tail_start + len(tail_chunk.data)

        # whole buffer initialized as 0 + tail_chunk
        blkdata = bytearray(blklen)
        blkdata[tail_start:] = tail_chunk.data

        # go through all chunks besides tail and extract them
        stop = 0
        for start, chunk in self.chunktab.items(max=tail_start, excludemax=True):
            assert start >= stop    # verify chunks don't overlap
            stop = start+len(chunk.data)
            blkdata[start:stop] = chunk.data

416 417
        # deactivate whole .chunktab not to waste memory
        deactivate_btree(self.chunktab)
418 419 420 421 422 423 424 425

        return blkdata


    # (DB <- )  .chunktab <- memory-page
    def setblkdata(self, buf):
        chunktab  = self.chunktab
        CHUNKSIZE = self.CHUNKSIZE
426
        blkchanged= False
427 428 429 430

        # first make sure chunktab was previously written with the same CHUNKSIZE
        # (for simplicity we don't allow several chunk sizes to mix)
        for start, chunk in chunktab.items():
431
            if (start % CHUNKSIZE) or len(chunk.data) > CHUNKSIZE:
432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449
                chunktab.clear()
                break

        # scan over buf and update/delete changed chunks
        for start in range(0, len(buf), CHUNKSIZE):
            data = buf[start:start+CHUNKSIZE]   # FIXME copy on py2
            # make sure data is bytes
            # (else we cannot .rstrip() it below)
            if not isinstance(data, bytes):
                data = bytes(data)              # FIXME copy on py3
            # trim trailing \0
            data = data.rstrip(b'\0')           # FIXME copy
            chunk = chunktab.get(start)

            # all 0 -> make sure to remove chunk
            if not data:
                if chunk is not None:
                    del chunktab[start]
450
                    chunk._p_deactivate()
451
                    blkchanged = True
452 453 454 455 456 457 458 459

            # some !0 data -> compare and store if changed
            else:
                if chunk is None:
                    chunk = chunktab[start] = ZData(b'')

                if chunk.data != data:
                    chunk.data = data
460
                    blkchanged = True
461 462 463 464 465 466 467 468
                    # data changed and is queued to be committed to db.
                    # ZData will care about this chunk deactivation after DB
                    # asks for its data - see ZData.__getstate__().

                else:
                    # we loaded chunk for .data comparison, but now it is no
                    # more needed
                    chunk._p_deactivate()
469

470 471
        return blkchanged

472 473 474 475

    # DB (through pickle) requests us to emit state to save
    # DB <- .chunktab  (<- memory-page)
    def __getstate__(self):
476 477 478 479
        # .chunktab memory is only intermediate on path from memory-page to DB.
        # We will free it on a per-chunk basis, after each chunk is queried for
        # data by DB. See ZData.__getstate__() for details, and
        # ZBlk0.__getstate__() for more comments.
480 481 482 483 484 485 486 487 488 489 490 491 492 493 494
        return self.chunktab

    # DB (through pickle) loads data to memory
    # DB -> .chunktab  (-> memory-page)
    def __setstate__(self, state):
        super(ZBlk1, self).__init__()
        self.chunktab = state


    # ZBlk1 as initially created (empty placeholder)
    def __init__(self):
        super(ZBlk1, self).__init__()
        self.__setstate__(IOBTree())


495 496 497 498 499 500
# backward compatibility (early versions wrote ZBlk0 named as ZBlk)
ZBlk = ZBlk0

# format-name -> blk format type
ZBlk_fmt_registry = {
    'ZBlk0':    ZBlk0,
501
    'ZBlk1':    ZBlk1,
502 503 504
}

# format for updated blocks
505
ZBlk_fmt_write = os.environ.get('WENDELIN_CORE_ZBLK_FMT', 'ZBlk0')
506 507 508 509 510 511 512
if ZBlk_fmt_write not in ZBlk_fmt_registry:
    raise RuntimeError('E: Unknown ZBlk format %r' % ZBlk_fmt_write)


# ----------------------------------------


513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530
# helper for ZBigFile - just redirect loadblk/storeblk back
# (because it is not possible to inherit from both Persistent and BigFile at
#  the same time - see below)
class _ZBigFile(BigFile):
    # .zself    - reference to ZBigFile

    def __new__(cls, zself, blksize):
        obj = BigFile.__new__(cls, blksize)
        obj.zself = zself
        return obj

    # redirect load/store to main class
    def loadblk(self, blk, buf):    return self.zself.loadblk(blk, buf)
    def storeblk(self, blk, buf):   return self.zself.storeblk(blk, buf)



# Persistent that never goes to ghost state, if it was ever uptodate.
531 532 533 534 535 536 537 538 539 540
#
# NOTE
#
# On invalidation LivePersistent still goes to ghost state, because
# invalidation cannot be ignored, i.e. they indicate the object has been
# changed externally.
#
# Invalidation can happen only at transaction boundary, so during the course of
# transaction LivePersistent is guaranteed to stay uptodate.
#
541 542 543 544 545 546 547 548 549 550 551 552
# XXX move to common place?
class LivePersistent(Persistent):
    # don't allow us to go to ghost
    #
    # NOTE we can't use STICKY as that state is assumed as
    # short-lived-temporary by ZODB and is changed back to UPTODATE by
    # persistent code. In fact ZODB says: STICKY is UPTODATE+keep in memory.
    def _p_deactivate(self):
        # just returning here won't allow Persistent._p_deactivate() run and
        # thus we'll stay in non-ghost state.
        return

553 554
    # NOTE _p_invalidate() is triggered on invalidations. We do not override it.

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
555

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
556 557
# ZBigFile implements BigFile backend with data stored in ZODB.
#
558 559 560 561 562 563 564 565
# NOTE Can't inherit from Persistent and BigFile at the same time - both are C
# types and their layout conflict. Persistent must be here for object to be
# tracked -> BigFile is moved to a data member (the same way and for the same
# reason it is done for PersistentList & PersistentDict).
class ZBigFile(LivePersistent):
    # file is split into blocks; each block is stored as separate object in the DB
    #
    #   .blksize
566
    #   .blktab       {} blk -> ZBlk*(blkdata)
567 568

    # ._v_file      _ZBigFile helper
569
    # ._v_filehset  weakset( _ZBigFileH ) that we created
570 571 572 573 574 575 576 577
    #
    # NOTE Live: don't allow us to go to ghost state not to loose ._v_file
    #      which DataManager _ZBigFileH refers to.

    def __init__(self, blksize):
        LivePersistent.__init__(self)
        self.__setstate__((blksize, LOBTree()))     # NOTE L enough for blk_t

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
578
        # TODO use custom class for .blktab with adjusted bucket size, something like
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
579 580 581 582 583
        # class xLOBTree(LOBTree):
        #   __slots__ = ()
        #   max_leaf_size       = ...   # BTree's default =  60
        #   max_internal_size   = ...   # BTree's default = 500

584

585
    # state is (.blksize, .blktab)
586 587 588 589 590 591
    def __getstate__(self):
        return (self.blksize, self.blktab)

    def __setstate__(self, state):
        self.blksize, self.blktab = state
        self._v_file = _ZBigFile(self, self.blksize)
592
        self._v_filehset = WeakSet()
593 594 595 596


    # load data     ZODB obj -> page
    def loadblk(self, blk, buf):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
597
        # XXX overlay: BUG if called    (XXX here, in ZBigFile?)
598 599 600 601 602 603 604 605 606 607 608
        zblk = self.blktab.get(blk)
        # no data written yet - "hole" reads as all zeros
        if zblk is None:
            # Nothing to do here - the memory buf obtained from OS comes pre-cleared
            # XXX reenable once/if memory comes uninitialized here
            #bzero(buf)
            return

        # TODO use specialized unpickler and load data directly to blk.
        #      also in DB better store directly {#blk -> #dataref} without ZBlk overhead
        blkdata = zblk.loadblkdata()
609
        assert len(blkdata) <= self._v_file.blksize
610
        zblk.bindzfile(self, blk)
611
        memcpy(buf, blkdata)        # FIXME memcpy
612
        #bzero(buftail)             # not needed - buf comes pre-cleared from OS
613 614 615 616 617


    # store data    dirty page -> ZODB obj
    def storeblk(self, blk, buf):
        zblk = self.blktab.get(blk)
618 619 620 621 622
        zblk_type_write = ZBlk_fmt_registry[ZBlk_fmt_write]
        # if zblk was absent or of different type - we (re-)create it anew
        if zblk is None  or \
           type(zblk) is not zblk_type_write:
            zblk = self.blktab[blk] = zblk_type_write()
623

624 625
        blkchanged = zblk.setblkdata(buf)
        if blkchanged:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
626 627 628 629 630 631 632 633 634 635
            # if zblk was already in DB: _p_state -> CHANGED.
            # do this unconditionally even e.g. for ZBlk1 for which only ZData inside changed:
            #
            # We cannot avoid committing ZBlk in all cases, because it is used to signal
            # other DB clients that a ZBlk needs to be invalidated and this way associated
            # fileh pages are invalidated too.
            #
            # This cannot work via ZData, because ZData don't have back-pointer to
            # ZBlk1 or to corresponding zfile.
            zblk._p_changed = True
636 637 638 639 640 641 642 643
        zblk.bindzfile(self, blk)


    # invalidate data   .blktab[blk] invalidated -> invalidate page
    def invalidateblk(self, blk):
        for fileh in self._v_filehset:
            fileh.invalidate_page(blk)  # XXX assumes blksize == pagesize

644 645


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
646 647
    # fileh_open is bigfile-like method that creates new file-handle object
    # that is given to user for mmap.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
648 649 650 651 652 653 654
    #
    # _use_wcfs is internal option and controls whether to use wcfs to access
    # ZBigFile data:
    #
    # - True    -> use wcfs
    # - False   -> don't use wcfs
    # - not set -> behave according to global default
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
655 656 657
    #
    # XXX several fileh could be opened for one ZBigFile. Is it useful at all?
    #     normally in one zconn there is only one zfileh opened for zfile.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
658 659 660 661 662 663
    def fileh_open(self, _use_wcfs=None):
        if _use_wcfs is None:
            _use_wcfs = self._default_use_wcfs()

        wcfileh = None
        if _use_wcfs:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
664
            # TODO maintain zconn -> wconn in sync (p_jar -> wconn)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
665 666
            zconn = self._p_jar
            zstor = zconn.db().storage
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
667
            zurl = wcfs.zstor_2zurl(zstor)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
668
            wc = wcfs.join(zurl)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
669
            wconn = wc.connect(zconn.at())
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
670
            wcfileh = wconn.open(self._p_oid)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
671 672

        fileh = _ZBigFileH(self, wcfileh)
673 674
        self._v_filehset.add(fileh)
        return fileh
675 676


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
677 678 679 680 681
    # _default_use_wcfs returns whether default virtmem setting is to use wcfs or not.
    @staticmethod
    def _default_use_wcfs():
        virtmem = os.environ.get("WENDELIN_CORE_VIRTMEM", "rw:uvmm")    # unset -> !wcfs
        virtmem = virtmem.lower()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
682
        return {"r:wcfs+w:uvmm": True, "rw:uvmm": False}[virtmem]
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
683

684 685


686 687 688 689 690 691
# patch for ZODB.Connection to support callback on .open()
# NOTE on-open  callbacks are setup once and fire many times on every open
#      on-close callbacks are setup once and fire only once on next close
Connection._onOpenCallbacks = None
def Connection_onOpenCallback(self, f):
    if self._onOpenCallbacks is None:
Kirill Smelkov's avatar
Kirill Smelkov committed
692
        # NOTE WeakSet does not work for bound methods - they are always created
693
        # anew for each obj.method access, and thus will go away almost immediately
694
        self._onOpenCallbacks = WeakSet()
695 696 697 698 699 700 701 702 703
    self._onOpenCallbacks.add(f)

assert not hasattr(Connection, 'onOpenCallback')
Connection.onOpenCallback = Connection_onOpenCallback

orig_Connection_open = Connection.open
def Connection_open(self, transaction_manager=None, delegate=True):
    orig_Connection_open(self, transaction_manager, delegate)

704 705 706
    # FIXME method name hardcoded. Better not do it and allow f to be general
    # callable, but that does not work with bound method - see above.
    # ( Something like WeakMethod from py3 could help )
707 708
    if self._onOpenCallbacks:
        for f in self._onOpenCallbacks:
709
            f.on_connection_open()
710 711 712 713 714 715

Connection.open = Connection_open
# ------------



716 717
# BigFileH wrapper that also acts as DataManager proxying changes back to ZODB
# objects at two-phase-commit (TPC) level.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
718 719
# XXX and ZODB -> virtmem invalidation.
# XXX split into _ZBigFileH + _ZSync ?
720 721 722 723 724 725
#
# NOTE several fileh can be opened for ZBigFile - and this does not
#      conflict with the way ZODB organises its work - just for fileh
#      handles ZBigFile acts as a database, and for real ZODB database
#      it acts as (one of) connections.
#
726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753
# NOTE ISynchronizer is used to be able to join into transactions without
#      tracking intermediate changes to pages:
#
#      _ZBigFileH sticks to ZODB.Connection under which it was originally opened
#      and then participates in that connection lifecycle forever keeping sync on
#      that connection close and reopen.
#
#      This is required because ZBigFile and ZBigArray are both LivePersistent
#      (i.e. they never go to ghost state) and thus stays forever live (= active)
#      in Connection._cache.
#
#      If it was only ZBigFile, we could be opening new fileh every time for
#      each connection open and close/unref that fileh object on connection
#      close. But:
#
#       1. this scheme is inefficient (upon close, fileh forgets all its loaded
#          memory, and thus for newly opened fileh we'd need to reload file data
#          from scratch)
#
#       2. ZBigArray need to reference opened fileh --- since ZBigArray stays
#          live in Connection._cache, fileh also automatically stay live.
#
#      So in essence _ZBigFileH is a data manager which works in sync with ZODB
#      Connection propagating changes between fileh memory and ZODB objects.
#
# NOTE Bear in mind that after close, connection can be reopened in different
#      thread - that's why we have to adjust registration to per-thread
#      transaction_manager.
754 755 756 757
@implementer(IDataManager)
@implementer(ISynchronizer)
class _ZBigFileH(object):
    # .zfile        ZBigFile we were opened for
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
758
    # .wcfileh      handle for ZBigFile@zconn.at view in wcfs | None
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
759
    # .zfileh       handle for ZBigFile in virtmem (overlayed over .wcfileh if .wcfileh != ø)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
760

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
761
    def __init__(self, zfile, wcfileh):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
762
        self.zfile   = zfile
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
763
        self.wcfileh = wcfileh
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
764
        self.zfileh = zfile._v_file.fileh_open()    # XXX pass wcfileh in
765 766 767 768

        # FIXME zfile._p_jar could be None (ex. ZBigFile is newly created
        #       before first commit)

769
        # when connection will be reopened -> txn_manager.registerSynch(self)
770
        zfile._p_jar.onOpenCallback(self)   # -> self.on_connection_open()
771 772 773 774 775 776 777

        # when we are just initially created, the connection is already opened,
        # so manually compensate for it.
        self.on_connection_open()


    def on_connection_open(self):
778 779 780 781 782 783
        # IDataManager requires .transaction_manager
        #
        # resync txn manager every time a connection is (re)opened as even for
        # the same connection, the data manager can be different for each reopen.
        self.transaction_manager = self.zfile._p_jar.transaction_manager

784 785 786 787 788
        # when connection is closed -> txn_manager.unregisterSynch(self)
        # NOTE close callbacks are fired once, and thus we have to re-register
        #      it on every open.
        self.zfile._p_jar.onCloseCallback(self.on_connection_close)

789
        # attach us to Connection's transaction manager:
790
        #
791 792
        # Hook into txn_manager so that we get a chance to run before
        # transaction.commit().   (see .beforeCompletion() with more details)
793 794 795 796 797 798 799 800 801 802
        #
        # NOTE before ZODB < 5.5.0 Connection.transaction_manager is
        # ThreadTransactionManager which implicitly uses separate
        # TransactionManager for each thread. We are thus attaching to
        # _current_ _thread_ TM and correctness depends on the fact that
        # .transaction_manager is further used from the same thread only.
        #
        # Starting from ZODB >= 5.5.0 this issue has been fixed:
        #   https://github.com/zopefoundation/ZODB/commit/b6ac40f153
        #   https://github.com/zopefoundation/ZODB/issues/208
803 804 805
        self.transaction_manager.registerSynch(self)


806
    def on_connection_close(self):
807 808 809 810
        # detach us from connection's transaction manager.
        # (NOTE it is _current_ _thread_ TM for ZODB < 5.5.0: see notes ^^^)
        #
        # make sure we stay unlinked from txn manager until the connection is reopened.
811
        self.transaction_manager.unregisterSynch(self)
812
        self.transaction_manager = None
813 814 815

        # NOTE open callbacks are setup once and fire on every open - we don't
        #      need to resetup them here.
816 817 818 819 820 821 822


    # ~~~~ BigFileH wrapper ~~~~
    def mmap(self, pgoffset, pglen):    return self.zfileh.mmap(pgoffset, pglen)
    # .dirty_writeout?  -- not needed - these are handled via
    # .dirty_discard?   -- transaction commit/abort

823 824 825
    def invalidate_page(self, pgoffset):
        return self.zfileh.invalidate_page(pgoffset)

826 827 828 829 830 831 832

    # ~~~~ ISynchronizer ~~~~
    def beforeCompletion(self, txn):
        # if dirty, join every transaction so that it knows we want to participate in TPC.
        #
        # This is needed because we do not track every change to pages (and
        # then immediately join txn, like ZODB Connection do for objects), but
833
        # instead join txn here right before commit/abort.
834

835 836 837
        # make sure we are called only when connection is opened
        assert self.zfile._p_jar.opened

838 839 840
        if not self.zfileh.isdirty():
            return

841
        assert self not in txn._resources       # (not to join twice)
842 843 844 845 846 847 848 849 850 851
        txn.join(self)
        # XXX hack - join Connection manually before transaction.commit() starts.
        #
        # the reason we do it here, is that if we don't and Connection was not
        # yet joined to transaction (i.e. no changes were made to Connection's
        # objects), as storeblk() running inside commit will case changes to
        # ZODB objects, zconn will want to join transaction, and that is
        # currently forbidden.
        #
        # More cleaner fix would be to teach transaction to allow joining
Kirill Smelkov's avatar
Kirill Smelkov committed
852
        # DataManagers while commit is running, but that comes with difficulties
853 854 855 856 857 858
        # if wanting to support whole transaction semantics (i.e. whether to
        # call joined tpc_begin(), if we are already at commit()? And also what
        # to do about order - newly joined sortKey() could be lesser than
        # DataManagers already done at current TPC phase...
        zconn = self.zfile._p_jar
        assert txn is zconn.transaction_manager.get()
859 860 861 862
        if zconn._needs_to_join:                # same as Connection._register(obj)
            assert zconn not in txn._resources  # (not to join twice)
            txn.join(zconn)                     # on first obj, without registering
            zconn._needs_to_join = False        # anything.
863 864 865 866 867 868 869 870 871 872 873 874 875 876

    def afterCompletion(self, txn):
        pass

    # NOTE called not always - only at explicit transaction.begin()
    #      -> can't use as new-txn synchronization hook
    def newTransaction(self, txn):
        pass


    # ~~~~ IDataManager ~~~~

    # key ordering us wrt other DataManager in tpc_*() sequence calling
    # NOTE for our propagated-to-objects changes to propagate to ZODB, we need
Kirill Smelkov's avatar
Kirill Smelkov committed
877
    # to act earlier than ZODB DataManager managing zfile.blktab - hence the trick.
878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928
    def sortKey(self):
        # XXX _p_jar can be None - if that object is new?
        zkey = self.zfile._p_jar.sortKey()
        key  = "%s%s (%s:%s)" % (
                # first letter smaller and, if possible, readable
                zkey[0] > ' ' and ' ' or '\0',
                zkey[1:], type(self), id(self))
        assert key < zkey
        return key


    # abort txn which is not in TPC phase
    def abort(self, txn):
        self.zfileh.dirty_discard()


    def tpc_begin(self, txn):
        # TODO probably take some lock and mark dirty pages RO, so that other
        # threads could not change data while the transaction is being
        # committed
        pass


    def commit(self, txn):
        # propagate changes to objects, but does not mark pages as stored -
        # - in case of following tpc_abort() we'll just drop dirty pages and
        # changes to objects we'll be done by ZODB data manager (= Connection)
        self.zfileh.dirty_writeout(WRITEOUT_STORE)


    def tpc_vote(self, txn):
        # XXX what here? verify that zfile.blktab can be committed without errors?
        pass


    def tpc_finish(self, txn):
        # finish is just "mark dirty pages as stored ok"
        self.zfileh.dirty_writeout(WRITEOUT_MARKSTORED)


    # abort txn which is in TPC phase
    def tpc_abort(self, txn):
        # this is like .abort(), but any of (including none)
        # tpc_{begin,commit,vote,finish) were called
        #
        # we assume .tpc_finish() was not yet called XXX write why
        # (because for tpc_abort to work after tpc_finish, we'll need to store
        # which file parts we wrote and invalidate that -> simpler not to care
        # and also more right - tpc_finish is there assumed as non-failing by
        # ZODB design)
        self.abort(txn)