Commit 72ee34f1 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent bdf59e9e
...@@ -25,6 +25,8 @@ ...@@ -25,6 +25,8 @@
- x/net/trace to trace requests and connection logging (and packets ?) - x/net/trace to trace requests and connection logging (and packets ?)
- packet log; general log -> glog ? - packet log; general log -> glog ?
-> http://opentracing.io/
- gRPC eventually instead of hand-made protocol ? - gRPC eventually instead of hand-made protocol ?
go.leveldb - study go.leveldb - study
......
...@@ -31,15 +31,7 @@ import ( ...@@ -31,15 +31,7 @@ import (
// Client talks to NEO cluster and exposes access to it via ZODB interfaces // Client talks to NEO cluster and exposes access to it via ZODB interfaces
type Client struct { type Client struct {
// XXX move -> nodeCommon? neo.NodeCommon
// ---- 8< ----
myInfo neo.NodeInfo // XXX -> only NodeUUID
clusterName string
net xnet.Networker // network AP we are sending/receiving on
masterAddr string // address of master XXX -> Address ?
// ---- 8< ----
storLink *neo.NodeLink // link to storage node storLink *neo.NodeLink // link to storage node
storConn *neo.Conn // XXX main connection to storage storConn *neo.Conn // XXX main connection to storage
......
...@@ -254,7 +254,7 @@ func (c *Conn) shutdown() { ...@@ -254,7 +254,7 @@ func (c *Conn) shutdown() {
}) })
} }
// Close closes connection // Close closes connection.
// Any blocked Send*() or Recv*() will be unblocked and return error // Any blocked Send*() or Recv*() will be unblocked and return error
// //
// NOTE for Send() - once transmission was started - it will complete in the // NOTE for Send() - once transmission was started - it will complete in the
......
...@@ -39,3 +39,51 @@ const ( ...@@ -39,3 +39,51 @@ const (
// OID_LEN = 8 // OID_LEN = 8
// TID_LEN = 8 // TID_LEN = 8
) )
// NodeCommon is common data in all NEO nodes: Master, Storage & Client.
//
// It groups the node's own information, the cluster name, the network access
// point and the master address in one place.
//
// XXX naming -> Node ?
type NodeCommon struct {
	// MyInfo is information about this node itself. Listen uses
	// MyInfo.Address as the address to listen on and stores the actual
	// listening address back into it.
	// XXX -> only NodeUUID
	MyInfo neo.NodeInfo

	// ClusterName is the cluster name (presumably of the NEO cluster this
	// node belongs to - TODO confirm).
	ClusterName string

	// Net is the network AP we are sending/receiving on.
	Net xnet.Networker

	// MasterAddr is the address of master. XXX -> Address ?
	MasterAddr string

	// XXX + NodeTab (information about nodes in the cluster) ?
	// XXX + PartTab (information about data distribution in the cluster) ?
}
// Listen opens a listener on the node's own address (n.MyInfo.Address) via
// the node's network access point.
//
// If that address is empty, a new free one is selected automatically. On
// success n.MyInfo.Address is updated to the address the listener is actually
// bound to, and the listener is returned.
//
// On failure a nil listener and the error are returned; if the listener was
// already opened when the failure happened, it is closed first.
func (n *NodeCommon) Listen() (net.Listener, error) {
	// XXX ugly: the address has to go through its string form
	bindAddr := n.MyInfo.Address.String()

	ln, err := n.Net.Listen(bindAddr)
	if err != nil {
		return nil, err // XXX err ctx
	}

	// Only now is our real listening address known (it could have been
	// autobind before).
	// NOTE listen("tcp", ":1234") gives ln.Addr 0.0.0.0:1234 and
	//      listen("tcp6", ":1234") gives ln.Addr [::]:1234
	//      -> host is never empty
	resolved, err := neo.Addr(ln.Addr())
	if err != nil {
		// XXX -> panic here ?
		// do not leave the listener open when we cannot report its address
		ln.Close()
		return nil, err // XXX err ctx
	}

	n.MyInfo.Address = resolved
	return ln, nil
}
// XXX func (n *Node) IdentifyWith(...) ?
// XXX better -> Connect() (=Dial, IdentifyWith, process common ID reply ...)
// TODO functions to update:
// .PartTab from NotifyPartitionTable msg
// .NodeTab from NotifyNodeInformation msg
// .ClusterState from NotifyClusterState msg
...@@ -82,6 +82,7 @@ type NodeTable struct { ...@@ -82,6 +82,7 @@ type NodeTable struct {
} }
// Node represents a node entry in NodeTable // Node represents a node entry in NodeTable
// XXX naming -> NodeEntry?
type Node struct { type Node struct {
NodeInfo // XXX good idea to embed ? NodeInfo // XXX good idea to embed ?
......
Storage layout
--------------
meta/
data/
1 inbox/ (commit queues)
2 ? (data.fs)
3. packed/
Commit without deadlocks & master Commit without deadlocks & master
--------------------------------- ---------------------------------
......
...@@ -36,14 +36,7 @@ import ( ...@@ -36,14 +36,7 @@ import (
// Master is a node overseeing and managing how whole NEO cluster works // Master is a node overseeing and managing how whole NEO cluster works
type Master struct { type Master struct {
// XXX move -> nodeCommon? node neo.NodeCommon
// ---- 8< ----
myInfo neo.NodeInfo
clusterName string
net xnet.Networker // network AP we are sending/receiving on
masterAddr string // address of current primary master
// ---- 8< ----
// last allocated oid & tid // last allocated oid & tid
// XXX how to start allocating oid from 0, not 1 ? // XXX how to start allocating oid from 0, not 1 ?
...@@ -115,24 +108,12 @@ func NewMaster(clusterName, serveAddr string, net xnet.Networker) *Master { ...@@ -115,24 +108,12 @@ func NewMaster(clusterName, serveAddr string, net xnet.Networker) *Master {
// Run starts master node and runs it until ctx is cancelled or fatal error // Run starts master node and runs it until ctx is cancelled or fatal error
func (m *Master) Run(ctx context.Context) error { func (m *Master) Run(ctx context.Context) error {
// XXX dup wrt Storage.Run
// start listening // start listening
l, err := m.net.Listen(m.myInfo.Address.String()) // XXX ugly l, err := m.Listen()
if err != nil {
return err // XXX err ctx
}
// now we know our listening address (in case it was autobind before)
// NOTE listen("tcp", ":1234") gives l.Addr 0.0.0.0:1234 and
// listen("tcp6", ":1234") gives l.Addr [::]:1234
// -> host is never empty
addr, err := neo.Addr(l.Addr())
if err != nil { if err != nil {
// XXX -> panic here ?
return err // XXX err ctx return err // XXX err ctx
} }
m.myInfo.Address = addr
m.masterAddr = l.Addr().String() m.masterAddr = l.Addr().String()
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
......
...@@ -38,14 +38,7 @@ import ( ...@@ -38,14 +38,7 @@ import (
// Storage is NEO storage server application // Storage is NEO storage server application
type Storage struct { type Storage struct {
// XXX move -> nodeCommon? neo.NodeCommon
// ---- 8< ----
myInfo neo.NodeInfo // XXX -> only Address + NodeUUID ?
clusterName string
net xnet.Networker // network AP we are sending/receiving on
masterAddr string // address of master
// ---- 8< ----
// context for providing operational service // context for providing operational service
// it is renewed every time master tells us StartOperation, so users
...@@ -53,10 +46,16 @@ type Storage struct { ...@@ -53,10 +46,16 @@ type Storage struct {
opMu sync.Mutex opMu sync.Mutex
opCtx context.Context opCtx context.Context
zstor zodb.IStorage // underlying ZODB storage XXX temp ? // TODO storage layout:
// meta/
// data/
// 1 inbox/ (commit queues)
// 2 ? (data.fs)
// 3. packed/
zstor zodb.IStorage // underlying ZODB storage XXX -> directly work with fs1 & friends
} }
// NewStorage creates new storage node that will listen on serveAddr and talk to master on masterAddr // NewStorage creates new storage node that will listen on serveAddr and talk to master on masterAddr.
// The storage uses zstor as underlying backend for storing data. // The storage uses zstor as underlying backend for storing data.
// Use Run to actually start running the node. // Use Run to actually start running the node.
func NewStorage(cluster, masterAddr, serveAddr string, net xnet.Networker, zstor zodb.IStorage) *Storage { func NewStorage(cluster, masterAddr, serveAddr string, net xnet.Networker, zstor zodb.IStorage) *Storage {
...@@ -76,8 +75,8 @@ func NewStorage(cluster, masterAddr, serveAddr string, net xnet.Networker, zstor ...@@ -76,8 +75,8 @@ func NewStorage(cluster, masterAddr, serveAddr string, net xnet.Networker, zstor
// operational context is initially done (no service should be provided) // operational context is initially done (no service should be provided)
noOpCtx, cancel := context.WithCancel(context.Background()) noOpCtx, cancel := context.WithCancel(context.Background())
stor.opCtx = noOpCtx
cancel() cancel()
stor.opCtx = noOpCtx
return stor return stor
} }
...@@ -86,25 +85,12 @@ func NewStorage(cluster, masterAddr, serveAddr string, net xnet.Networker, zstor ...@@ -86,25 +85,12 @@ func NewStorage(cluster, masterAddr, serveAddr string, net xnet.Networker, zstor
// Run starts storage node and runs it until either ctx is cancelled or master // Run starts storage node and runs it until either ctx is cancelled or master
// commands it to shutdown. // commands it to shutdown.
func (stor *Storage) Run(ctx context.Context) error { func (stor *Storage) Run(ctx context.Context) error {
// XXX dup wrt Master.Run
// start listening // start listening
l, err := stor.net.Listen(stor.myInfo.Address.String()) // XXX ugly l, err := stor.Listen()
if err != nil { if err != nil {
return err // XXX err ctx return err // XXX err ctx
} }
// now we know our listening address (in case it was autobind before)
// NOTE listen("tcp", ":1234") gives l.Addr 0.0.0.0:1234 and
// listen("tcp6", ":1234") gives l.Addr [::]:1234
// -> host is never empty
addr, err := neo.Addr(l.Addr())
if err != nil {
// XXX -> panic here ?
return err // XXX err ctx
}
stor.myInfo.Address = addr
// start serving incoming connections // start serving incoming connections
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
serveCtx, serveCancel := context.WithCancel(ctx) serveCtx, serveCancel := context.WithCancel(ctx)
...@@ -117,6 +103,7 @@ func (stor *Storage) Run(ctx context.Context) error { ...@@ -117,6 +103,7 @@ func (stor *Storage) Run(ctx context.Context) error {
// connect to master and get commands and updates from it // connect to master and get commands and updates from it
err = stor.talkMaster(ctx) err = stor.talkMaster(ctx)
// XXX log err?
// we are done - shutdown // we are done - shutdown
serveCancel() serveCancel()
...@@ -125,10 +112,12 @@ func (stor *Storage) Run(ctx context.Context) error { ...@@ -125,10 +112,12 @@ func (stor *Storage) Run(ctx context.Context) error {
return err // XXX err ctx return err // XXX err ctx
} }
// talkMaster connects to master, announces self and receives commands and notifications // --- channel with master directing us ---
// talkMaster connects to master, announces self and receives commands and notifications.
// it tries to persist master link reconnecting as needed // it tries to persist master link reconnecting as needed
// //
// it always returns an error - either due to cancel or commannd from master to shutdown // it always returns an error - either due to cancel or command from master to shutdown
func (stor *Storage) talkMaster(ctx context.Context) error { func (stor *Storage) talkMaster(ctx context.Context) error {
// XXX errctx // XXX errctx
...@@ -138,9 +127,8 @@ func (stor *Storage) talkMaster(ctx context.Context) error { ...@@ -138,9 +127,8 @@ func (stor *Storage) talkMaster(ctx context.Context) error {
fmt.Printf("stor: master(%v): %v\n", stor.masterAddr, err) fmt.Printf("stor: master(%v): %v\n", stor.masterAddr, err)
// TODO if err = shutdown -> return // TODO if err = shutdown -> return
// XXX handle shutdown command from master
// throttle reconnecting / exit on cancel // exit on cancel / throttle reconnecting
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
...@@ -187,6 +175,17 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) { ...@@ -187,6 +175,17 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// now handle notifications and commands from master // now handle notifications and commands from master
var Mconn *neo.Conn var Mconn *neo.Conn
for { for {
// check if it was context cancel or command from master to shutdown
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if err.IsShutdown(...) { // TODO
return err
}
// accept next connection from master. only 1 connection is served at any given time // accept next connection from master. only 1 connection is served at any given time
// XXX every new connection from master means previous connection was closed // XXX every new connection from master means previous connection was closed
// XXX how to do so and stay compatible to py? // XXX how to do so and stay compatible to py?
...@@ -203,16 +202,18 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) { ...@@ -203,16 +202,18 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
return err // XXX ? return err // XXX ?
} }
// XXX close Mconn on ctx cancel so m1initialize or m1serve wake up
// let master initialize us. If successful this ends with StartOperation command.
err = stor.m1initialize(ctx, Mconn) err = stor.m1initialize(ctx, Mconn)
if err != nil { if err != nil {
fmt.Println("stor: %v: master: %v", err) fmt.Println("stor: %v: master: %v", err)
// XXX recheck closing Mconn
continue // retry initializing continue // retry initializing
} }
// we got StartOperation command. Let master drive us during servicing phase.
err = stor.m1serve(ctx, Mconn) err = stor.m1serve(ctx, Mconn)
fmt.Println("stor: %v: master: %v", err) fmt.Println("stor: %v: master: %v", err)
// XXX check if it was command to shutdown and if so break
continue // retry from initializing continue // retry from initializing
} }
...@@ -234,7 +235,6 @@ func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err err ...@@ -234,7 +235,6 @@ func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err err
defer xerr.Context(&err, "init") defer xerr.Context(&err, "init")
for { for {
// XXX abort on ctx (XXX or upper?)
msg, err := Mconn.Recv() msg, err := Mconn.Recv()
if err != nil { if err != nil {
return err return err
...@@ -267,7 +267,7 @@ func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err err ...@@ -267,7 +267,7 @@ func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err err
lastTid, zerr1 := stor.zstor.LastTid() lastTid, zerr1 := stor.zstor.LastTid()
lastOid, zerr2 := stor.zstor.LastOid() lastOid, zerr2 := stor.zstor.LastOid()
if zerr := xerr.First(zerr1, zerr2); zerr != nil { if zerr := xerr.First(zerr1, zerr2); zerr != nil {
return zerr return zerr // XXX send the error to M
} }
err = Mconn.Send(&neo.AnswerLastIDs{LastTid: lastTid, LastOid: lastOid}) err = Mconn.Send(&neo.AnswerLastIDs{LastTid: lastTid, LastOid: lastOid})
...@@ -291,7 +291,7 @@ func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err err ...@@ -291,7 +291,7 @@ func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err err
} }
} }
// m1serve drives storage by master messages during service hase // m1serve drives storage by master messages during service phase
// //
// it always returns with an error describing why serve has to be stopped - // it always returns with an error describing why serve has to be stopped -
// either due to master commanding us to stop, or context cancel or some other // either due to master commanding us to stop, or context cancel or some other
...@@ -300,7 +300,7 @@ func (stor *Storage) m1serve(ctx context.Context, Mconn *neo.Conn) (err error) { ...@@ -300,7 +300,7 @@ func (stor *Storage) m1serve(ctx context.Context, Mconn *neo.Conn) (err error) {
defer xerr.Context(&err, "serve") defer xerr.Context(&err, "serve")
// refresh stor.opCtx and cancel it when we finish so that client // refresh stor.opCtx and cancel it when we finish so that client
// handlers know they need to stop operating // handlers know they need to stop operating as master told us to do so.
opCtx, opCancel := context.WithCancel(ctx) opCtx, opCancel := context.WithCancel(ctx)
stor.opMu.Lock() stor.opMu.Lock()
stor.opCtx = opCtx stor.opCtx = opCtx
...@@ -332,6 +332,8 @@ func (stor *Storage) m1serve(ctx context.Context, Mconn *neo.Conn) (err error) { ...@@ -332,6 +332,8 @@ func (stor *Storage) m1serve(ctx context.Context, Mconn *neo.Conn) (err error) {
} }
} }
// --- serve incoming connections from other nodes ---
// ServeLink serves incoming node-node link connection // ServeLink serves incoming node-node link connection
// XXX +error return? // XXX +error return?
func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) { func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment