Commit 09bdef24 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent f4a422b7
......@@ -45,9 +45,11 @@ import threading
from persistent import Persistent
from ZODB.FileStorage import FileStorage
from ZODB.utils import u64, p64
from ZODB.utils import z64, u64, p64
from zodbtools.util import ashex as h # XXX -> use "ashex"
from six import reraise
# WCFS represents filesystem-level connection to wcfs server.
#
......@@ -125,8 +127,44 @@ class WatchLink(object):
# this tWatchLink currently watches the following files at particular state.
wlink._watching = {} # {} foid -> tWatch
# ---- WatchLink message IO ----
def _closeTX(wlink):
if wlink._txclosed:
return
# ---- WatchLink message IO ----
# ask wcfs to close its tx & rx sides; close(wcfs.tx) wakes up
# _serveRX on client (= on us). The connection can be already closed by
# wcfs - so ignore errors when sending bye.
try:
wlink._send(1, b'bye')
except IOError:
pass
wlink._wtx.close()
wlink._txclosed = True
def close(wlink):
wlink._closeTX()
wlink._serveCancel()
# XXX we can get stuck here if wcfs does not behave as we want.
# XXX in particular if there is a silly - e.g. syntax or type error in
# test code - we currently get stuck here.
#
# XXX -> better pthread_kill(SIGINT) instead of relying on wcfs proper behaviour?
try:
wlink._serveWG.wait()
except Exception as e:
# canceled is expected and ok
if e != context.canceled:
reraise(e, None, e.__traceback__)
wlink._wrx.close()
# disable all established watches
for w in wlink._watching.values():
w.at = z64
w.pinned = {}
wlink._watching = {}
# ---- WCFS raw file access ----
......
......@@ -728,44 +728,9 @@ class tWatchLink(wcfs.WatchLink):
t.tdb = tdb
tdb._wlinks.add(t)
def _closeTX(t):
if t._txclosed:
return
# ask wcfs to close its tx & rx sides; close(wcfs.tx) wakes up
# _serveRX on client (= on us). The connection can be already closed by
# wcfs - so ignore errors when sending bye.
try:
t._send(1, b'bye')
except IOError:
pass
t._wtx.close()
t._txclosed = True
def close(t):
t.tdb._wlinks.remove(t)
t._closeTX()
t._serveCancel()
# XXX we can get stuck here if wcfs does not behave as we want.
# XXX in particular if there is a silly - e.g. syntax or type error in
# test code - we currently get stuck here.
#
# XXX -> better pthread_kill(SIGINT) instead of relying on wcfs proper behaviour?
try:
t._serveWG.wait()
except Exception as e:
# canceled is expected and ok
if e != context.canceled:
reraise(e, None, e.__traceback__)
t._wrx.close()
# disable all established watches
for w in t._watching.values():
w.at = z64
w.pinned = {}
t._watching = {}
super(tWatchLink, t).close()
# ---- message IO ----
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment