Commit 7f7169ce authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 30a3b811
......@@ -35,6 +35,7 @@ import (
"lab.nexedi.com/kirr/neo/go/xcommon/log"
"lab.nexedi.com/kirr/neo/go/xcommon/task"
"lab.nexedi.com/kirr/neo/go/xcommon/xcontext"
"lab.nexedi.com/kirr/neo/go/xcommon/xio"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet"
"lab.nexedi.com/kirr/go123/xerr"
......@@ -208,7 +209,7 @@ func (m *Master) Run(ctx context.Context) (err error) {
case neo.STORAGE:
fallthrough
default:
l.CloseAccept()
conn.Link().CloseAccept()
}
// handover to main driver
......@@ -802,7 +803,8 @@ func (m *Master) service(ctx context.Context) (err error) {
wg.Add(1)
go func() {
defer wg.Done()
storCtlService(ctx, stor, serviced)
err := storCtlService(ctx, stor)
serviced <- serviceDone{node: stor, err: err}
}()
}
}
......@@ -831,13 +833,15 @@ loop:
switch node.Type {
case neo.STORAGE:
storCtlService(ctx, node, serviced)
err = storCtlService(ctx, node)
//case neo.CLIENT:
// serveClient(ctx, node, serviced)
case neo.CLIENT:
err = m.serveClient(ctx, node)
// XXX ADMIN
}
serviced <- serviceDone{node: node, err: err}
}()
case d := <-serviced:
......@@ -872,16 +876,14 @@ loop:
}
}
// XXX wait all spawned service workers
return err
}
// storCtlService drives a storage node during cluster service state
func storCtlService(ctx context.Context, stor *neo.Node, done chan serviceDone) {
err := storCtlService1(ctx, stor)
done <- serviceDone{node: stor, err: err}
}
func storCtlService1(ctx context.Context, stor *neo.Node) (err error) {
func storCtlService(ctx context.Context, stor *neo.Node) (err error) {
defer task.Runningf(&ctx, "%s: stor service", stor.Link.RemoteAddr())(&err)
conn := stor.Conn
......@@ -911,6 +913,44 @@ func storCtlService1(ctx context.Context, stor *neo.Node) (err error) {
}
}
// serveClient serves incoming client link
func (m *Master) serveClient(ctx context.Context, cli *neo.Node) (err error) {
defer task.Runningf(&ctx, "%s: client service", cli.Link.RemoteAddr())(&err)
clink := cli.Link
defer xio.CloseWhenDone(ctx, clink)()
// XXX spawn M -> S notifications about cluster state
for {
req, err := clink.Recv1()
if err != nil {
return err
}
resp := m.serveClient1(ctx, req.Msg)
err = req.Reply(resp)
if err != nil {
return err
}
}
return nil
}
// serveClient1 prepares response for 1 request from client
func (m *Master) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Msg) {
switch req := req.(type) {
case *neo.AskPartitionTable:
// XXX
panic("TODO")
default:
return &neo.Error{neo.PROTOCOL_ERROR, fmt.Sprintf("unexpected message %T", req)}
}
}
// ----------------------------------------
// identify processes identification request of just connected node and either accepts or declines it.
......
......@@ -608,6 +608,4 @@ func (stor *Storage) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Ms
}
//req.Put(...)
return nil
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment