Commit e19d6686 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 79113e49
...@@ -35,6 +35,7 @@ using namespace golang; ...@@ -35,6 +35,7 @@ using namespace golang;
#include <wendelin/bug.h> #include <wendelin/bug.h>
#include <unordered_map> #include <unordered_map>
#include <unordered_set>
#include <vector> #include <vector>
#include <sys/types.h> #include <sys/types.h>
...@@ -50,6 +51,9 @@ using namespace golang; ...@@ -50,6 +51,9 @@ using namespace golang;
template<typename Key, typename Value> template<typename Key, typename Value>
using dict = std::unordered_map<Key, Value>; using dict = std::unordered_map<Key, Value>;
template<typename Key>
using set = std::unordered_set<Key>;
using std::vector; using std::vector;
typedef uint64_t Tid; // XXX ok? typedef uint64_t Tid; // XXX ok?
...@@ -134,10 +138,36 @@ struct _Mapping { ...@@ -134,10 +138,36 @@ struct _Mapping {
}; };
// XXX struct WatchLink // WatchLink represents /head/watch link opened on wcfs.
struct WatchLink { //
// XXX // It is created by WCFS::_openwatch().
//
// .sendReq()/.recvReq() provides raw IO in terms of wcfs invalidation protocol messages.
// .close() closes the link.
//
// It is safe to use WatchLink from multiple threads simultaneously.
class WatchLink {
WCFS *_wc;
os::File _f; // head/watch file handle
chan<structZ> _rxeof; // becomes ready when wcfs closes its tx side
// inv.protocol message IO
_acceptq = chan() // (stream, msg) server originated messages go here
sync::Mutex _rxmu;
dict<stream, chan<XXX>> _rxtab; // {} stream -> rxq server replies go via here
set<stream> _accepted; // streams we accepted but did not replied yet
XXXint _req_next = 1; // stream ID for next client-originated request XXX -> atomic
sync::Mutex _txmu // serializes writes
bool _txclosed; // XXX = False
#if 0
serveCtx, wlink._serveCancel = context.with_cancel(context.background())
wlink._serveWG = sync.WorkGroup(serveCtx)
wlink._serveWG.go(wlink._serveRX)
#endif
public:
SrvReq *recvReq(IContext *ctx); SrvReq *recvReq(IContext *ctx);
}; };
...@@ -160,6 +190,9 @@ Conn *WCFS::connect(Tid at) { ...@@ -160,6 +190,9 @@ Conn *WCFS::connect(Tid at) {
wconn->_wc = wc; wconn->_wc = wc;
wconn->at = at; wconn->at = at;
// wconn._wlink = WatchLink(wc) XXX // wconn._wlink = WatchLink(wc) XXX
tie(wcomm._wlink, err) = wc->_openwatch();
// XXX err
// XXX reenable // XXX reenable
#if 0 #if 0
...@@ -291,6 +324,24 @@ error _Mapping::_remmapblk(int64_t blk, Tid at) { ...@@ -291,6 +324,24 @@ error _Mapping::_remmapblk(int64_t blk, Tid at) {
} }
// _openwatch opens new watch link on wcfs.
// XXX WatchLink* -> no ptr?
tuple<WatchLink*, error> WCFS::_openwatch() {
WCFS *wc = this;
// head/watch handle.
tie(f, err) = wc->_open("head/watch", O_RDWR);
if (err != nil)
return make_tuple(NULL, err);
wlink = new(WatchLink);
wlink->_wc = wc;
wlink->_f = f;
wlink->rx_eof = makechan<structZ>();
}
// XXX close
// ---- WCFS raw file access ---- // ---- WCFS raw file access ----
// _path returns path for object on wcfs. // _path returns path for object on wcfs.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment