Commit ab3841e7 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent f3ff04c0
...@@ -51,6 +51,12 @@ using namespace golang; ...@@ -51,6 +51,12 @@ using namespace golang;
template<typename Key, typename Value> template<typename Key, typename Value>
using dict = std::unordered_map<Key, Value>; using dict = std::unordered_map<Key, Value>;
// has returns whether container d (e.g. dict) has element k.
template<typename Container, typename Key>
bool has(const Container &d, Key k) {
return d.find(k) != d.end();
}
template<typename Key> template<typename Key>
using set = std::unordered_set<Key>; using set = std::unordered_set<Key>;
...@@ -241,8 +247,7 @@ void Conn::_pin1(SrvReq *req) { ...@@ -241,8 +247,7 @@ void Conn::_pin1(SrvReq *req) {
// XXX return error? // XXX return error?
wconn->_filemu.lock(); wconn->_filemu.lock();
auto _ = wconn->_filetab.find(req->foid); if (has(wconn->_filetab, req->foid)) {
if (_ == wconn->_filetab.end()) {
wconn->_filemu.unlock(); wconn->_filemu.unlock();
// XXX err = we are not watching the file - why wcfs sent us this update? // XXX err = we are not watching the file - why wcfs sent us this update?
} }
...@@ -337,6 +342,7 @@ error _Mapping::_remmapblk(int64_t blk, Tid at) { ...@@ -337,6 +342,7 @@ error _Mapping::_remmapblk(int64_t blk, Tid at) {
// ---- WatchLink ---- // ---- WatchLink ----
// Pkt internally represents data of one message sent/received over WatchLink. // Pkt internally represents data of one message sent/received over WatchLink.
// XXX used only for recv?
struct Pkt { struct Pkt {
// stream over which the data was received; used internally by send // stream over which the data was received; used internally by send
StreamID stream; StreamID stream;
...@@ -393,7 +399,8 @@ error WatchLink::_send(StreamID stream, const string &msg) { ...@@ -393,7 +399,8 @@ error WatchLink::_send(StreamID stream, const string &msg) {
pkt.rawdata = // XXX copy msg XXX + "%d <stream> ...\n" ? pkt.rawdata = // XXX copy msg XXX + "%d <stream> ...\n" ?
return wlink->_write(&pkt) return wlink->_write(&pkt)
#endif #endif
pkt = fmt::sprintf("%ul %s\n", stream, msg.c_str()); string pkt = fmt::sprintf("%ul %s\n", stream, msg.c_str());
return wlink->_write(pkt);
} }
//error WatchLink::_write(const Pkt *pkt) { //error WatchLink::_write(const Pkt *pkt) {
...@@ -417,6 +424,43 @@ error WatchLink::_write(const string &pkt) { ...@@ -417,6 +424,43 @@ error WatchLink::_write(const string &pkt) {
#endif #endif
} }
// sendReq sends client -> server request and returns server reply.
def WatchLink::sendReq(ctx, req) { // -> reply | None when EOF
WatchLink *wlink = this;
rxq = wlink._sendReq(ctx, req)
_, _rx = select(
ctx.done().recv, // 0
rxq.recv, // 1
)
if _ == 0:
raise ctx.err()
return _rx
}
def WatchLink::_sendReq(ctx, req) { // -> rxq
WatchLink *wlink = this;
wlink->_txmu.lock(); // XXX -> atomic (currently uses arbitrary lock)
StreamID stream = wlink->_req_next;
wlink->_req_next = (wlink->_req_next + 2) & ((1<<64)-1);
wlink->_txmu.unlock();
rxq = chan()
wlink->_rxmu.lock();
if (has(wlink->rxtab, stream)) {
wlink->_rxmu.unlock();
panic("BUG: to-be-sent stream is present in rxtab");
}
wlink->_rxtab[stream] = rxq;
wlink->_rxmu.unlock();
err = wlink->_send(stream, req);
// XXX err
return rxq;
}
// ---- WCFS raw file access ---- // ---- WCFS raw file access ----
// _path returns path for object on wcfs. // _path returns path for object on wcfs.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment