Commit de4c4f48 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent f951b01c
...@@ -43,7 +43,7 @@ import ( ...@@ -43,7 +43,7 @@ import (
// Client talks to NEO cluster and exposes access to it via ZODB interfaces. // Client talks to NEO cluster and exposes access to it via ZODB interfaces.
type Client struct { type Client struct {
node neo.NodeApp node *neo.NodeApp
talkMasterCancel func() talkMasterCancel func()
...@@ -77,17 +77,7 @@ func (c *Client) StorageName() string { ...@@ -77,17 +77,7 @@ func (c *Client) StorageName() string {
// It will connect to master @masterAddr and identify with specified cluster name. // It will connect to master @masterAddr and identify with specified cluster name.
func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client { func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client {
cli := &Client{ cli := &Client{
node: neo.NodeApp{ node: neo.NewNodeApp(net, neo.CLIENT, clusterName, masterAddr, ""),
MyInfo: neo.NodeInfo{Type: neo.CLIENT, Addr: neo.Address{}},
ClusterName: clusterName,
Net: net,
MasterAddr: masterAddr,
NodeTab: &neo.NodeTable{},
PartTab: &neo.PartitionTable{},
ClusterState: -1, // invalid
},
mlinkReady: make(chan struct{}), mlinkReady: make(chan struct{}),
} }
......
...@@ -918,6 +918,7 @@ func DialLink(ctx context.Context, net xnet.Networker, addr string) (nl *NodeLin ...@@ -918,6 +918,7 @@ func DialLink(ctx context.Context, net xnet.Networker, addr string) (nl *NodeLin
} }
// ListenLink starts listening on laddr for incoming connections and wraps them as NodeLink. // ListenLink starts listening on laddr for incoming connections and wraps them as NodeLink.
//
// The listener accepts only those connections that pass handshake. // The listener accepts only those connections that pass handshake.
func ListenLink(net xnet.Networker, laddr string) (LinkListener, error) { func ListenLink(net xnet.Networker, laddr string) (LinkListener, error) {
rawl, err := net.Listen(laddr) rawl, err := net.Listen(laddr)
......
...@@ -35,7 +35,7 @@ import ( ...@@ -35,7 +35,7 @@ import (
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/xcommon/xio" //"lab.nexedi.com/kirr/neo/go/xcommon/xio"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet" "lab.nexedi.com/kirr/neo/go/xcommon/xnet"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
) )
...@@ -68,32 +68,62 @@ type NodeApp struct { ...@@ -68,32 +68,62 @@ type NodeApp struct {
ClusterState ClusterState // master idea about cluster state ClusterState ClusterState // master idea about cluster state
} }
// Dial connects to another node in the cluster // NewNodeApp creates new node application
// NewNodeApp creates a new node application of type typ for cluster clusterName.
//
// net is stored as the networker of the node and masterAddr as the address of
// the master. serveAddr is converted to NEO address format with AddrString and
// becomes the node's own address (MyInfo.Addr).
//
// The returned NodeApp starts with empty node and partition tables and with
// ClusterState set to -1 (invalid).
//
// NewNodeApp panics if serveAddr cannot be converted by AddrString.
// NOTE(review): some callers pass "" as serveAddr; how AddrString treats an
// empty address is not visible here - confirm.
func NewNodeApp(net xnet.Networker, typ NodeType, clusterName, masterAddr, serveAddr string) *NodeApp {
// convert serveAddr into neo format
addr, err := AddrString(net.Network(), serveAddr)
if err != nil {
panic(err) // XXX the constructor has no error return, so conversion failure is fatal
}
app := &NodeApp{
MyInfo: NodeInfo{Type: typ, Addr: addr},
ClusterName: clusterName,
Net: net,
MasterAddr: masterAddr,
NodeTab: &NodeTable{},
PartTab: &PartitionTable{},
ClusterState: -1, // invalid
}
// back-pointer from the node table to its owning application;
// Node.dial reaches the NodeApp through it.
app.NodeTab.nodeApp = app
return app
}
// Dial connects to another node in the cluster.
// //
// It handshakes, requests identification and checks peer type. If successful returned are: // It handshakes, requests identification and checks peer type. If successful returned are:
// - established link // - established link
// - accept identification reply // - accept identification reply
func (n *NodeApp) Dial(ctx context.Context, peerType NodeType, addr string) (_ *NodeLink, _ *AcceptIdentification, err error) { //
link, err := DialLink(ctx, n.Net, addr) // Dial does not update .NodeTab or its node entries in any way.
// For establishing links to peers present in .NodeTab use Node.Dial.
func (app *NodeApp) Dial(ctx context.Context, peerType NodeType, addr string) (_ *NodeLink, _ *AcceptIdentification, err error) {
link, err := DialLink(ctx, app.Net, addr)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
defer xerr.Contextf(&err, "%s: request identification", link) defer xerr.Contextf(&err, "%s: request identification", link)
// close link on error or ctx cancel // close link on error or FIXME: ctx cancel
cleanup := xio.CloseWhenDone(ctx, link) //cleanup := xio.CloseWhenDone(ctx, link)
defer func() { defer func() {
if err != nil { if err != nil {
cleanup() // FIXME wrong - err=nil -> goroutine still left hanging waiting
// for ctx and will close link if dial ctx closes
// cleanup()
lclose(ctx, link)
} }
}() }()
req := &RequestIdentification{ req := &RequestIdentification{
NodeType: n.MyInfo.Type, NodeType: app.MyInfo.Type,
UUID: n.MyInfo.UUID, UUID: app.MyInfo.UUID,
Address: n.MyInfo.Addr, Address: app.MyInfo.Addr,
ClusterName: n.ClusterName, ClusterName: app.ClusterName,
IdTimestamp: n.MyInfo.IdTimestamp, // XXX ok? IdTimestamp: app.MyInfo.IdTimestamp, // XXX ok?
} }
accept := &AcceptIdentification{} accept := &AcceptIdentification{}
// FIXME error if peer sends us something with another connID // FIXME error if peer sends us something with another connID
...@@ -120,7 +150,7 @@ func (n *NodeApp) Dial(ctx context.Context, peerType NodeType, addr string) (_ * ...@@ -120,7 +150,7 @@ func (n *NodeApp) Dial(ctx context.Context, peerType NodeType, addr string) (_ *
// XXX accept.MyUUID, link // XXX register .NodeTab? (or better LinkTab as NodeTab is driven by M) // XXX accept.MyUUID, link // XXX register .NodeTab? (or better LinkTab as NodeTab is driven by M)
// XXX accept.YourUUID // XXX M can tell us to change UUID -> take in effect // XXX accept.YourUUID // XXX M can tell us to change UUID -> take in effect
// XXX accept.NumPartitions, ... wrt n.node.PartTab // XXX accept.NumPartitions, ... wrt app.node.PartTab
return link, accept, nil return link, accept, nil
} }
...@@ -130,9 +160,9 @@ func (n *NodeApp) Dial(ctx context.Context, peerType NodeType, addr string) (_ * ...@@ -130,9 +160,9 @@ func (n *NodeApp) Dial(ctx context.Context, peerType NodeType, addr string) (_ *
// //
// If the address is empty, a new free one is automatically selected. // If the address is empty, a new free one is automatically selected.
// The node information about where it listens at is appropriately updated. // The node information about where it listens at is appropriately updated.
func (n *NodeApp) Listen() (Listener, error) { func (app *NodeApp) Listen() (Listener, error) {
// start listening // start listening
ll, err := ListenLink(n.Net, n.MyInfo.Addr.String()) ll, err := ListenLink(app.Net, app.MyInfo.Addr.String())
if err != nil { if err != nil {
return nil, err // XXX err ctx return nil, err // XXX err ctx
} }
...@@ -148,7 +178,7 @@ func (n *NodeApp) Listen() (Listener, error) { ...@@ -148,7 +178,7 @@ func (n *NodeApp) Listen() (Listener, error) {
return nil, err // XXX err ctx return nil, err // XXX err ctx
} }
n.MyInfo.Addr = addr app.MyInfo.Addr = addr
l := &listener{ l := &listener{
l: ll, l: ll,
......
...@@ -58,6 +58,9 @@ import ( ...@@ -58,6 +58,9 @@ import (
// //
// NodeTable zero value is valid empty node table. // NodeTable zero value is valid empty node table.
type NodeTable struct { type NodeTable struct {
// XXX for Node.Dial to work. see also comments vvv near "peer link"
nodeApp *NodeApp
// users have to care locking explicitly // users have to care locking explicitly
//sync.RWMutex XXX needed ? //sync.RWMutex XXX needed ?
...@@ -264,6 +267,16 @@ func (nt *NodeTable) SubscribeBuffered() (ch chan []NodeInfo, unsubscribe func() ...@@ -264,6 +267,16 @@ func (nt *NodeTable) SubscribeBuffered() (ch chan []NodeInfo, unsubscribe func()
// ---- peer link ---- // ---- peer link ----
// TODO review peer link dialing / setting / accepting.
//
// Keep in mind that in NEO in the general case it is not client/server but peer-to-peer,
// e.g. when two S establish a link between them to exchange/sync data.
//
// Also the distinction between S and M should go away as every S should
// be taught to also become M (and thus separate M nodes go away
// completely) with constant reelection happening in the background
// like in raft.
// SetLink sets link to peer node. // SetLink sets link to peer node.
// XXX // XXX
// //
...@@ -307,6 +320,38 @@ func (p *Node) CloseLink(ctx context.Context) { ...@@ -307,6 +320,38 @@ func (p *Node) CloseLink(ctx context.Context) {
} }
// dial does low-level work to dial peer
//
// It connects to the peer at p.Addr via the NodeApp that owns p's node table,
// asking for a peer of type p.Type, and then verifies the identification
// reply. On success the established link is returned. If dialing fails, or if
// the reply does not match what is expected, the link (if any) is closed and
// (nil, err) is returned.
//
// XXX p.* reading without lock - ok?
// XXX app.MyInfo without lock - ok?
func (p *Node) dial(ctx context.Context) (*NodeLink, error) {
// the application is reached through the node table back-pointer
app := p.nodeTab.nodeApp
link, accept, err := app.Dial(ctx, p.Type, p.Addr.String())
if err != nil {
return nil, err
}

// verify peer identifies as what we expect
// (the first matching case below sets err; no match leaves err == nil)
switch {
// type is already checked by app.Dial
case accept.MyUUID != p.UUID:
// peer reports a different UUID than the one recorded for this node
err = fmt.Errorf("connected, but peer's uuid is not %v (identifies as %v)", p.UUID, accept.MyUUID)
case accept.YourUUID != app.MyInfo.UUID:
// peer refers to us by a UUID other than our own
err = fmt.Errorf("connected, but peer gives us uuid %v (our is %v)", accept.YourUUID, app.MyInfo.UUID)
case !(accept.NumPartitions == 1 && accept.NumReplicas == 1):
// only a 1 partition x 1 replica setup is accepted for now
err = fmt.Errorf("connected but TODO peer works with ! 1x1 partition table.")
}

if err != nil {
// verification failed: close the link and do not hand it to the caller
//log.Errorif(ctx, link.Close())
lclose(ctx, link)
link = nil
}

return link, err
}
// even if dialing a peer failed, we'll attempt redial after this timeout // even if dialing a peer failed, we'll attempt redial after this timeout
const δtRedial = 3 * time.Second const δtRedial = 3 * time.Second
...@@ -447,38 +492,3 @@ func (p *Peer) PutConn(c *Conn) { ...@@ -447,38 +492,3 @@ func (p *Peer) PutConn(c *Conn) {
p.linkMu.Unlock() p.linkMu.Unlock()
} }
*/ */
// dial does low-level work to dial peer
//
// It is meant to connect to the peer at p.Addr, verify the identification
// reply against p.Type / p.UUID and our own UUID, and return the link, or
// close it and return an error on mismatch.
//
// NOTE: this version has no way to reach the owning NodeApp: `me` below is a
// nil *NodeApp, so it cannot work as written (see the XXX on that line).
//
// XXX p.* reading without lock - ok?
func (p *Node) dial(ctx context.Context) (*NodeLink, error) {
var me *NodeApp // XXX bad -> crashes
link, accept, err := me.Dial(ctx, p.Type, p.Addr.String())
if err != nil {
return nil, err
}

// verify peer identifies as what we expect
// XXX move to Dial?
// (the first matching case below sets err; no match leaves err == nil)
switch {
case accept.NodeType != p.Type:
err = fmt.Errorf("connected, but peer is not %v (identifies as %v)", p.Type, accept.NodeType)
case accept.MyUUID != p.UUID:
err = fmt.Errorf("connected, but peer's uuid is not %v (identifies as %v)", p.UUID, accept.MyUUID)
case accept.YourUUID != me.MyInfo.UUID:
err = fmt.Errorf("connected, but peer gives us uuid %v (our is %v)", accept.YourUUID, me.MyInfo.UUID)
case !(accept.NumPartitions == 1 && accept.NumReplicas == 1):
// only a 1 partition x 1 replica setup is accepted for now
err = fmt.Errorf("connected but TODO peer works with ! 1x1 partition table.")
}

if err != nil {
// verification failed: close the link and do not hand it to the caller
//log.Errorif(ctx, link.Close())
lclose(ctx, link)
link = nil
}

return link, err
}
...@@ -44,7 +44,7 @@ import ( ...@@ -44,7 +44,7 @@ import (
// Master is a node overseeing and managing how whole NEO cluster works // Master is a node overseeing and managing how whole NEO cluster works
type Master struct { type Master struct {
node neo.NodeApp node *neo.NodeApp
// master manages node and partition tables and broadcast their updates // master manages node and partition tables and broadcast their updates
// to all nodes in cluster // to all nodes in cluster
...@@ -83,24 +83,8 @@ type nodeLeave struct { ...@@ -83,24 +83,8 @@ type nodeLeave struct {
// NewMaster creates new master node that will listen on serveAddr. // NewMaster creates new master node that will listen on serveAddr.
// Use Run to actually start running the node. // Use Run to actually start running the node.
func NewMaster(clusterName, serveAddr string, net xnet.Networker) *Master { func NewMaster(clusterName, serveAddr string, net xnet.Networker) *Master {
// convert serveAddr into neo format
addr, err := neo.AddrString(net.Network(), serveAddr)
if err != nil {
panic(err) // XXX
}
m := &Master{ m := &Master{
node: neo.NodeApp{ node: neo.NewNodeApp(net, neo.MASTER, clusterName, serveAddr, serveAddr),
MyInfo: neo.NodeInfo{Type: neo.MASTER, Addr: addr},
ClusterName: clusterName,
Net: net,
MasterAddr: serveAddr, // XXX ok?
NodeTab: &neo.NodeTable{},
PartTab: &neo.PartitionTable{},
ClusterState: -1, // invalid
},
ctlStart: make(chan chan error), ctlStart: make(chan chan error),
ctlStop: make(chan chan struct{}), ctlStop: make(chan chan struct{}),
......
...@@ -39,7 +39,7 @@ import ( ...@@ -39,7 +39,7 @@ import (
// Storage is NEO node that keeps data and provides read/write access to it // Storage is NEO node that keeps data and provides read/write access to it
type Storage struct { type Storage struct {
node neo.NodeApp node *neo.NodeApp
// context for providing operational service // context for providing operational service
// it is renewed every time master tells us StartOperation, so users // it is renewed every time master tells us StartOperation, so users
...@@ -59,26 +59,10 @@ type Storage struct { ...@@ -59,26 +59,10 @@ type Storage struct {
// NewStorage creates new storage node that will listen on serveAddr and talk to master on masterAddr. // NewStorage creates new storage node that will listen on serveAddr and talk to master on masterAddr.
// The storage uses zstor as underlying backend for storing data. // The storage uses zstor as underlying backend for storing data.
// Use Run to actually start running the node. // Use Run to actually start running the node.
func NewStorage(cluster, masterAddr, serveAddr string, net xnet.Networker, zstor zodb.IStorage) *Storage { func NewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker, zstor zodb.IStorage) *Storage {
// convert serveAddr into neo format
// XXX -> new.NewNode() ?
addr, err := neo.AddrString(net.Network(), serveAddr)
if err != nil {
panic(err) // XXX
}
stor := &Storage{ stor := &Storage{
node: neo.NodeApp{ node: neo.NewNodeApp(net, neo.STORAGE, clusterName, masterAddr, serveAddr),
MyInfo: neo.NodeInfo{Type: neo.STORAGE, Addr: addr}, zstor: zstor,
ClusterName: cluster,
Net: net,
MasterAddr: masterAddr,
PartTab: &neo.PartitionTable{}, // empty - TODO read from disk
NodeTab: &neo.NodeTable{},
},
zstor: zstor,
} }
// operational context is initially done (no service should be provided) // operational context is initially done (no service should be provided)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment