Commit 927ada0c authored by Kirill Smelkov

.

parent ef7b5996
? Load vs "deleted" -> err or data=nil ? -> stor.Conn flag
? IStorage.Load -> per storage connection
? random access -> mmap
----------------------------------------
- TODO stats for events (packets received, errors, etc) - TODO stats for events (packets received, errors, etc)
- custom memory manager is required to avoid performance hit on e.g. []byte alloc/dealloc - custom memory manager is required to avoid performance hit on e.g. []byte alloc/dealloc
...@@ -18,6 +25,8 @@ ...@@ -18,6 +25,8 @@
- "Go Execution Tracer" - "Go Execution Tracer"
- x/net/trace to trace requests and connection logging (and packets ?) - x/net/trace to trace requests and connection logging (and packets ?)
- packet log; general log -> glog ?
- gRPC eventually instead of hand-made protocol ? - gRPC eventually instead of hand-made protocol ?
go.leveldb - study go.leveldb - study
......
...@@ -27,11 +27,13 @@ import ( ...@@ -27,11 +27,13 @@ import (
"math" "math"
"os" "os"
"sync" "sync"
"time"
) )
// Master is a node overseeing and managing how whole NEO cluster works // Master is a node overseeing and managing how whole NEO cluster works
type Master struct { type Master struct {
clusterName string clusterName string
nodeUUID NodeUUID // my node uuid; XXX init somewhere
// master manages node and partition tables and broadcast their updates // master manages node and partition tables and broadcast their updates
// to all nodes in cluster // to all nodes in cluster
...@@ -40,10 +42,16 @@ type Master struct { ...@@ -40,10 +42,16 @@ type Master struct {
partTab PartitionTable partTab PartitionTable
clusterState ClusterState clusterState ClusterState
//nodeEventQ chan ... // for node connected / disconnected events nodeCome chan nodeCome // node connected
//nodeLeave chan nodeLeave // node disconnected
}
//txnCommittedQ chan ... // TODO for when txn is committed // a new node connect
type nodeCome struct {
link *NodeLink
idReq RequestIdentification // we received this identification request
idResp chan NEOEncoder // what we reply (AcceptIdentification | Error)
} }
func NewMaster(clusterName string) *Master { func NewMaster(clusterName string) *Master {
...@@ -61,37 +69,115 @@ func (m *Master) SetClusterState(state ClusterState) { ...@@ -61,37 +69,115 @@ func (m *Master) SetClusterState(state ClusterState) {
// XXX actions ? // XXX actions ?
} }
// tstart is the reference instant for monotime; it is captured once,
// when package-level variables are initialized.
var tstart = time.Now()

// monotime returns the number of seconds elapsed since tstart, i.e. roughly
// since program start.
//
// The difference is computed from the monotonic clock reading carried by
// time.Time values, so the result is robust to OS wall-clock adjustments.
// XXX place?
func monotime() float64 {
	elapsed := time.Since(tstart) // same as time.Now().Sub(tstart)
	return elapsed.Seconds()
}
// run implements main master cluster management logic: node tracking, cluster // run implements main master cluster management logic: node tracking, cluster
// state updates, scheduling data movement between storage nodes etc // state updates, scheduling data movement between storage nodes etc
/*
func (m *Master) run(ctx context.Context) { func (m *Master) run(ctx context.Context) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
panic("TODO") panic("TODO")
case nodeEvent := <-m.nodeEventQ: // new node connects & requests identification
// TODO update nodeTab case n := <-m.nodeCome:
// XXX also verify ? :
// - NodeType valid
// - IdTimestamp ?
// add info to nodeTab if n.idReq.ClusterName != m.clusterName {
m.nodeTab.Lock() n.idResp <- &Error{PROTOCOL_ERROR, "cluster name mismatch"} // XXX
m.nodeTab.Add(&Node{nodeInfo, link}) break
m.nodeTab.Unlock() }
// TODO notify nodeTab changes nodeType := n.idReq.NodeType
// TODO consider adjusting partTab uuid := n.idReq.NodeUUID
if uuid == 0 {
uuid = m.allocUUID(nodeType)
}
// XXX uuid < 0 (temporary) -> reallocate if conflict ?
node := m.nodeTab.Get(uuid)
if node != nil {
// reject - uuid is already occupied by someone else
// XXX check also for down state - it could be the same node reconnecting
n.idResp <- &Error{PROTOCOL_ERROR, "uuid %v already used by another node"} // XXX
break
}
// TODO consider how this maybe adjust cluster state // XXX accept only certain kind of nodes depending on .clusterState, e.g.
switch nodeType {
case CLIENT:
n.idResp <- &Error{NOT_READY, "cluster not operational"}
// XXX ...
}
//case txnCommitted := <-m.txnCommittedQ:
n.idResp <- &AcceptIdentification{
NodeType: MASTER,
MyNodeUUID: m.nodeUUID,
NumPartitions: 1, // FIXME hardcoded
NumReplicas: 1, // FIXME hardcoded
YourNodeUUID: uuid,
} }
// update nodeTab
var nodeState NodeState
switch nodeType {
case STORAGE:
// FIXME py sets to RUNNING/PENDING depending on cluster state
nodeState = PENDING
default:
nodeState = RUNNING
}
nodeInfo := NodeInfo{
NodeType: nodeType,
Address: n.idReq.Address,
NodeUUID: uuid,
NodeState: nodeState,
IdTimestamp: monotime(),
} }
m.nodeTab.Update(nodeInfo) // NOTE this notifies al nodeTab subscribers
// XXX consider adjusting partTab
// XXX consider .clusterState change
// XXX add new node to current whole-cluster job
//case link := <-m.nodeLeave:
}
}
}
// allocUUID allocates new node uuid for a node of kind nodeType
// XXX it is a bad idea for master to assign uuid to an incoming node
// -> better for nodes to generate really unique UUIDs themselves and always present them
func (m *Master) allocUUID(nodeType NodeType) NodeUUID {
// see NodeUUID & NodeUUID.String for details
// XXX better to keep this code near to ^^^ (e.g. attached to NodeType)
// XXX but since whole uuid assign idea is not good - let's keep it dirty here
typ := int(nodeType & 7) << (24 + 4) // note temp=0
for num := 1; num < 1<<24; num++ {
uuid := NodeUUID(typ | num)
if m.nodeTab.Get(uuid) == nil {
return uuid
}
}
panic("all uuid allocated ???") // XXX more robust ?
} }
*/
// ServeLink serves incoming node-node link connection // ServeLink serves incoming node-node link connection
// XXX +error return? // XXX +error return?
......
...@@ -21,7 +21,7 @@ package neo ...@@ -21,7 +21,7 @@ package neo
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"sync" //"sync"
) )
// NodeTable represents all nodes in a cluster // NodeTable represents all nodes in a cluster
...@@ -70,7 +70,7 @@ import ( ...@@ -70,7 +70,7 @@ import (
// NodeTable zero value is valid empty node table. // NodeTable zero value is valid empty node table.
type NodeTable struct { type NodeTable struct {
// users have to care locking explicitly // users have to care locking explicitly
sync.RWMutex //sync.RWMutex XXX needed ?
nodev []*Node nodev []*Node
notifyv []chan NodeInfo // subscribers notifyv []chan NodeInfo // subscribers
...@@ -84,6 +84,7 @@ type Node struct { ...@@ -84,6 +84,7 @@ type Node struct {
Link *NodeLink // link to this node; =nil if not connected XXX do we need it here ? Link *NodeLink // link to this node; =nil if not connected XXX do we need it here ?
// XXX identified or not ? // XXX identified or not ?
// XXX -> not needed - we only add something to nodetab after identification
} }
...@@ -157,12 +158,26 @@ func (nt *NodeTable) SubscribeBuffered() (ch chan []NodeInfo, unsubscribe func() ...@@ -157,12 +158,26 @@ func (nt *NodeTable) SubscribeBuffered() (ch chan []NodeInfo, unsubscribe func()
} }
// UpdateNode updates information about a node // Update updates information about a node
func (nt *NodeTable) UpdateNode(nodeInfo NodeInfo) { // XXX how to pass link into node?
// TODO func (nt *NodeTable) Update(nodeInfo NodeInfo) {
node := nt.Get(nodeInfo.NodeUUID)
if node == nil {
node = &Node{}
nt.nodev = append(nt.nodev, node)
}
node.Info = nodeInfo
// notify subscribers
// XXX rlock for .notifyv ?
for _, notify := range nt.notifyv {
notify <- nodeInfo
}
} }
// XXX ? ^^^ UpdateNode is enough ? /*
// XXX ? ^^^ Update is enough ?
func (nt *NodeTable) Add(node *Node) { func (nt *NodeTable) Add(node *Node) {
// XXX check node is already there // XXX check node is already there
// XXX pass/store node by pointer ? // XXX pass/store node by pointer ?
...@@ -170,12 +185,13 @@ func (nt *NodeTable) Add(node *Node) { ...@@ -170,12 +185,13 @@ func (nt *NodeTable) Add(node *Node) {
// TODO notify all nodelink subscribers about new info // TODO notify all nodelink subscribers about new info
} }
*/
// TODO subscribe for changes on Add ? (notification via channel) // TODO subscribe for changes on Add ? (notification via channel)
// Lookup finds node by uuid // Get finds node by uuid
func (nt *NodeTable) Lookup(uuid NodeUUID) *Node { func (nt *NodeTable) Get(uuid NodeUUID) *Node {
// FIXME linear scan // FIXME linear scan
for _, node := range nt.nodev { for _, node := range nt.nodev {
if node.Info.NodeUUID == uuid { if node.Info.NodeUUID == uuid {
...@@ -185,7 +201,7 @@ func (nt *NodeTable) Lookup(uuid NodeUUID) *Node { ...@@ -185,7 +201,7 @@ func (nt *NodeTable) Lookup(uuid NodeUUID) *Node {
return nil return nil
} }
// XXX LookupByAddress ? // XXX GetByAddress ?
func (nt *NodeTable) String() string { func (nt *NodeTable) String() string {
......
...@@ -136,6 +136,8 @@ type PartitionCell struct { ...@@ -136,6 +136,8 @@ type PartitionCell struct {
// nodes referenced by pt are up and running // nodes referenced by pt are up and running
// //
// XXX or keep not only NodeUUID in PartitionCell - add *Node ? // XXX or keep not only NodeUUID in PartitionCell - add *Node ?
//
// XXX -> add `nt *NodeTable` as argument and check real node states there ?
func (pt *PartitionTable) Operational() bool { func (pt *PartitionTable) Operational() bool {
for _, ptEntry := range pt.ptTab { for _, ptEntry := range pt.ptTab {
if len(ptEntry) == 0 { if len(ptEntry) == 0 {
......
...@@ -237,7 +237,7 @@ func (p *CloseClient) NEODecode(data []byte) (int, error) { ...@@ -237,7 +237,7 @@ func (p *CloseClient) NEODecode(data []byte) (int, error) {
// 7. RequestIdentification // 7. RequestIdentification
func (p *RequestIdentification) NEOEncodedInfo() (uint16, int) { func (p *RequestIdentification) NEOEncodedInfo() (uint16, int) {
return 7, 26 + len(p.Address.Host) + len(p.Name) return 7, 26 + len(p.Address.Host) + len(p.ClusterName)
} }
func (p *RequestIdentification) NEOEncode(data []byte) { func (p *RequestIdentification) NEOEncode(data []byte) {
...@@ -252,10 +252,10 @@ func (p *RequestIdentification) NEOEncode(data []byte) { ...@@ -252,10 +252,10 @@ func (p *RequestIdentification) NEOEncode(data []byte) {
} }
binary.BigEndian.PutUint16(data[0:], p.Address.Port) binary.BigEndian.PutUint16(data[0:], p.Address.Port)
{ {
l := uint32(len(p.Name)) l := uint32(len(p.ClusterName))
binary.BigEndian.PutUint32(data[2:], l) binary.BigEndian.PutUint32(data[2:], l)
data = data[6:] data = data[6:]
copy(data, p.Name) copy(data, p.ClusterName)
data = data[l:] data = data[l:]
} }
float64_NEOEncode(data[0:], p.IdTimestamp) float64_NEOEncode(data[0:], p.IdTimestamp)
...@@ -286,7 +286,7 @@ func (p *RequestIdentification) NEODecode(data []byte) (int, error) { ...@@ -286,7 +286,7 @@ func (p *RequestIdentification) NEODecode(data []byte) (int, error) {
goto overflow goto overflow
} }
nread += 8 + l nread += 8 + l
p.Name = string(data[:l]) p.ClusterName = string(data[:l])
data = data[l:] data = data[l:]
} }
p.IdTimestamp = float64_NEODecode(data[0:]) p.IdTimestamp = float64_NEODecode(data[0:])
......
...@@ -273,7 +273,7 @@ type RequestIdentification struct { ...@@ -273,7 +273,7 @@ type RequestIdentification struct {
NodeType NodeType // XXX name NodeType NodeType // XXX name
NodeUUID NodeUUID NodeUUID NodeUUID
Address Address // where requesting node is also accepting connections Address Address // where requesting node is also accepting connections
Name string // XXX -> ClusterName ClusterName string
IdTimestamp float64 IdTimestamp float64
} }
......
...@@ -183,7 +183,7 @@ func IdentifyMe(link *NodeLink, nodeType NodeType /*XXX*/) (peerType NodeType, e ...@@ -183,7 +183,7 @@ func IdentifyMe(link *NodeLink, nodeType NodeType /*XXX*/) (peerType NodeType, e
NodeType: nodeType, NodeType: nodeType,
NodeUUID: 0, // XXX NodeUUID: 0, // XXX
Address: Address{}, // XXX Address: Address{}, // XXX
Name: "", // XXX cluster name ? ClusterName: "", // XXX
IdTimestamp: 0, // XXX IdTimestamp: 0, // XXX
}) })
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment