Commit affb271c authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 927ada0c
...@@ -48,27 +48,12 @@ func (c *Client) Close() error { ...@@ -48,27 +48,12 @@ func (c *Client) Close() error {
func (c *Client) LastTid() (zodb.Tid, error) { func (c *Client) LastTid() (zodb.Tid, error) {
// FIXME do not use global conn (see comment in openClientByURL) // FIXME do not use global conn (see comment in openClientByURL)
// XXX open new conn for this particular req/reply ? // XXX open new conn for this particular req/reply ?
err := EncodeAndSend(c.storConn, &LastTransaction{}) reply := AnswerLastTransaction{}
err := Ask(c.storConn, &LastTransaction{}, &reply)
if err != nil { if err != nil {
return 0, err // XXX err context return 0, err // XXX err ctx
}
reply, err := RecvAndDecode(c.storConn)
if err != nil {
// XXX err context (e.g. peer resetting connection -> currently only EOF)
return 0, err
}
switch reply := reply.(type) {
case *Error:
return 0, errDecode(reply) // XXX err context
default:
// XXX more error context ?
return 0, fmt.Errorf("protocol error: unexpected reply: %T", reply)
case *AnswerLastTransaction:
return reply.Tid, nil
} }
return reply.Tid, nil
} }
func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
...@@ -82,36 +67,18 @@ func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { ...@@ -82,36 +67,18 @@ func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
req.Tid = INVALID_TID req.Tid = INVALID_TID
} }
err = EncodeAndSend(c.storConn, &req) resp := AnswerGetObject{}
err = Ask(c.storConn, &req, &resp)
if err != nil { if err != nil {
return nil, 0, err // XXX err context return nil, 0, err // XXX err context
} }
reply, err := RecvAndDecode(c.storConn) // TODO reply.Checksum - check sha1
if err != nil { // TODO reply.Compression - decompress
// XXX err context (e.g. peer resetting connection -> currently only EOF)
return nil, 0, err
}
switch reply := reply.(type) {
case *Error:
return nil, 0, errDecode(reply) // XXX err context
default:
// XXX more error context ?
return nil, 0, fmt.Errorf("protocol error: unexpected reply: %T", reply)
case *AnswerGetObject:
data = reply.Data
tid = reply.Serial
// TODO reply.Checksum - check sha1 // reply.NextSerial
// TODO reply.Compression - decompress // reply.DataSerial
return resp.Data, resp.Serial, nil
// reply.NextSerial
// reply.DataSerial
return data, tid, nil
}
} }
func (c *Client) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator { func (c *Client) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
......
...@@ -235,6 +235,7 @@ func (nl *NodeLink) shutdown() { ...@@ -235,6 +235,7 @@ func (nl *NodeLink) shutdown() {
// All blocking operations - Accept and IO on associated connections // All blocking operations - Accept and IO on associated connections
// established over node link - are automatically interrupted with an error. // established over node link - are automatically interrupted with an error.
// Underlying raw connection is closed. // Underlying raw connection is closed.
// It is safe to call Close several times
func (nl *NodeLink) Close() error { func (nl *NodeLink) Close() error {
atomic.StoreUint32(&nl.closed, 1) atomic.StoreUint32(&nl.closed, 1)
nl.shutdown() nl.shutdown()
...@@ -255,6 +256,8 @@ func (c *Conn) shutdown() { ...@@ -255,6 +256,8 @@ func (c *Conn) shutdown() {
// NOTE for Send() - once transmission was started - it will complete in the // NOTE for Send() - once transmission was started - it will complete in the
// background on the wire not to break node-node link framing. // background on the wire not to break node-node link framing.
// //
// It is safe to call Close several times.
//
// TODO Close on one end must make Recv/Send on another end fail // TODO Close on one end must make Recv/Send on another end fail
// (UC: sending []txn-info) // (UC: sending []txn-info)
func (c *Conn) Close() error { func (c *Conn) Close() error {
......
...@@ -82,83 +82,144 @@ var tstart time.Time = time.Now() ...@@ -82,83 +82,144 @@ var tstart time.Time = time.Now()
// run implements main master cluster management logic: node tracking, cluster // run implements main master cluster management logic: node tracking, cluster
// state updates, scheduling data movement between storage nodes etc // state updates, scheduling data movement between storage nodes etc
func (m *Master) run(ctx context.Context) { func (m *Master) run(ctx context.Context) {
// current function to ask/control a storage depending on current cluster state and master idea
// + associated context covering all storage nodes
storCtl := m.storRecovery
storCtlCtx, storCtlCancel := context.WithCancel(ctx)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
panic("TODO") panic("TODO")
// new node connects & requests identification // node connects & requests identification
case n := <-m.nodeCome: case n := <-m.nodeCome:
// XXX also verify ? : nodeInfo, ok := m.accept(n)
// - NodeType valid
// - IdTimestamp ?
if n.idReq.ClusterName != m.clusterName { if !(ok && nodeInfo.NodeType == STORAGE) {
n.idResp <- &Error{PROTOCOL_ERROR, "cluster name mismatch"} // XXX
break break
} }
nodeType := n.idReq.NodeType // new storage node joined cluster
switch m.clusterState {
uuid := n.idReq.NodeUUID case RECOVERING:
if uuid == 0 {
uuid = m.allocUUID(nodeType)
} }
// XXX uuid < 0 (temporary) -> reallocate if conflict ?
node := m.nodeTab.Get(uuid) // XXX consider .clusterState change
if node != nil {
// reject - uuid is already occupied by someone else
// XXX check also for down state - it could be the same node reconnecting
n.idResp <- &Error{PROTOCOL_ERROR, "uuid %v already used by another node"} // XXX
break
}
// XXX accept only certain kind of nodes depending on .clusterState, e.g. // launch current storage control work on the new node
switch nodeType { go storCtl(storCtlCtx, n.link)
case CLIENT:
n.idResp <- &Error{NOT_READY, "cluster not operational"}
// XXX ... // TODO consider adjusting partTab
}
// node disconnects
//case link := <-m.nodeLeave:
}
}
n.idResp <- &AcceptIdentification{ _ = storCtlCancel // XXX
NodeType: MASTER, }
MyNodeUUID: m.nodeUUID,
NumPartitions: 1, // FIXME hardcoded
NumReplicas: 1, // FIXME hardcoded
YourNodeUUID: uuid,
}
// update nodeTab // storRecovery drives a storage node during cluster recovering state
var nodeState NodeState // TODO text
switch nodeType { func (m *Master) storRecovery(ctx context.Context, link *NodeLink) {
case STORAGE: var err error
// FIXME py sets to RUNNING/PENDING depending on cluster state defer func() {
nodeState = PENDING if err == nil {
return
}
default: fmt.Printf("master: %v", err)
nodeState = RUNNING
}
nodeInfo := NodeInfo{ // this must interrupt everything connected to stor node and
NodeType: nodeType, // thus eventually to result in nodeLeave event to main driver
Address: n.idReq.Address, link.Close()
NodeUUID: uuid, }()
NodeState: nodeState, defer errcontextf(&err, "%s: stor recovery", link)
IdTimestamp: monotime(),
}
m.nodeTab.Update(nodeInfo) // NOTE this notifies al nodeTab subscribers conn, err := link.NewConn() // FIXME bad
if err != nil {
return
}
// XXX consider adjusting partTab recovery := AnswerRecovery{}
// XXX consider .clusterState change err = Ask(conn, &Recovery{}, &recovery)
// XXX add new node to current whole-cluster job if err != nil {
return
}
//case link := <-m.nodeLeave: ptid := recovery.PTid
_ = ptid // XXX temp
}
// accept processes identification request of just connected node and either accepts or declines it
// if node identification is accepted nodeTab is updated and corresponding nodeInfo is returned
func (m *Master) accept(n nodeCome) (nodeInfo NodeInfo, ok bool) {
// XXX also verify ? :
// - NodeType valid
// - IdTimestamp ?
if n.idReq.ClusterName != m.clusterName {
n.idResp <- &Error{PROTOCOL_ERROR, "cluster name mismatch"} // XXX
return
}
nodeType := n.idReq.NodeType
uuid := n.idReq.NodeUUID
if uuid == 0 {
uuid = m.allocUUID(nodeType)
}
// XXX uuid < 0 (temporary) -> reallocate if conflict ?
node := m.nodeTab.Get(uuid)
if node != nil {
// reject - uuid is already occupied by someone else
// XXX check also for down state - it could be the same node reconnecting
n.idResp <- &Error{PROTOCOL_ERROR, "uuid %v already used by another node"} // XXX
return
}
// XXX accept only certain kind of nodes depending on .clusterState, e.g.
switch nodeType {
case CLIENT:
n.idResp <- &Error{NOT_READY, "cluster not operational"}
// XXX ...
}
n.idResp <- &AcceptIdentification{
NodeType: MASTER,
MyNodeUUID: m.nodeUUID,
NumPartitions: 1, // FIXME hardcoded
NumReplicas: 1, // FIXME hardcoded
YourNodeUUID: uuid,
} }
// update nodeTab
var nodeState NodeState
switch nodeType {
case STORAGE:
// FIXME py sets to RUNNING/PENDING depending on cluster state
nodeState = PENDING
default:
nodeState = RUNNING
}
nodeInfo = NodeInfo{
NodeType: nodeType,
Address: n.idReq.Address,
NodeUUID: uuid,
NodeState: nodeState,
IdTimestamp: monotime(),
} }
m.nodeTab.Update(nodeInfo) // NOTE this notifies al nodeTab subscribers
return nodeInfo, true
} }
// allocUUID allocates new node uuid for a node of kind nodeType // allocUUID allocates new node uuid for a node of kind nodeType
......
...@@ -159,17 +159,9 @@ func IdentifyPeer(link *NodeLink, myNodeType NodeType) (nodeInfo RequestIdentifi ...@@ -159,17 +159,9 @@ func IdentifyPeer(link *NodeLink, myNodeType NodeType) (nodeInfo RequestIdentifi
func IdentifyMe(link *NodeLink, nodeType NodeType /*XXX*/) (peerType NodeType, err error) { func IdentifyMe(link *NodeLink, nodeType NodeType /*XXX*/) (peerType NodeType, err error) {
defer errcontextf(&err, "%s: request identification", link) defer errcontextf(&err, "%s: request identification", link)
/*
defer func() {
if err != nil {
err = fmt.Errorf("%s: request identification: %s", link, err)
}
}()
*/
conn, err := link.NewConn() conn, err := link.NewConn()
if err != nil { if err != nil {
return peerType, err return 0, err
} }
defer func() { defer func() {
err2 := conn.Close() err2 := conn.Close()
...@@ -179,33 +171,20 @@ func IdentifyMe(link *NodeLink, nodeType NodeType /*XXX*/) (peerType NodeType, e ...@@ -179,33 +171,20 @@ func IdentifyMe(link *NodeLink, nodeType NodeType /*XXX*/) (peerType NodeType, e
} }
}() }()
err = EncodeAndSend(conn, &RequestIdentification{ resp := AcceptIdentification{}
err = Ask(conn, &RequestIdentification{
NodeType: nodeType, NodeType: nodeType,
NodeUUID: 0, // XXX NodeUUID: 0, // XXX
Address: Address{}, // XXX Address: Address{}, // XXX
ClusterName: "", // XXX ClusterName: "", // XXX
IdTimestamp: 0, // XXX IdTimestamp: 0, // XXX
}) }, &resp)
if err != nil { if err != nil {
return peerType, err return 0, err
}
pkt, err := RecvAndDecode(conn)
if err != nil {
return peerType, err
}
switch pkt := pkt.(type) {
default:
return peerType, fmt.Errorf("unexpected answer: %T", pkt)
// XXX also handle Error
case *AcceptIdentification:
return pkt.NodeType, nil
} }
return resp.NodeType, nil
} }
// ---------------------------------------- // ----------------------------------------
...@@ -238,7 +217,7 @@ func RecvAndDecode(conn *Conn) (NEOEncoder, error) { // XXX NEOEncoder -> interf ...@@ -238,7 +217,7 @@ func RecvAndDecode(conn *Conn) (NEOEncoder, error) { // XXX NEOEncoder -> interf
return pktObj, nil return pktObj, nil
} }
// EncodeAndSend encodes pkt and send it to conn // EncodeAndSend encodes pkt and sends it to conn
func EncodeAndSend(conn *Conn, pkt NEOEncoder) error { func EncodeAndSend(conn *Conn, pkt NEOEncoder) error {
msgCode, l := pkt.NEOEncodedInfo() msgCode, l := pkt.NEOEncodedInfo()
buf := PktBuf{make([]byte, PktHeadLen + l)} // XXX -> freelist buf := PktBuf{make([]byte, PktHeadLen + l)} // XXX -> freelist
...@@ -252,3 +231,47 @@ func EncodeAndSend(conn *Conn, pkt NEOEncoder) error { ...@@ -252,3 +231,47 @@ func EncodeAndSend(conn *Conn, pkt NEOEncoder) error {
return conn.Send(&buf) // XXX why pointer? return conn.Send(&buf) // XXX why pointer?
} }
// Ask does simple request/response protocol exchange.
//
// It encodes and sends req over conn, receives one packet in reply and decodes
// its payload into resp. resp is normally a pointer to the expected answer
// message (e.g. &AnswerLastTransaction{}).
//
// The answer is expected to be exactly of resp type. Otherwise an error is
// returned:
//
//   - if the peer replied with an Error packet, it is decoded and converted
//     to error via errDecode;
//   - if the message code is not known to pktTypeRegistry, or the reply is of
//     any other type, a protocol error describing the problem is returned.
//
// Send, receive and decode errors are returned as is.
func Ask(conn *Conn, req NEOEncoder, resp NEODecoder) error {
	err := EncodeAndSend(conn, req)
	if err != nil {
		return err
	}

	pkt, err := conn.Recv()
	if err != nil {
		return err
	}

	// XXX dup wrt RecvAndDecode
	pkth := pkt.Header()
	msgCode := ntoh16(pkth.MsgCode)
	msgType := pktTypeRegistry[msgCode]
	if msgType == nil {
		return fmt.Errorf("invalid msgCode (%d)", msgCode) // XXX err ctx
	}

	// The registry is compared against value types below (reflect.TypeOf(Error{})),
	// while resp is usually passed as a pointer so that NEODecode can fill it.
	// Thus accept the reply both when it matches resp's dynamic type directly
	// and when it matches the type resp points to.
	respType := reflect.TypeOf(resp)
	expected := msgType == respType
	if !expected && respType != nil && respType.Kind() == reflect.Ptr {
		expected = msgType == respType.Elem()
	}

	if !expected {
		// Error response
		if msgType == reflect.TypeOf(Error{}) {
			errResp := Error{}
			_, err = errResp.NEODecode(pkt.Payload())
			if err != nil {
				return err // XXX err ctx
			}

			return errDecode(&errResp) // XXX err ctx
		}

		// NOTE %v, not %T: msgType is already a type descriptor; %T would
		// print the type of the descriptor itself instead of the message type.
		return fmt.Errorf("unexpected reply: %v", msgType) // XXX err ctx
	}

	_, err = resp.NEODecode(pkt.Payload())
	if err != nil {
		return err // XXX err ctx
	}

	return nil
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment