Commit 24378c46 authored by Kirill Smelkov's avatar Kirill Smelkov

X wcfs: client: Provide Conn.at()

To known to which DB state WCFS connection corresponds. This is similar
to zodb.Connection.At() in ZODB/go and to zconn_at in ZODB/py.

wconn.at() will be used in the next patch to verify ZSync.
parent 2be847c4
...@@ -83,12 +83,14 @@ cdef extern from "wcfs/client/wcfs.h" namespace "wcfs" nogil: ...@@ -83,12 +83,14 @@ cdef extern from "wcfs/client/wcfs.h" namespace "wcfs" nogil:
pair[Conn, error] connect(Tid at) pair[Conn, error] connect(Tid at)
cppclass _Conn: cppclass _Conn:
Tid at()
pair[FileH, error] open(Oid foid) pair[FileH, error] open(Oid foid)
error close() error close()
error resync(Tid at) error resync(Tid at)
cppclass Conn (refptr[_Conn]): cppclass Conn (refptr[_Conn]):
# Conn.X = Conn->X in C++ # Conn.X = Conn->X in C++
Tid at "_ptr()->at" ()
pair[FileH, error] open "_ptr()->open" (Oid foid) pair[FileH, error] open "_ptr()->open" (Oid foid)
error close "_ptr()->close" () error close "_ptr()->close" ()
error resync "_ptr()->resync" (Tid at) error resync "_ptr()->resync" (Tid at)
......
...@@ -64,6 +64,11 @@ cdef class PyConn: ...@@ -64,6 +64,11 @@ cdef class PyConn:
def __dealloc__(PyConn pywconn): def __dealloc__(PyConn pywconn):
pywconn.wconn = nil pywconn.wconn = nil
def at(PyConn pywconn):
with nogil:
at = wconn_at_pyexc(pywconn.wconn)
return p64(at)
def close(PyConn pywconn): def close(PyConn pywconn):
with nogil: with nogil:
err = wconn_close_pyexc(pywconn.wconn) err = wconn_close_pyexc(pywconn.wconn)
...@@ -280,6 +285,9 @@ cdef nogil: ...@@ -280,6 +285,9 @@ cdef nogil:
pair[Conn, error] wcfs_connect_pyexc(WCFS *wcfs, Tid at) except +topyexc: pair[Conn, error] wcfs_connect_pyexc(WCFS *wcfs, Tid at) except +topyexc:
return wcfs.connect(at) return wcfs.connect(at)
Tid wconn_at_pyexc(Conn wconn) except +topyexc:
return wconn.at()
error wconn_close_pyexc(Conn wconn) except +topyexc: error wconn_close_pyexc(Conn wconn) except +topyexc:
return wconn.close() return wconn.close()
......
...@@ -317,7 +317,7 @@ pair<Conn, error> WCFS::connect(zodb::Tid at) { ...@@ -317,7 +317,7 @@ pair<Conn, error> WCFS::connect(zodb::Tid at) {
Conn wconn = adoptref(new _Conn()); Conn wconn = adoptref(new _Conn());
wconn->_wc = wc; wconn->_wc = wc;
wconn->at = at; wconn->_at = at;
wconn->_wlink = wlink; wconn->_wlink = wlink;
os::RegisterAfterFork(newref( os::RegisterAfterFork(newref(
...@@ -663,6 +663,17 @@ error _Conn::__pin1(PinReq *req) { ...@@ -663,6 +663,17 @@ error _Conn::__pin1(PinReq *req) {
return nil; return nil;
} }
// at returns database state corresponding to the connection.
zodb::Tid _Conn::at() {
_Conn& wconn = *this;
wconn._atMu.RLock();
defer([&]() {
wconn._atMu.RUnlock();
});
return wconn._at;
}
// resync resyncs connection and its file mappings onto different database view. // resync resyncs connection and its file mappings onto different database view.
// //
// bigfile/_file_zodb.pyx arranges to call Conn.resync at transaction boundaries // bigfile/_file_zodb.pyx arranges to call Conn.resync at transaction boundaries
...@@ -719,7 +730,7 @@ error _Conn::resync(zodb::Tid at) { ...@@ -719,7 +730,7 @@ error _Conn::resync(zodb::Tid at) {
// set new wconn.at early, so that e.g. Conn.open running simultaneously // set new wconn.at early, so that e.g. Conn.open running simultaneously
// to second part of resync (see below) uses new at. // to second part of resync (see below) uses new at.
wconn.at = at; wconn._at = at;
// go through all files opened under wconn and pre-adjust their mappings // go through all files opened under wconn and pre-adjust their mappings
// for viewing data as of new @at state. // for viewing data as of new @at state.
...@@ -967,7 +978,7 @@ error _FileH::_open() { ...@@ -967,7 +978,7 @@ error _FileH::_open() {
// NOTE wcfs will reply "ok" only after wcfs/head/at ≥ wconn.at // NOTE wcfs will reply "ok" only after wcfs/head/at ≥ wconn.at
string ack; string ack;
tie(ack, err) = wconn->_wlink->sendReq(context::background(), tie(ack, err) = wconn->_wlink->sendReq(context::background(),
fmt::sprintf("watch %s @%s", v(foid), v(wconn->at))); fmt::sprintf("watch %s @%s", v(foid), v(wconn->_at)));
if (err != nil) if (err != nil)
return err; return err;
if (ack != "ok") if (ack != "ok")
...@@ -1506,7 +1517,7 @@ string _Conn::String() const { ...@@ -1506,7 +1517,7 @@ string _Conn::String() const {
// XXX don't include wcfs as prefix here? // XXX don't include wcfs as prefix here?
// (e.g. to use Conn.String in tracing without wcfs prefix) // (e.g. to use Conn.String in tracing without wcfs prefix)
// (if yes -> go and correct all xerr::Contextf calls) // (if yes -> go and correct all xerr::Contextf calls)
return fmt::sprintf("%s: conn%d @%s", v(wconn._wc), wconn._wlink->fd(), v(wconn.at)); return fmt::sprintf("%s: conn%d @%s", v(wconn._wc), wconn._wlink->fd(), v(wconn._at));
} }
string _FileH::String() const { string _FileH::String() const {
......
...@@ -178,7 +178,7 @@ struct _Conn : os::_IAfterFork, object { ...@@ -178,7 +178,7 @@ struct _Conn : os::_IAfterFork, object {
// viewing the database at particular state. .resync write-locks this and // viewing the database at particular state. .resync write-locks this and
// knows noone is using the connection for reading simultaneously. // knows noone is using the connection for reading simultaneously.
sync::RWMutex _atMu; sync::RWMutex _atMu;
zodb::Tid at; zodb::Tid _at;
sync::RWMutex _filehMu; // _atMu.W | _atMu.R + _filehMu sync::RWMutex _filehMu; // _atMu.W | _atMu.R + _filehMu
error _downErr; // !nil if connection is closed or no longer operational error _downErr; // !nil if connection is closed or no longer operational
...@@ -197,6 +197,7 @@ public: ...@@ -197,6 +197,7 @@ public:
void decref(); void decref();
public: public:
zodb::Tid at();
pair<FileH, error> open(zodb::Oid foid); pair<FileH, error> open(zodb::Oid foid);
error close(); error close();
error resync(zodb::Tid at); error resync(zodb::Tid at);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment