Commit a9a82d5a authored by Kirill Smelkov's avatar Kirill Smelkov

X bigfile/_file_zodb: Fix ZSync to close not only wconn, but also wconn.wc...

X bigfile/_file_zodb: Fix ZSync to close not only wconn, but also wconn.wc through which wconn was created

pywconnOf, before creating wconn, performs wc=wcfs.join(zurl) which
creates new filesystem-level connection to WCFS server. This wc is used
only to create wconn. So if we do not close wc, after releaseing wconn,
it will leak opened file descriptor, to e.g. .wcfs/zurl and prevent
tests from finishing cleanly.
parent 4e23152e
......@@ -123,6 +123,8 @@ cdef public class _ZBigFile(BigFile) [object _ZBigFile, type _ZBigFile_Type]:
return super(_ZBigFile, zf).fileh_open(mmap_overlay)
# XXX consider moving pywconnOf to wcfs (to e.g. wcfs.zattach) ?
# pywconnOf establishes and returns (py) wcfs.Conn associated with zconn.
#
# returned wcfs.Conn will be maintained to keep in sync with zconn, and will be
......@@ -147,6 +149,7 @@ cdef wcfs.PyConn pywconnOf(zconn):
zconn._wcfs_wconn = wconn
# keep wconn view of the database in sync with zconn
# wconn and wc (= wconn.wc) will be closed when zconn is garbage-collected
ZSync(zconn, wconn)
return wconn
......@@ -164,6 +167,7 @@ class ZSync:
# .wconn (py) wcfs.Connection
def __init__(zsync, zconn, wconn):
#print('ZSync: setup %r <-> %r' % (wconn, zconn))
assert zconn.opened
zsync.wconn = wconn
zsync.zconn_ref = weakref.ref(zconn, zsync.on_zconn_dealloc)
......@@ -187,7 +191,19 @@ class ZSync:
# .zconn dealloc -> wconn.close; release zsync
def on_zconn_dealloc(zsync, _):
zsync.wconn.close()
#print('ZSync: sched break %r <-> .' % (zsync.wconn,))
# schedule wconn.close() + wconn.wc.close()
_zsync_wclose_wg.add(1)
go(_wclose1, zsync.wconn)
# XXX how to safely schedule work to _zsync_releaser without blocking weakref callback?
"""
# (we cannot do this from under weakref callback - see _zsync_releaser for details)
_zsync_releaseq.append(...)
_zsync_releaseq.send(zsync.wconn)
"""
# unregister zsync from being kept alive
if 1: # = `with gil:` (see note in __init__)
del _zsyncReg[id(zsync)]
......@@ -196,3 +212,55 @@ class ZSync:
def on_connection_resync(zsync):
zconn = zsync.zconn_ref()
zsync.wconn.resync(zconn_at(zconn))
from golang import go
from golang import sync
# XXX disabled for now (not sure how to safely schedule work from under weakref callback)
# -> doing straight `go _wclose1` every time.
"""
from golang import go, chan
import logging as log
# _zsync_releaser is dedicated thread that closes wconn/wc after ZSync detects
# that zconn - to which wconn was associated - is no longer alive.
#
# Requests to _zsync_releaser come from ZSync.on_zconn_dealloc - which is
# called from under weakref callback when zconn is garbage collected. Since it
# is not safe to take python-level locks from under __del__ or weakref callback
# (that can cause deadlocks), the releasing work is scheduled to be done in
# separate _zsync_releaser thread.
#
# The need to take locks: even though wconn.close() can work without taking
# py-level locks, wc.close needs to take wcfs._wcmu
_zsync_releaseq = [] # of wconn
_zsync_wakeup = chan(1)
def _zsync_releaser():
while 1:
wconn, ok = _zsync_releaseq.recv_()
if not ok:
break # time to stop XXX needed?
# let's close wconn and its .wc
try:
1/0
_wclose1(wconn)
except:
log.exception("zsync: releaser: wclose %r", wconn)
# XXX nrelease += 1
go(_zsync_releaser)
"""
_zsync_wclose_wg = sync.WaitGroup()
def _wclose1(wconn):
#print('ZSync: break %r <-> .' % (wconn,))
wc = wconn.wc
wconn.close()
wc.close()
_zsync_wclose_wg.done()
# at shutdown make sure there is no in-flight _wclose1
import atexit; atexit.register(_zsync_wclose_wg.wait)
......@@ -19,7 +19,7 @@
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
from wendelin.bigfile._file_zodb import ZSync
from wendelin.bigfile._file_zodb import ZSync, _zsync_wclose_wg
from wendelin import wcfs
from wendelin.lib.zodb import zstor_2zurl, zconn_at
from wendelin.lib.testing import getTestDB
......@@ -53,10 +53,11 @@ def test_zsync():
at0 = zconn_at(zconn)
# create wconn
wc = wcfs.join(zurl)
defer(wc.close)
wc_njoin0 = wc._njoin
wconn = wc.connect(at0)
assert wconn.at() == at0
# setup ZSync for wconn <-> zconn; don't keep zsync explicitly referenced
# NOTE ZSync takes ownership of wconn.wc (= wc), so we don't wc.close
ZSync(zconn, wconn)
assert wconn.at() == at0
......@@ -97,10 +98,12 @@ def test_zsync():
assert zconn_at(zconn) == at2
assert wconn.at() == at2
# close db -> zconn should disappear and ZSync should close wconn
# close db -> zconn should disappear and ZSync should close wconn and wc
del zconn
db.close()
gc.collect()
assert zconn_weak() is None
_zsync_wclose_wg.wait()
with raises(error, match=": connection closed"):
wconn.open(p64(0))
assert wc._njoin == wc_njoin0 - 1
......@@ -136,6 +136,7 @@ cdef class PyWCFS:
cdef class PyConn:
cdef Conn wconn
cdef readonly PyWCFS wc # PyWCFS that was used to create this PyConn
cdef class PyFileH:
cdef FileH wfileh
......
......@@ -56,6 +56,7 @@ cdef class PyWCFS:
cdef PyConn pywconn = PyConn.__new__(PyConn)
pywconn.wconn = wconn
pywconn.wc = pywc
return pywconn
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment