# -*- coding: utf-8 -*-
# Wendelin.bigfile | BigFile ZODB backend
# Copyright (C) 2014-2020  Nexedi SA and Contributors.
#                          Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""Package file_zodb provides BigFile backed by ZODB.

ZBigFile provides a BigFile backend with data stored in ZODB. File data is
stored in ZODB in blocks, with each block stored as separate ZODB object(s).
This way only regular ZODB objects - not blobs - are used, and whenever file
data is changed, δ(ZODB) is proportional to δ(data).

Being a BigFile, ZBigFile can be memory-mapped. Created mappings provide lazy
on-read block loading and on-write dirtying. This way a ZBigFile larger than
RAM can be accessed transparently as if it were regular data in program memory.
Changes made to ZBigFile data will be either saved or discarded depending on
current transaction completion - commit or abort. The amount of ZBigFile
changes in one transaction is limited by available RAM.

ZBigFile does not weaken ZODB ACID properties, in particular:

  - (A) either none or all changes to file data will be committed;
  - (I) file view in current transaction is isolated from simultaneous
        changes to this file done by other database users.


API for clients
---------------

The API for clients is the ZBigFile class and its .fileh_open() method:

    .fileh_open()   -> opens new BigFileH-like object which can be mmaped

The primary user of ZBigFile is ZBigArray (see bigarray/__init__.py and
bigarray/array_zodb.py), but ZBigFile itself can be used directly too.
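
For example, a minimal usage sketch (assuming an already-open ZODB connection
with root object `root`; how the mapped memory is then consumed is up to the
application)::

    root['zfile'] = f = ZBigFile(blksize=2*1024*1024)   # 2M block size
    transaction.commit()                                # f is now in the DB

    fileh = f.fileh_open()          # BigFileH-like handle
    vma   = fileh.mmap(0, 4)        # map file pages [0, 4) into memory
    # ... read/write memory through vma ...
    transaction.commit()            # changes to mapped memory are saved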


Data format
-----------

Due to a weakness of current ZODB storage servers, wendelin.core cannot provide
at the same time both fast reads and small database size growth on small data
changes. "Small" here means something like 1-10000 bytes per transaction as
larger changes become comparable to 2M block size and are handled efficiently
out of the box. Until the problem is fixed on ZODB server side, wendelin.core
provides on-client workaround in the form of specialized block format, and
users have to explicitly indicate via environment variable that their workload
is "small changes" if they prefer to prioritize database size over access
speed::

  $WENDELIN_CORE_ZBLK_FMT
      ZBlk0             fast reads      (default)
      ZBlk1             small changes

A description of the block formats follows:

To represent BigFile as ZODB objects, each file block is represented separately
either as

    1) one ZODB object, or          (ZBlk0)
    2) group of ZODB objects        (ZBlk1)

with a top-level BTree directory #blk -> object(s) representing the block.

For "1" we have

    - low-overhead access time (only 1 object loaded from DB), but
    - high-overhead in terms of ZODB size (with FileStorage / ZEO, every change
      to a block causes it to be written into DB in full again)

For "2" we have

    - low-overhead in terms of ZODB size (only part of a block is overwritten
      in DB on single change), but
    - high-overhead in terms of access time
      (several objects need to be loaded for 1 block)
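
For a sense of scale: with the default 2M block size and ZBlk1's 4096-byte
chunks, changing ~100 bytes inside one block rewrites roughly one ~4K chunk
object (plus the affected BTree nodes), whereas with ZBlk0 the whole ~2M block
is written into the DB again.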

In general it is not possible to have low overhead for both i) access-time and
ii) DB size with an approach where block object representation / management is
done on the *client* side.

On the other hand, if object management is moved to DB *server* side, it is
possible to deduplicate them there and this way have low-overhead for both
access-time and DB size with just client storing 1 object per file block. This
will be our future approach after we teach NEO about object deduplication.
"""

# ZBigFile organization
#
# TODO add top-level overview
#
#
# As file pages are changed in RAM, with changes being managed by the virtmem
# subsystem, we need to propagate the changes back to ZODB objects at some point.
#
# Two approaches exist:
#
#     1) on every RAM page dirty, in a callback invoked by virtmem, mark
#        corresponding ZODB object as dirty, and at commit time, in
#        obj.__getstate__ retrieve memory content.
#
#     2) hook into commit process, and before committing, synchronize RAM page
#        state to ZODB objects state, propagating all dirtied pages to ZODB objects
#        and then do the commit process as usual.
#
# "1" is more natural to how ZODB works, but requires tight integration between
# virtmem subsystem and ZODB (to be able to receive callback on a page dirtying).
#
# "2" is less natural to how ZODB works, but requires less-tight integration
# between virtmem subsystem and ZODB, and virtmem->ZODB propagation happens only
# at commit time.
#
# Since, for performance reasons, the virtmem subsystem is going away - BigFiles
# will be represented by a real FUSE-based filesystem with virtual memory
# handled by the kernel, where we cannot get a callback on page-dirtying - it is
# more natural to also use "2" here.
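#
# (Approach "2" is what _ZBigFileH below implements: it joins the transaction in
#  .beforeCompletion() and propagates dirty pages to ZODB objects in .commit()
#  via zfileh.dirty_writeout(WRITEOUT_STORE).)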


from wendelin.bigfile import BigFile, WRITEOUT_STORE, WRITEOUT_MARKSTORED
from wendelin.lib.mem import bzero, memcpy
from wendelin.lib.zodb import LivePersistent, deactivate_btree

from transaction.interfaces import IDataManager, ISynchronizer
from persistent import Persistent, GHOST
from BTrees.LOBTree import LOBTree
from BTrees.IOBTree import IOBTree
from zope.interface import implementer
from weakref import WeakSet
import os

# TODO document that first data access must be either after commit or Connection.add

# FIXME peak 2·NDirty memory consumption on write (should be 1·NDirty)


# Base class for data of 1 file block as stored in ZODB
class ZBlkBase(Persistent):
    # ._v_zfile     - ZBigFile | None
    # ._v_blk       - offset of this blk in ._v_zfile | None
    __slots__ = ('_v_zfile', '_v_blk')
    # NOTE _v_ - so that we can alter it without Persistent noticing -- we'll
    #      manage ZBlk states by ourselves explicitly.

    def __init__(self):
        self._v_zfile = None
        self._v_blk   = None


    # client requests us to load blkdata from DB, which will then go to memory
    # DB -> .blkdata  (-> memory-page)
    def loadblkdata(self):
        raise NotImplementedError()

    # client requests us to set blkdata to be later saved to DB
    # (DB <- )  .blkdata <- memory-page
    #
    # return: blkchanged=(True|False) - whether blk should be considered changed
    #         after setting its data.
    #
    #         False - when we know the data set was the same
    #         True  - data was not the same or we don't know
    def setblkdata(self, buf):
        raise NotImplementedError()


    # make this ZBlk know it represents zfile[blk]
    # NOTE this has to be called by master every time ZBlk object potentially
    #      goes from GHOST to Live state
    # NOTE it is ok to keep reference to zfile (yes it creates
    #      ZBigFile->ZBlk->ZBigFile cycle but that does not hurt).
    def bindzfile(self, zfile, blk):
        # bind; if already bound, should be the same
        if self._v_zfile is None:
            self._v_zfile   = zfile
            self._v_blk     = blk
        else:
            assert self._v_zfile    is zfile
            assert self._v_blk      == blk


    # DB notifies this object has to be invalidated
    # (DB -> invalidate .blkdata -> invalidate memory-page)
    #
    # FIXME this assumes that a ZBlk always stays associated with #blk - never
    #       moved and never e.g. deleted from the bigfile. However this is NOT
    #       correct: e.g. ZBigFile.storeblk() can change the type of a stored
    #       zblk - i.e. it rewrites ZBigFile.blktab[blk] with another ZBlk
    #       created anew.
    #
    #       another example: a block may initially have no ZBlk attached (and
    #       thus it reads as 0). However, when data in that block is changed, a
    #       new ZBlk is created, but the information that the block data has to
    #       be invalidated is NOT correctly received by peers.
    #
    # FIXME this assumes that a ghostified ZBlk will never be removed from the
    #       live objects cache (cPickleCache). However, since ZBlk is not doing
    #       anything special, and it actually becomes a ghost all the time (e.g.
    #       ZBlk0.loadblkdata() ghostifies itself not to waste memory), this
    #       assumption is NOT correct. Thus, if a ghost ZBlk is removed from the
    #       live cache, the corresponding block will FAIL to invalidate its data.
    #       This can practically happen if the LOBucket, that is part of
    #       ZBigFile.blktab and that was holding a reference to this ZBlk, gets
    #       ghostified under live cache pressure.
    def _p_invalidate(self):
        # do real invalidation only once - else we already lost ._v_zfile last time
        if self._p_state is GHOST:
            return
        # on invalidation we must be already bound
        # (to know which ZBigFileH to propagate invalidation to)
        assert self._v_zfile    is not None
        assert self._v_blk      is not None
        self._v_zfile.invalidateblk(self._v_blk)
        Persistent._p_invalidate(self)



# ZBlk storage formats
# NOTE once established formats do not change on disk


# ZBlk format 0: raw bytes
class ZBlk0(ZBlkBase):
    # ._v_blkdata   - bytes
    __slots__ = ('_v_blkdata',)

    # DB -> ._v_blkdata  (-> memory-page)
    def loadblkdata(self):
        # ensure ._v_blkdata is loaded
        # (it could be unloaded, if e.g. loadblk is called a second time for
        #  the same blk - we loaded it the first time and threw ._v_blkdata
        #  away as an unneeded intermediate copy, not to waste memory - see below)
        if self._v_blkdata is None:
            # ZODB says: further accesses will cause object data to be reloaded.
            # NOTE directly Persistent._p_invalidate() to avoid
            # virtmem->loadblk->invalidate callback
            Persistent._p_invalidate(self)

        blkdata = self._v_blkdata
        assert blkdata is not None
        # do not waste memory - ._v_blkdata is used only as intermediate copy on path
        # from DB to fileh mapping. See also counterpart action in __getstate__().
        self._v_blkdata = None

        return blkdata

    # (DB <- )  ._v_blkdata <- memory-page
    def setblkdata(self, buf):
        blkdata = bytes(buf)                    # FIXME does memcpy
        # trim trailing \0
        self._v_blkdata = blkdata.rstrip(b'\0') # FIXME copy

        # buf always considered to be "changed", as we do not keep old data to
        # compare.
        return True


    # DB (through pickle) requests us to emit state to save
    # DB <- ._v_blkdata  (<- memory-page)
    def __getstate__(self):
        # a request to pickle should come in only when the zblk was marked
        # changed (by storeblk), and only once.
        assert self._v_blkdata is not None
        # NOTE self._p_changed is not necessarily true here - e.g. it is not
        # for newly created objects regardless of initial ._p_changed=True

        blkdata = self._v_blkdata
        # do not waste memory for duplicated data - as soon as blkdata lands
        # into the DB we can drop it here, because ._v_blkdata was only an
        # intermediate copy from fileh memory to the database.
        #
        # FIXME this works, but transaction first prepares all objects for
        #       commit and only then saves them. For us it means __getstate__
        #       gets called very late and for all objects, and thus we'll be
        #       keeping ._v_blkdata for all of them before the final phase =
        #       2·NDirty peak memory consumption.
        self._v_blkdata = None

        # XXX change .p_state to GHOST ?
        return blkdata


    # DB (through pickle) loads data to memory
    # DB -> ._v_blkdata  (-> memory-page)
    def __setstate__(self, state):
        super(ZBlk0, self).__init__()
        self._v_blkdata = state

    # ZBlk as initially created (empty placeholder)
    def __init__(self):
        self.__setstate__(None)


# ZBlk format 1: block split into chunks of fixed size in a BTree
#
# NOTE zeros are not stored -> either no chunk at all, or trailing zeros
#      are stripped.
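#
# For illustration (hypothetical data): a 2M block whose only non-zero bytes
# are buf[0:10] and buf[8192:8200] is stored as just two chunks
#
#     .chunktab = {0: ZData(buf[0:10]), 8192: ZData(buf[8192:8200])}
#
# with all-zero chunks in between simply absent.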

# data as Persistent object
class ZData(Persistent):
    __slots__ = ('data',)
    def __init__(self, data):
        self.data = data

    def __getstate__(self):
        # a request to pickle should come in only when the zblk was marked
        # changed (by storeblk), and only once.
        assert self._p_state is not GHOST

        data = self.data
        # do not waste memory for duplicated data - it was extracted from the
        # memory page, so as soon as it lands in the DB we do not need to keep
        # the data here (see ZBlk0.__getstate__() for details).
        #
        # release .data, so that it is freed after the return is processed
        # (invalidate because .data was changed - so deactivate won't work)
        self._p_invalidate()

        return data

    def __setstate__(self, state):
        self.data = state


class ZBlk1(ZBlkBase):
    # .chunktab {} offset -> ZData(chunk)
    __slots__ = ('chunktab',)

    # NOTE the reader does not assume chunks are of this size - it decodes
    # .chunktab as it was originally encoded - only we write new chunks with
    # this size -> so CHUNKSIZE can be changed over time.
    CHUNKSIZE = 4096    # XXX ad-hoc ?  (but is a good number = OS pagesize)


    # DB -> .chunktab  (-> memory-page)
    def loadblkdata(self):
        # empty?
        if not self.chunktab:
            return b''

        # find out whole blk len via inspecting tail chunk
        tail_start = self.chunktab.maxKey()
        tail_chunk = self.chunktab[tail_start]
        blklen = tail_start + len(tail_chunk.data)

        # whole buffer initialized as 0 + tail_chunk
        blkdata = bytearray(blklen)
        blkdata[tail_start:] = tail_chunk.data

        # go through all chunks besides tail and extract them
        stop = 0
        for start, chunk in self.chunktab.items(max=tail_start, excludemax=True):
            assert start >= stop    # verify chunks don't overlap
            stop = start+len(chunk.data)
            blkdata[start:stop] = chunk.data

        # deactivate whole .chunktab not to waste memory
        deactivate_btree(self.chunktab)

        return blkdata


    # (DB <- )  .chunktab <- memory-page
    def setblkdata(self, buf):
        chunktab  = self.chunktab
        CHUNKSIZE = self.CHUNKSIZE
        blkchanged = False

        # first make sure chunktab was previously written with the same CHUNKSIZE
        # (for simplicity we don't allow several chunk sizes to mix)
        for start, chunk in chunktab.items():
            if (start % CHUNKSIZE) or len(chunk.data) > CHUNKSIZE:
                chunktab.clear()
                break

        # scan over buf and update/delete changed chunks
        for start in range(0, len(buf), CHUNKSIZE):
            data = buf[start:start+CHUNKSIZE]   # FIXME copy on py2
            # make sure data is bytes
            # (else we cannot .rstrip() it below)
            if not isinstance(data, bytes):
                data = bytes(data)              # FIXME copy on py3
            # trim trailing \0
            data = data.rstrip(b'\0')           # FIXME copy
            chunk = chunktab.get(start)

            # all 0 -> make sure to remove chunk
            if not data:
                if chunk is not None:
                    del chunktab[start]
                    chunk._p_deactivate()
                    blkchanged = True

            # some !0 data -> compare and store if changed
            else:
                if chunk is None:
                    chunk = chunktab[start] = ZData(b'')

                if chunk.data != data:
                    chunk.data = data
                    blkchanged = True
                    # data changed and is queued to be committed to the db.
                    # ZData will take care of deactivating this chunk after
                    # the DB asks for its data - see ZData.__getstate__().

                else:
                    # we loaded the chunk for .data comparison, but it is not
                    # needed anymore
                    chunk._p_deactivate()

        return blkchanged


    # DB (through pickle) requests us to emit state to save
    # DB <- .chunktab  (<- memory-page)
    def __getstate__(self):
        # .chunktab memory is only intermediate on path from memory-page to DB.
        # We will free it on a per-chunk basis, after each chunk is queried for
        # data by DB. See ZData.__getstate__() for details, and
        # ZBlk0.__getstate__() for more comments.
        return self.chunktab

    # DB (through pickle) loads data to memory
    # DB -> .chunktab  (-> memory-page)
    def __setstate__(self, state):
        super(ZBlk1, self).__init__()
        self.chunktab = state


    # ZBlk1 as initially created (empty placeholder)
    def __init__(self):
        super(ZBlk1, self).__init__()
        self.__setstate__(IOBTree())


# backward compatibility (early versions wrote ZBlk0 named as ZBlk)
ZBlk = ZBlk0

# format-name -> blk format type
ZBlk_fmt_registry = {
    'ZBlk0':    ZBlk0,
    'ZBlk1':    ZBlk1,
}

# format for updated blocks
ZBlk_fmt_write = os.environ.get('WENDELIN_CORE_ZBLK_FMT', 'ZBlk0')
if ZBlk_fmt_write not in ZBlk_fmt_registry:
    raise RuntimeError('E: Unknown ZBlk format %r' % ZBlk_fmt_write)
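
# e.g. running with `WENDELIN_CORE_ZBLK_FMT=ZBlk1 python app.py` selects the
# "small changes" format (see the module docstring; `app.py` stands for
# whatever program uses wendelin.core).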


# ----------------------------------------


# helper for ZBigFile - just redirect loadblk/storeblk back
# (because it is not possible to inherit from both Persistent and BigFile at
#  the same time - see below)
class _ZBigFile(BigFile):
    # .zself    - reference to ZBigFile

    def __new__(cls, zself, blksize):
        obj = BigFile.__new__(cls, blksize)
        obj.zself = zself
        return obj

    # redirect load/store to main class
    def loadblk(self, blk, buf):    return self.zself.loadblk(blk, buf)
    def storeblk(self, blk, buf):   return self.zself.storeblk(blk, buf)



# ZBigFile implements BigFile backend with data stored in ZODB.
#
# NOTE Can't inherit from Persistent and BigFile at the same time - both are C
# types and their layouts conflict. Persistent must be here for the object to be
# tracked -> BigFile is moved to a data member (the same way and for the same
# reason it is done for PersistentList & PersistentDict).
class ZBigFile(LivePersistent):
    # file is split into blocks; each block is stored as separate object in the DB
    #
    #   .blksize
    #   .blktab       {} blk -> ZBlk*(blkdata)

    # ._v_file      _ZBigFile helper
    # ._v_filehset  weakset( _ZBigFileH ) that we created
    #
    # NOTE Live: we don't allow ourselves to go to ghost state, so as not to
    #      lose ._v_file, which the DataManager _ZBigFileH refers to.

    def __init__(self, blksize):
        LivePersistent.__init__(self)
        self.__setstate__((blksize, LOBTree()))     # NOTE L enough for blk_t


    # state is (.blksize, .blktab)
    def __getstate__(self):
        return (self.blksize, self.blktab)

    def __setstate__(self, state):
        self.blksize, self.blktab = state
        self._v_file = _ZBigFile(self, self.blksize)
        self._v_filehset = WeakSet()


    # load data     ZODB obj -> page
    def loadblk(self, blk, buf):
        zblk = self.blktab.get(blk)
        # no data written yet - "hole" reads as all zeros
        if zblk is None:
            # Nothing to do here - the memory buf obtained from OS comes pre-cleared
            # XXX reenable once/if memory comes uninitialized here
            #bzero(buf)
            return

        # TODO use specialized unpickler and load data directly to blk.
        #      also in DB better store directly {#blk -> #dataref} without ZBlk overhead
        blkdata = zblk.loadblkdata()
        assert len(blkdata) <= self._v_file.blksize
        zblk.bindzfile(self, blk)
        memcpy(buf, blkdata)        # FIXME memcpy
        #bzero(buftail)             # not needed - buf comes pre-cleared from OS


    # store data    dirty page -> ZODB obj
    def storeblk(self, blk, buf):
        zblk = self.blktab.get(blk)
        zblk_type_write = ZBlk_fmt_registry[ZBlk_fmt_write]
        # if zblk was absent or of different type - we (re-)create it anew
        if zblk is None  or \
           type(zblk) is not zblk_type_write:
            zblk = self.blktab[blk] = zblk_type_write()

        blkchanged = zblk.setblkdata(buf)
        if blkchanged:
            # if zblk was already in DB: _p_state -> CHANGED.
            # do this unconditionally even e.g. for ZBlk1 for which only ZData inside changed:
            #
            # We cannot avoid committing ZBlk in all cases, because it is used to signal
            # other DB clients that a ZBlk needs to be invalidated and this way associated
            # fileh pages are invalidated too.
            #
            # This cannot work via ZData, because ZData has no back-pointer to
            # ZBlk1 or to the corresponding zfile.
            zblk._p_changed = True
        zblk.bindzfile(self, blk)


    # invalidate data   .blktab[blk] invalidated -> invalidate page
    def invalidateblk(self, blk):
        for fileh in self._v_filehset:
            fileh.invalidate_page(blk)  # XXX assumes blksize == pagesize


    # fileh_open is a bigfile-like method that creates a new file-handle object
    # which is given to the user for mmap.
    def fileh_open(self):
        fileh = _ZBigFileH(self)
        self._v_filehset.add(fileh)
        return fileh




# BigFileH wrapper that also acts as DataManager proxying changes ZODB <- virtmem
# at two-phase-commit (TPC), and ZODB -> virtmem on objects invalidation.
#
# NOTE several fileh can be opened for a ZBigFile - and this does not
#      conflict with the way ZODB organises its work: toward fileh handles
#      ZBigFile acts as a database, while toward the real ZODB database it
#      acts as (one of its) connections.
#
# NOTE ISynchronizer is used to be able to join into transactions without
#      tracking intermediate changes to pages:
#
#      _ZBigFileH sticks to ZODB.Connection under which it was originally opened
#      and then participates in that connection lifecycle forever keeping sync on
#      that connection close and reopen.
#
#      This is required because ZBigFile and ZBigArray are both LivePersistent
#      (i.e. they never go to ghost state) and thus stay forever live (= active)
#      in Connection._cache.
#
#      If it was only ZBigFile, we could be opening new fileh every time for
#      each connection open and close/unref that fileh object on connection
#      close. But:
#
#       1. this scheme is inefficient (upon close, fileh forgets all its loaded
#          memory, and thus for newly opened fileh we'd need to reload file data
#          from scratch)
#
#       2. ZBigArray needs to reference the opened fileh --- since ZBigArray
#          stays live in Connection._cache, the fileh also automatically stays
#          live.
#
#      So in essence _ZBigFileH is a data manager which works in sync with ZODB
#      Connection propagating changes between fileh memory and ZODB objects.
#
# NOTE Bear in mind that after close, connection can be reopened in different
#      thread - that's why we have to adjust registration to per-thread
#      transaction_manager.
@implementer(IDataManager)
@implementer(ISynchronizer)
class _ZBigFileH(object):
    # .zfile        ZBigFile we were opened for
    # .zfileh       handle for ZBigFile in virtmem

    def __init__(self, zfile):
        self.zfile  = zfile
        self.zfileh = zfile._v_file.fileh_open()

        # FIXME zfile._p_jar could be None (e.g. ZBigFile is newly created
        #       before first commit)

        # when the connection is (re)opened -> txn_manager.registerSynch(self)
        zfile._p_jar.onOpenCallback(self)   # -> self.on_connection_open()

        # when we are just initially created, the connection is already opened,
        # so manually compensate for it.
        self.on_connection_open()


    def on_connection_open(self):
        # IDataManager requires .transaction_manager
        #
        # resync txn manager every time a connection is (re)opened as even for
        # the same connection, the data manager can be different for each reopen.
        self.transaction_manager = self.zfile._p_jar.transaction_manager

        # when connection is closed -> txn_manager.unregisterSynch(self)
        # NOTE close callbacks are fired once, and thus we have to re-register
        #      it on every open.
        self.zfile._p_jar.onCloseCallback(self.on_connection_close)

        # attach us to Connection's transaction manager:
        #
        # Hook into txn_manager so that we get a chance to run before
        # transaction.commit().   (see .beforeCompletion() for more details)
        #
        # NOTE for ZODB < 5.5.0, Connection.transaction_manager is
        # ThreadTransactionManager which implicitly uses separate
        # TransactionManager for each thread. We are thus attaching to
        # _current_ _thread_ TM and correctness depends on the fact that
        # .transaction_manager is further used from the same thread only.
        #
        # Starting from ZODB >= 5.5.0 this issue has been fixed:
        #   https://github.com/zopefoundation/ZODB/commit/b6ac40f153
        #   https://github.com/zopefoundation/ZODB/issues/208
        self.transaction_manager.registerSynch(self)


    def on_connection_close(self):
        # detach us from connection's transaction manager.
        # (NOTE it is _current_ _thread_ TM for ZODB < 5.5.0: see notes ^^^)
        #
        # make sure we stay unlinked from txn manager until the connection is reopened.
        self.transaction_manager.unregisterSynch(self)
        self.transaction_manager = None

        # NOTE open callbacks are set up once and fire on every open - we don't
        #      need to set them up again here.


    # ~~~~ BigFileH wrapper ~~~~
    def mmap(self, pgoffset, pglen):    return self.zfileh.mmap(pgoffset, pglen)
    # .dirty_writeout?  -- not needed - these are handled via
    # .dirty_discard?   -- transaction commit/abort

    def invalidate_page(self, pgoffset):
        return self.zfileh.invalidate_page(pgoffset)


    # ~~~~ ISynchronizer ~~~~
    def beforeCompletion(self, txn):
        # if dirty, join every transaction so that it knows we want to participate in TPC.
        #
        # This is needed because we do not track every change to pages (and
        # then immediately join txn, like ZODB Connection does for objects), but
        # instead join txn here right before commit/abort.

        # make sure we are called only when connection is opened
        assert self.zfile._p_jar.opened

        if not self.zfileh.isdirty():
            return

        assert self not in txn._resources       # (not to join twice)
        txn.join(self)
        # XXX hack - join Connection manually before transaction.commit() starts.
        #
        # the reason we do it here is that, if we don't and the Connection was
        # not yet joined to the transaction (i.e. no changes were made to
        # Connection's objects), then storeblk() running inside commit will
        # cause changes to ZODB objects, zconn will want to join the
        # transaction, and that is currently forbidden.
        #
        # A cleaner fix would be to teach transaction to allow joining
        # DataManagers while commit is running, but that comes with
        # difficulties if we want to support whole-transaction semantics (e.g.
        # whether to call tpc_begin() for the newly joined manager if we are
        # already at commit()? And also what to do about ordering - a newly
        # joined sortKey() could be less than that of DataManagers already
        # processed at the current TPC phase...)
        zconn = self.zfile._p_jar
        assert txn is zconn.transaction_manager.get()
        if zconn._needs_to_join:                # same as Connection._register(obj)
            assert zconn not in txn._resources  # (not to join twice)
            txn.join(zconn)                     # on first obj, without registering
            zconn._needs_to_join = False        # anything.

    def afterCompletion(self, txn):
        pass

    # NOTE not always called - only at explicit transaction.begin()
    #      -> can't use as new-txn synchronization hook
    def newTransaction(self, txn):
        pass


    # ~~~~ IDataManager ~~~~

    # key ordering us wrt other DataManagers in the tpc_*() calling sequence
    # NOTE for our propagated-to-objects changes to reach ZODB, we need to act
    #      earlier than the ZODB DataManager managing zfile.blktab - hence the trick.
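    #
    # For illustration (hypothetical values): zkey = "fs1" would yield
    # key = " s1 (<class '..._ZBigFileH'>:<id>)" - the first byte lowered to
    # ' ' - which sorts strictly before "fs1" while staying mostly readable.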
    def sortKey(self):
        # XXX _p_jar can be None - if that object is new?
        zkey = self.zfile._p_jar.sortKey()
        key  = "%s%s (%s:%s)" % (
                # first letter smaller and, if possible, readable
                zkey[0] > ' ' and ' ' or '\0',
                zkey[1:], type(self), id(self))
        assert key < zkey
        return key


    # abort txn which is not in TPC phase
    def abort(self, txn):
        self.zfileh.dirty_discard()


    def tpc_begin(self, txn):
        # TODO probably take some lock and mark dirty pages RO, so that other
        # threads could not change data while the transaction is being
        # committed
        pass


    def commit(self, txn):
        # propagate changes to objects, but do not mark pages as stored -
        # in case of a following tpc_abort() we'll just drop dirty pages, and
        # changes to objects will be handled by the ZODB data manager (= Connection)
        self.zfileh.dirty_writeout(WRITEOUT_STORE)


    def tpc_vote(self, txn):
        # XXX what here? verify that zfile.blktab can be committed without errors?
        pass


    def tpc_finish(self, txn):
        # finish is just "mark dirty pages as stored ok"
        self.zfileh.dirty_writeout(WRITEOUT_MARKSTORED)


    # abort txn which is in TPC phase
    def tpc_abort(self, txn):
        # this is like .abort(), but is called when any (or none) of
        # tpc_{begin,commit,vote,finish} may have already been called
        #
        # we assume .tpc_finish() was not yet called XXX write why
        # (because for tpc_abort to work after tpc_finish, we'd need to store
        # which file parts we wrote and invalidate those -> simpler not to
        # care; this is also more correct - tpc_finish is assumed to be
        # non-failing by ZODB design)
        self.abort(txn)