Commit 6b35dbe6 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent affb271c
...@@ -42,18 +42,31 @@ type Master struct { ...@@ -42,18 +42,31 @@ type Master struct {
partTab PartitionTable partTab PartitionTable
clusterState ClusterState clusterState ClusterState
nodeCome chan nodeCome // node connected // channels from various workers to main driver
//nodeLeave chan nodeLeave // node disconnected nodeCome chan nodeCome // node connected
nodeLeave chan nodeLeave // node disconnected
storRecovery chan storRecovery // storage node passed recovery
} }
// a new node connect // node connects
type nodeCome struct { type nodeCome struct {
link *NodeLink link *NodeLink
idReq RequestIdentification // we received this identification request idReq RequestIdentification // we received this identification request
idResp chan NEOEncoder // what we reply (AcceptIdentification | Error) idResp chan NEOEncoder // what we reply (AcceptIdentification | Error)
} }
// node disconnects
type nodeLeave struct {
// TODO
}
// storage node passed recovery phase
type storRecovery struct {
parttab PartitionTable
// XXX + lastOid, lastTid, backup_tid, truncate_tid ?
}
func NewMaster(clusterName string) *Master { func NewMaster(clusterName string) *Master {
m := &Master{clusterName: clusterName} m := &Master{clusterName: clusterName}
m.SetClusterState(RECOVERING) // XXX no elections - we are the only master m.SetClusterState(RECOVERING) // XXX no elections - we are the only master
...@@ -85,7 +98,7 @@ func (m *Master) run(ctx context.Context) { ...@@ -85,7 +98,7 @@ func (m *Master) run(ctx context.Context) {
// current function to ask/control a storage depending on current cluster state and master idea // current function to ask/control a storage depending on current cluster state and master idea
// + associated context covering all storage nodes // + associated context covering all storage nodes
storCtl := m.storRecovery storCtl := m.storCtlRecovery
storCtlCtx, storCtlCancel := context.WithCancel(ctx) storCtlCtx, storCtlCancel := context.WithCancel(ctx)
for { for {
...@@ -115,42 +128,16 @@ func (m *Master) run(ctx context.Context) { ...@@ -115,42 +128,16 @@ func (m *Master) run(ctx context.Context) {
// node disconnects // node disconnects
//case link := <-m.nodeLeave: //case link := <-m.nodeLeave:
}
}
_ = storCtlCancel // XXX // a storage node came through recovery - let's see whether
} // ptid ↑ and if so we should take partition table from there
case r := <-m.storRecovery:
_ = r
// storRecovery drives a storage node during cluster recoving state
// TODO text
func (m *Master) storRecovery(ctx context.Context, link *NodeLink) {
var err error
defer func() {
if err == nil {
return
} }
fmt.Printf("master: %v", err)
// this must interrupt everything connected to stor node and
// thus eventually to result in nodeLeave event to main driver
link.Close()
}()
defer errcontextf(&err, "%s: stor recovery", link)
conn, err := link.NewConn() // FIXME bad
if err != nil {
return
} }
recovery := AnswerRecovery{} _ = storCtlCancel // XXX
err = Ask(conn, &Recovery{}, &recovery)
if err != nil {
return
}
ptid := recovery.PTid
_ = ptid // XXX temp
} }
// accept processes identification request of just connected node and either accepts or declines it // accept processes identification request of just connected node and either accepts or declines it
...@@ -222,6 +209,61 @@ func (m *Master) accept(n nodeCome) (nodeInfo NodeInfo, ok bool) { ...@@ -222,6 +209,61 @@ func (m *Master) accept(n nodeCome) (nodeInfo NodeInfo, ok bool) {
return nodeInfo, true return nodeInfo, true
} }
// storCtlRecovery drives a storage node during cluster recoving state
// TODO text
func (m *Master) storCtlRecovery(ctx context.Context, link *NodeLink) {
var err error
defer func() {
if err == nil {
return
}
fmt.Printf("master: %v", err)
// this must interrupt everything connected to stor node and
// thus eventually result in nodeLeave event to main driver
link.Close()
}()
defer errcontextf(&err, "%s: stor recovery", link)
conn, err := link.NewConn() // FIXME bad
if err != nil {
return
}
recovery := AnswerRecovery{}
err = Ask(conn, &Recovery{}, &recovery)
if err != nil {
return
}
resp := AnswerPartitionTable{}
err = Ask(conn, &X_PartitionTable{}, &resp)
if err != nil {
return
}
// reconstruct partition table from response
pt := PartitionTable{}
pt.ptId = resp.PTid
for _, row := range resp.RowList {
i := row.Offset
for i >= uint32(len(pt.ptTab)) {
pt.ptTab = append(pt.ptTab, []PartitionCell{})
}
//pt.ptTab[i] = append(pt.ptTab[i], row.CellList...)
for _, cell := range row.CellList {
pt.ptTab[i] = append(pt.ptTab[i], PartitionCell{
NodeUUID: cell.NodeUUID,
CellState: cell.CellState,
})
}
}
m.storRecovery <- storRecovery{parttab: pt}
}
// allocUUID allocates new node uuid for a node of kind nodeType // allocUUID allocates new node uuid for a node of kind nodeType
// XXX it is bad idea for master to assign uuid to coming node // XXX it is bad idea for master to assign uuid to coming node
// -> better nodes generate really uniquie UUID themselves and always show with them // -> better nodes generate really uniquie UUID themselves and always show with them
......
...@@ -109,7 +109,7 @@ type PartitionTable struct { ...@@ -109,7 +109,7 @@ type PartitionTable struct {
ptTab [][]PartitionCell // [#Np] ptTab [][]PartitionCell // [#Np]
ptId int // ↑ for versioning XXX -> ver ? ptId PTid // ↑ for versioning XXX -> ver ?
} }
// PartitionCell describes one storage in a ptid entry in partition table // PartitionCell describes one storage in a ptid entry in partition table
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment