Commit 3a15289e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 40410799
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
package neo package neo
// partition table // partition table
import "fmt"
// PartitionTable represents object space partitioning in a cluster // PartitionTable represents object space partitioning in a cluster
// //
// It is // It is
...@@ -116,8 +118,7 @@ type PartitionTable struct { ...@@ -116,8 +118,7 @@ type PartitionTable struct {
// PartitionCell describes one storage in a pid entry in partition table // PartitionCell describes one storage in a pid entry in partition table
type PartitionCell struct { type PartitionCell struct {
NodeUUID CellInfo
CellState
// XXX ? + .haveUpToTid associated node has data up to such tid // XXX ? + .haveUpToTid associated node has data up to such tid
// = uptodate if haveUpToTid == lastTid // = uptodate if haveUpToTid == lastTid
...@@ -140,7 +141,8 @@ func MakePartTab(np int, nodev []*Node) *PartitionTable { ...@@ -140,7 +141,8 @@ func MakePartTab(np int, nodev []*Node) *PartitionTable {
for i, j := 0, 0; i < np; i, j = i+1, j+1 % len(nodev) { for i, j := 0, 0; i < np; i, j = i+1, j+1 % len(nodev) {
node := nodev[j] node := nodev[j]
// XXX assert node.State > DOWN // XXX assert node.State > DOWN
tab[i] = []PartitionCell{{node.UUID, UP_TO_DATE /*XXX ok?*/}} fmt.Printf("tab[%d] <- %v\n", i, node.UUID)
tab[i] = []PartitionCell{{CellInfo: CellInfo{node.UUID, UP_TO_DATE /*XXX ok?*/}}}
} }
return &PartitionTable{tab: tab} return &PartitionTable{tab: tab}
...@@ -195,6 +197,7 @@ func (pt *PartitionTable) OperationalWith(nt *NodeTable) bool { ...@@ -195,6 +197,7 @@ func (pt *PartitionTable) OperationalWith(nt *NodeTable) bool {
// ---- encode / decode PT to / from messages // ---- encode / decode PT to / from messages
// XXX naming // XXX naming
// XXX -> RowList() ?
func (pt *PartitionTable) Dump() []RowInfo { // XXX also include .ptid? -> struct ? func (pt *PartitionTable) Dump() []RowInfo { // XXX also include .ptid? -> struct ?
rowv := make([]RowInfo, len(pt.tab)) rowv := make([]RowInfo, len(pt.tab))
for i, row := range pt.tab { for i, row := range pt.tab {
...@@ -220,10 +223,7 @@ func PartTabFromDump(ptid PTid, rowv []RowInfo) *PartitionTable { ...@@ -220,10 +223,7 @@ func PartTabFromDump(ptid PTid, rowv []RowInfo) *PartitionTable {
//pt.tab[i] = append(pt.tab[i], row.CellList...) //pt.tab[i] = append(pt.tab[i], row.CellList...)
for _, cell := range row.CellList { for _, cell := range row.CellList {
pt.tab[i] = append(pt.tab[i], PartitionCell{ pt.tab[i] = append(pt.tab[i], PartitionCell{cell})
NodeUUID: cell.NodeUUID,
CellState: cell.CellState,
})
} }
} }
......
...@@ -221,7 +221,7 @@ func TestMasterStorage(t *testing.T) { ...@@ -221,7 +221,7 @@ func TestMasterStorage(t *testing.T) {
gwg.Gox(func() { gwg.Gox(func() {
err := M.Run(Mctx) err := M.Run(Mctx)
fmt.Println("M err: ", err) fmt.Println("M err: ", err)
_ = err // XXX exc.Raiseif(err)
}) })
// M starts listening // M starts listening
...@@ -238,7 +238,7 @@ func TestMasterStorage(t *testing.T) { ...@@ -238,7 +238,7 @@ func TestMasterStorage(t *testing.T) {
gwg.Gox(func() { gwg.Gox(func() {
err := S.Run(Sctx) err := S.Run(Sctx)
fmt.Println("S err: ", err) fmt.Println("S err: ", err)
_ = err // XXX exc.Raiseif(err)
}) })
// S starts listening // S starts listening
...@@ -295,16 +295,19 @@ func TestMasterStorage(t *testing.T) { ...@@ -295,16 +295,19 @@ func TestMasterStorage(t *testing.T) {
xwait(wg) xwait(wg)
// XXX M.partTab <- S1 // XXX M.partTab <- S1
// XXX M can start -> writes parttab to S and goes to verification
// XXX M.partTab <- ... // verification
tc.Expect(clusterState(&M.clusterState, neo.ClusterVerifying))
tc.Expect(conntx("m:2", "s:2", 1, &neo.NotifyPartitionTable{
PTid: 1,
RowList: []neo.RowInfo{
{0, []neo.CellInfo{{S.node.MyInfo.UUID, neo.UP_TO_DATE}}},
},
}))
// XXX temp
return
// expect: // expect:
// M.clusterState <- VERIFICATION + TODO it should be sent to S // ? M -> S ClusterInformation(VERIFICATION)
// M -> S .? LockedTransactions{} // M -> S .? LockedTransactions{}
// M <- S .? AnswerLockedTransactions{...} // M <- S .? AnswerLockedTransactions{...}
// M -> S .? LastIDs{} // M -> S .? LastIDs{}
...@@ -334,9 +337,10 @@ func TestMasterStorage(t *testing.T) { ...@@ -334,9 +337,10 @@ func TestMasterStorage(t *testing.T) {
// TODO test M.recovery starting back from verification/service // TODO test M.recovery starting back from verification/service
// (M needs to resend to all storages recovery messages just from start) // (M needs to resend to all storages recovery messages just from start)
return
Mcancel() // FIXME ctx cancel not fully handled
Scancel() // ---- // ----
xwait(gwg) xwait(gwg)
Mcancel() // XXX temp
Scancel() // XXX temp
} }
// basic interaction between Client -- Storage // basic interaction between Client -- Storage
......
...@@ -458,6 +458,7 @@ loop2: ...@@ -458,6 +458,7 @@ loop2:
// recovery successful - we are starting // recovery successful - we are starting
// S PENDING -> RUNNING // S PENDING -> RUNNING
// XXX recheck logic is ok for starting existing cluster
for _, stor := range m.nodeTab.StorageList() { for _, stor := range m.nodeTab.StorageList() {
if stor.State == neo.PENDING { if stor.State == neo.PENDING {
m.nodeTab.SetNodeState(stor, neo.RUNNING) m.nodeTab.SetNodeState(stor, neo.RUNNING)
...@@ -465,7 +466,8 @@ loop2: ...@@ -465,7 +466,8 @@ loop2:
} }
// if we are starting for new cluster - create partition table // if we are starting for new cluster - create partition table
if err != nil && m.partTab.PTid == 0 { if m.partTab.PTid == 0 {
log.Infof(ctx, "creating new partition table")
// XXX -> m.nodeTab.StorageList(State > DOWN) // XXX -> m.nodeTab.StorageList(State > DOWN)
storv := []*neo.Node{} storv := []*neo.Node{}
for _, stor := range m.nodeTab.StorageList() { for _, stor := range m.nodeTab.StorageList() {
...@@ -567,7 +569,7 @@ func (m *Master) verify(ctx context.Context) (err error) { ...@@ -567,7 +569,7 @@ func (m *Master) verify(ctx context.Context) (err error) {
for _, stor := range m.nodeTab.StorageList() { for _, stor := range m.nodeTab.StorageList() {
if stor.State > neo.DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ? if stor.State > neo.DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ?
inprogress++ inprogress++
go storCtlVerify(vctx, stor, verify) go storCtlVerify(vctx, stor, m.partTab, verify)
} }
} }
...@@ -585,7 +587,7 @@ loop: ...@@ -585,7 +587,7 @@ loop:
// new storage arrived - start verification on it too // new storage arrived - start verification on it too
// XXX ok? or it must first go through recovery check? // XXX ok? or it must first go through recovery check?
inprogress++ inprogress++
go storCtlVerify(vctx, node, verify) go storCtlVerify(vctx, node, m.partTab, verify)
case n := <-m.nodeLeave: case n := <-m.nodeLeave:
m.nodeTab.SetNodeState(n.node, neo.DOWN) m.nodeTab.SetNodeState(n.node, neo.DOWN)
...@@ -661,7 +663,7 @@ type storVerify struct { ...@@ -661,7 +663,7 @@ type storVerify struct {
} }
// storCtlVerify drives a storage node during cluster verifying (= starting) state // storCtlVerify drives a storage node during cluster verifying (= starting) state
func storCtlVerify(ctx context.Context, stor *neo.Node, res chan storVerify) { func storCtlVerify(ctx context.Context, stor *neo.Node, pt *neo.PartitionTable, res chan storVerify) {
// XXX link.Close on err // XXX link.Close on err
// XXX cancel on ctx // XXX cancel on ctx
...@@ -673,10 +675,16 @@ func storCtlVerify(ctx context.Context, stor *neo.Node, res chan storVerify) { ...@@ -673,10 +675,16 @@ func storCtlVerify(ctx context.Context, stor *neo.Node, res chan storVerify) {
}() }()
defer runningf(&ctx, "%s: stor verify", stor.Link)(&err) defer runningf(&ctx, "%s: stor verify", stor.Link)(&err)
// FIXME stub conn := stor.Conn
conn, _ := stor.Link.NewConn()
// XXX NotifyPT (so storages save locally recovered PT) // send just recovered parttab so storage saves it
err = conn.Send(&neo.NotifyPartitionTable{
PTid: pt.PTid,
RowList: pt.Dump(),
})
if err != nil {
return
}
locked := neo.AnswerLockedTransactions{} locked := neo.AnswerLockedTransactions{}
err = conn.Ask(&neo.LockedTransactions{}, &locked) err = conn.Ask(&neo.LockedTransactions{}, &locked)
......
...@@ -59,7 +59,7 @@ func _running(ctxp *context.Context, name string) func(*error) { ...@@ -59,7 +59,7 @@ func _running(ctxp *context.Context, name string) func(*error) {
// XXX or we still want to log all errors - right? // XXX or we still want to log all errors - right?
log.Depth(1).Error(ctx, "## ", *errp) // XXX "::" temp log.Depth(1).Error(ctx, "## ", *errp) // XXX "::" temp
} else { } else {
log.Depth(1).Info(ctx, "ok") log.Depth(1).Info(ctx, "done")
} }
// XXX do we need vvv if we log it anyway ^^^ ? // XXX do we need vvv if we log it anyway ^^^ ?
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment