Commit bbad4198 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 2321a430
...@@ -100,8 +100,7 @@ func listenAndServe(ctx context.Context, net xnet.Networker, laddr string, serve ...@@ -100,8 +100,7 @@ func listenAndServe(ctx context.Context, net xnet.Networker, laddr string, serve
} }
// got something unexpected - grab the header (which we // got something unexpected - grab the header (which we
// already have read), log it and reject the // already have read), log it and reject the connection.
// connection.
b := make([]byte, 1024) b := make([]byte, 1024)
// must not block as some data is already there in cmux buffer // must not block as some data is already there in cmux buffer
n, _ := conn.Read(b) n, _ := conn.Read(b)
......
...@@ -17,8 +17,6 @@ ...@@ -17,8 +17,6 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
// FIXME kill dup from pipenet!
// Package lonet provides TCP network simulated on top of localhost TCP loopback. // Package lonet provides TCP network simulated on top of localhost TCP loopback.
// //
// For testing distributed systems it is sometimes handy to imitate network of // For testing distributed systems it is sometimes handy to imitate network of
...@@ -47,9 +45,9 @@ ...@@ -47,9 +45,9 @@
// //
// Example: TODO adjust // Example: TODO adjust
// //
// net := lonet.New("") // XXX network name // net, err := lonet.Join(ctx, "mynet")
// h1 := net.Host("abc") // XXX err // h1, err := net.NewHost("abc")
// h2 := net.Host("def") // ... // h2, err := net.NewHost("def") // ...
// //
// // XXX inject 127.0.0.1 to example... // // XXX inject 127.0.0.1 to example...
// // starts listening on address "abc:10" (which gets mapped to "127.0.0.1:xxx") // // starts listening on address "abc:10" (which gets mapped to "127.0.0.1:xxx")
...@@ -66,9 +64,24 @@ ...@@ -66,9 +64,24 @@
// See also shipped lonet.py for accessing lonet networks from Python. // See also shipped lonet.py for accessing lonet networks from Python.
package lonet package lonet
// XXX document lonet organization, protocol
// > lonet "network" dial <src> <dst>
// < lonet "network" connected <dst'>
// E connrefused
//
// E wrong network|op|...
//
// - protocol error
// - wrong network
// - wrong op
import ( import (
"context" "context"
"fmt"
"io"
"io/ioutil" "io/ioutil"
"log"
"net" "net"
"os" "os"
"path/filepath" "path/filepath"
...@@ -90,14 +103,12 @@ type Addr struct { ...@@ -90,14 +103,12 @@ type Addr struct {
Port int // port on host Port int // port on host
} }
// SubNetwork represents one segment of a lonet network. // SubNetwork represents one subnetwork of a lonet network.
// //
// Multiple Hosts could be created on one segment. // Multiple Hosts could be created on one subnetwork.
// There can be other network segments in the same process or in another OS-level processes. // There can be other subnetworks in the same process or in another OS-level processes.
// //
// Host names are unique through whole lonet network. // Host names are unique through whole lonet network.
//
// XXX text
type SubNetwork struct { type SubNetwork struct {
// name of full network under "lonet" namespace -> e.g. "" // name of full network under "lonet" namespace -> e.g. ""
// full network name will be reported as "lonet"+network. // full network name will be reported as "lonet"+network.
...@@ -110,12 +121,11 @@ type SubNetwork struct { ...@@ -110,12 +121,11 @@ type SubNetwork struct {
// whenever connection to subnet's host is tried to be established it goes here. // whenever connection to subnet's host is tried to be established it goes here.
oslistener net.Listener oslistener net.Listener
// big network lock for everything dynamic under SubNetwork // big subnetwork lock for everything dynamic under SubNetwork
// (e.g. Host.socketv too) XXX // (e.g. Host.socketv too)
mu sync.Mutex mu sync.Mutex
// hostMap map[string]*Host // XXX lonet: not needed (registry instead) hostMap map[string]*Host
// XXX track all hosts so SubNetwork.Shutdown shuts them down?
} }
// Host represents named access point on Network XXX // Host represents named access point on Network XXX
...@@ -123,7 +133,7 @@ type Host struct { ...@@ -123,7 +133,7 @@ type Host struct {
subnet *SubNetwork subnet *SubNetwork
name string name string
// NOTE protected by subnet.mu XXX // NOTE protected by subnet.mu
socketv []*socket // port -> listener | conn ; [0] is always nil socketv []*socket // port -> listener | conn ; [0] is always nil
} }
...@@ -155,14 +165,22 @@ type listener struct { ...@@ -155,14 +165,22 @@ type listener struct {
// subnetwork/host/port we are listening on // subnetwork/host/port we are listening on
socket *socket socket *socket
// dialq chan dialReq // Dial requests to our port go here dialq chan dialReq // Dial requests to our port go here from OS-level listener
down chan struct{} // Close -> down=ready down chan struct{} // Close -> down=ready
closeOnce sync.Once closeOnce sync.Once
} }
// XXX dialReq // dialReq represents one dial request to listener.
//
// it comes after OS-level connection was accepted and lonet dial already
// request parsed locally.
type dialReq struct {
from Addr
osconn net.Conn
resp chan Addr // accepted with this local address
}
// ---------------------------------------- // ----------------------------------------
...@@ -210,12 +228,15 @@ func Join(ctx context.Context, network string) (_ *SubNetwork, err error) { ...@@ -210,12 +228,15 @@ func Join(ctx context.Context, network string) (_ *SubNetwork, err error) {
} }
// joined ok // joined ok
return &SubNetwork{ n := &SubNetwork{
network: network, network: network,
registry: registry, registry: registry,
oslistener: oslistener, oslistener: oslistener,
// hostMap: make(map[string]*Host), hostMap: make(map[string]*Host),
}, nil }
go n.serve()
return n, nil
} }
// NewHost creates new lonet Host with given name. // NewHost creates new lonet Host with given name.
...@@ -233,7 +254,19 @@ func (n *SubNetwork) NewHost(ctx context.Context, name string) (*Host, error) { ...@@ -233,7 +254,19 @@ func (n *SubNetwork) NewHost(ctx context.Context, name string) (*Host, error) {
} }
// announced ok -> host can be created // announced ok -> host can be created
return &Host{subnet: n, name: name}, nil n.mu.Lock()
defer n.mu.Unlock()
if n.hostMap[name] != nil {
panic(fmt.Sprintf(
"lonet %q: new host %q: announced to registry but .hostMap already !empty",
n.Network(), name))
}
host := &Host{subnet: n, name: name}
n.hostMap[name] = host
return host, nil
} }
// XXX Host.resolveAddr // XXX Host.resolveAddr
...@@ -243,6 +276,121 @@ func (h *Host) Listen(laddr string) (net.Listener, error) { ...@@ -243,6 +276,121 @@ func (h *Host) Listen(laddr string) (net.Listener, error) {
panic("TODO") panic("TODO")
} }
// XXX
func (n *SubNetwork) serve() { // XXX error?
// wait for incoming OS connections and do lonet protocol handshake on them.
// if successful - route handshaked connection to particular Host's listener.
for {
osconn, err := n.oslistener.Accept()
if err != nil {
// XXX mark subnet as down + notify all its hosts
return
}
go func() {
err := n.loaccept(osconn) // XXX + ctx?
if err != nil {
log.Print(err) // XXX ok?
}
}()
}
}
// loaccept handles incoming OS-level connection.
//
// it performs lonet protocol handshake and if successfull further conveys
// accepted connection to lonet-level Accept.
func (n *SubNetwork) loaccept(osconn net.Conn) (err error) {
defer xerr.Contextf(&err, "lonet %q: handshake", n.network)
// read handshake line and parse it
// XXX cancel handshake on ctx down
line, err := readline(osconn, maxlen)
if err != nil {
return err
}
var network, src, dst string
_, err = fmt.Sscanf(line, "lonet %q dial %s %s\n", &network, &src, &dst)
if err != nil {
ereply("protocol error")
return err
}
if network != n.network {
ereply("network mismatch")
return fmt.Errorf("network mismatch")
}
asrc, err := ParseAddr(src)
if err != nil {
return ereply("src address invalid")
}
adst, err := ParseAddr(dst)
if err != nil {
return ereply("dst address invalid")
}
// check dst host:port in .hostMap
n.mu.Lock()
host := n.hostMap[adst.Host]
if host == nil || adst.Port >= len(host.socketv) {
n.mu.Unlock()
return ereplyf(errConnRefused)
}
sk := host.socketv[adst.Port]
if sk == nil || sk.listener == nil {
n.mu.Unlock()
return ereplyf(errConnRefused)
}
// there is listener corresponding to dst - let's connect it
l := sk.listener
n.mu.Unlock()
resp := make(chan XXX)
select {
//case <-ctx.Done():
// ...
case <-l.down:
return ereplyf(errConnRefused)
case l.dialq <- dialReq{from: asrc, osconn: osconn, resp: resp}:
// connection accepted
acceptAddr := <-resp
replyf("connected %s", acceptAddr)
}
}
// readline reads 1 line from r up to maxlen bytes.
func readline(r io.Reader, maxlen int) (string, _ error) {
buf1 := []byte{0}
var line []byte
for len(line) < maxlen {
n, err := r.Read(buf1)
if n == 1 {
err = nil
}
if err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
return nil, err
}
line = append(line, buf1...)
if buf1[0] == '\n' {
break
}
}
return string(line), nil
}
// XXX // XXX
func (h *Host) Dial(ctx context.Context, addr string) (net.Conn, error) { func (h *Host) Dial(ctx context.Context, addr string) (net.Conn, error) {
panic("TODO") panic("TODO")
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment