Commit 3689c816 authored by Kirill Smelkov's avatar Kirill Smelkov

X splitting into packages in progress

parent 9888b7dc
...@@ -22,23 +22,24 @@ import ( ...@@ -22,23 +22,24 @@ import (
"context" "context"
"net/url" "net/url"
"../zodb" "../../neo"
"../../zodb"
) )
// Client talks to NEO cluster and exposes access it via ZODB interfaces // Client talks to NEO cluster and exposes access it via ZODB interfaces
type Client struct { type Client struct {
// XXX move -> nodeCommon? // XXX move -> nodeCommon?
// ---- 8< ---- // ---- 8< ----
myInfo NodeInfo // XXX -> only NodeUUID myInfo neo.NodeInfo // XXX -> only NodeUUID
clusterName string clusterName string
net Network // network we are sending/receiving on net neo.Network // network we are sending/receiving on
masterAddr string // address of master XXX -> Address ? masterAddr string // address of master XXX -> Address ?
// ---- 8< ---- // ---- 8< ----
storLink *NodeLink // link to storage node storLink *neo.NodeLink // link to storage node
storConn *Conn // XXX main connection to storage storConn *neo.Conn // XXX main connection to storage
} }
var _ zodb.IStorage = (*Client)(nil) var _ zodb.IStorage = (*Client)(nil)
...@@ -57,8 +58,8 @@ func (c *Client) Close() error { ...@@ -57,8 +58,8 @@ func (c *Client) Close() error {
func (c *Client) LastTid() (zodb.Tid, error) { func (c *Client) LastTid() (zodb.Tid, error) {
// FIXME do not use global conn (see comment in openClientByURL) // FIXME do not use global conn (see comment in openClientByURL)
// XXX open new conn for this particular req/reply ? // XXX open new conn for this particular req/reply ?
reply := AnswerLastTransaction{} reply := neo.AnswerLastTransaction{}
err := Ask(c.storConn, &LastTransaction{}, &reply) err := neo.Ask(c.storConn, &neo.LastTransaction{}, &reply)
if err != nil { if err != nil {
return 0, err // XXX err ctx return 0, err // XXX err ctx
} }
...@@ -67,17 +68,17 @@ func (c *Client) LastTid() (zodb.Tid, error) { ...@@ -67,17 +68,17 @@ func (c *Client) LastTid() (zodb.Tid, error) {
func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
// FIXME do not use global conn (see comment in openClientByURL) // FIXME do not use global conn (see comment in openClientByURL)
req := GetObject{Oid: xid.Oid} req := neo.GetObject{Oid: xid.Oid}
if xid.TidBefore { if xid.TidBefore {
req.Serial = INVALID_TID req.Serial = neo.INVALID_TID
req.Tid = xid.Tid req.Tid = xid.Tid
} else { } else {
req.Serial = xid.Tid req.Serial = xid.Tid
req.Tid = INVALID_TID req.Tid = neo.INVALID_TID
} }
resp := AnswerGetObject{} resp := neo.AnswerGetObject{}
err = Ask(c.storConn, &req, &resp) err = neo.Ask(c.storConn, &req, &resp)
if err != nil { if err != nil {
return nil, 0, err // XXX err context return nil, 0, err // XXX err context
} }
...@@ -97,7 +98,7 @@ func (c *Client) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator { ...@@ -97,7 +98,7 @@ func (c *Client) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
// NewClient creates and identifies new client connected to storage over storLink // NewClient creates and identifies new client connected to storage over storLink
func NewClient(storLink *NodeLink) (*Client, error) { func NewClient(storLink *neo.NodeLink) (*Client, error) {
// TODO .myInfo.NodeType = CLIENT // TODO .myInfo.NodeType = CLIENT
// .clusterName = clusterName // .clusterName = clusterName
// .net = ... // .net = ...
...@@ -106,7 +107,7 @@ func NewClient(storLink *NodeLink) (*Client, error) { ...@@ -106,7 +107,7 @@ func NewClient(storLink *NodeLink) (*Client, error) {
// XXX move -> Run? // XXX move -> Run?
// first identify ourselves to peer // first identify ourselves to peer
accept, err := IdentifyWith(STORAGE, storLink, cli.myInfo, cli.clusterName) accept, err := neo.IdentifyWith(neo.STORAGE, storLink, cli.myInfo, cli.clusterName)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -133,8 +134,8 @@ func NewClient(storLink *NodeLink) (*Client, error) { ...@@ -133,8 +134,8 @@ func NewClient(storLink *NodeLink) (*Client, error) {
func openClientByURL(ctx context.Context, u *url.URL) (zodb.IStorage, error) { func openClientByURL(ctx context.Context, u *url.URL) (zodb.IStorage, error) {
// XXX for now url is treated as storage node URL // XXX for now url is treated as storage node URL
// XXX check/use other url fields // XXX check/use other url fields
net := NetPlain("tcp") // TODO + TLS; not only "tcp" ? net := neo.NetPlain("tcp") // TODO + TLS; not only "tcp" ?
storLink, err := Dial(ctx, net, u.Host) storLink, err := neo.Dial(ctx, net, u.Host)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -15,10 +15,10 @@ ...@@ -15,10 +15,10 @@
// //
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
package neo package main
// registry of all commands & help topics // registry of all commands & help topics
import "../zodb/zodbtools" import "../../../zodb/zodbtools"
var Commands = zodbtools.CommandRegistry{ var Commands = zodbtools.CommandRegistry{
{"master", masterSummary, masterUsage, masterMain}, {"master", masterSummary, masterUsage, masterMain},
......
...@@ -251,7 +251,7 @@ func (c *Conn) shutdown() { ...@@ -251,7 +251,7 @@ func (c *Conn) shutdown() {
} }
// Close closes connection // Close closes connection
// Any blocked Send() or Recv() will be unblocked and return error // Any blocked Send*() or Recv*() will be unblocked and return error
// //
// NOTE for Send() - once transmission was started - it will complete in the // NOTE for Send() - once transmission was started - it will complete in the
// background on the wire not to break node-node link framing. // background on the wire not to break node-node link framing.
......
// TODO copyright / license // Copyright (C) 2016-2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Open Source Initiative approved licenses and Convey
// the resulting work. Corresponding source of such a combination shall include
// the source code for all other software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// Package neo implements distributed object storage for ZODB // Package neo and its children provide distributed object storage for ZODB
// TODO text //
// Package neo itself provides protocol definition and common infrastructure.
// See packages neo.client and neo.server for client and server sides respectively.
// XXX text
package neo package neo
import ( import (
......
// NEO. Protocol description
//go:generate sh -c "go run protogen.go >proto-marshal.go" //go:generate sh -c "go run protogen.go >proto-marshal.go"
package neo package neo
// protocol definition
// NOTE for some packets it is possible to decode raw packet -> go version from // NOTE for some packets it is possible to decode raw packet -> go version from
// PktBuf in place. E.g. for GetObject. // PktBuf in place. E.g. for GetObject.
...@@ -139,9 +139,9 @@ type NodeUUID int32 ...@@ -139,9 +139,9 @@ type NodeUUID int32
// ErrDecodeOverflow is the error returned by NEOPktDecode when decoding hit buffer overflow // ErrDecodeOverflow is the error returned by NEOPktDecode when decoding hit buffer overflow
var ErrDecodeOverflow = errors.New("decode: bufer overflow") var ErrDecodeOverflow = errors.New("decode: bufer overflow")
// NEOPkt is the interface implemented by packets to marshal/unmarshal them into/from wire format // Pkt is the interface implemented by NEO packets to marshal/unmarshal them into/from wire format
// XXX -> will be neo.Pkt after splitting into packages // XXX -> will be neo.Pkt after splitting into packages
type NEOPkt interface { type Pkt interface {
// NEOPktMsgCode returns message code needed to be used for particular packet type // NEOPktMsgCode returns message code needed to be used for particular packet type
// on the wire // on the wire
NEOPktMsgCode() uint16 NEOPktMsgCode() uint16
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
NEO. Protocol module. Code generator NEO. Protocol module. Code generator
This program generates marshalling code for packet types defined in proto.go . This program generates marshalling code for packet types defined in proto.go .
For every type 4 methods are generated in accordance with NEOPkt interface: For every type 4 methods are generated in accordance with neo.Pkt interface:
NEOPktMsgCode() uint16 NEOPktMsgCode() uint16
NEOPktEncodedLen() int NEOPktEncodedLen() int
...@@ -469,7 +469,7 @@ type sizer struct { ...@@ -469,7 +469,7 @@ type sizer struct {
// encoder generates code to encode a packet // encoder generates code to encode a packet
// //
// when type is recursively walked, for every case code to update `data[n:]` is generated. // when type is recursively walked, for every case code to update `data[n:]` is generated.
// no overflow checks are generated as by NEOPkt interface provided data // no overflow checks are generated as by neo.Pkt interface provided data
// buffer should have at least payloadLen length returned by NEOPktEncodedInfo() // buffer should have at least payloadLen length returned by NEOPktEncodedInfo()
// (the size computed by sizer). // (the size computed by sizer).
// //
......
...@@ -30,7 +30,8 @@ import ( ...@@ -30,7 +30,8 @@ import (
"os" "os"
"sync" "sync"
"../zodb" "../../neo"
"../../zodb"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
) )
...@@ -38,7 +39,7 @@ import ( ...@@ -38,7 +39,7 @@ import (
// Master is a node overseeing and managing how whole NEO cluster works // Master is a node overseeing and managing how whole NEO cluster works
type Master struct { type Master struct {
clusterName string clusterName string
nodeUUID NodeUUID nodeUUID neo.NodeUUID
// last allocated oid & tid // last allocated oid & tid
// XXX how to start allocating oid from 0, not 1 ? // XXX how to start allocating oid from 0, not 1 ?
...@@ -48,9 +49,9 @@ type Master struct { ...@@ -48,9 +49,9 @@ type Master struct {
// master manages node and partition tables and broadcast their updates // master manages node and partition tables and broadcast their updates
// to all nodes in cluster // to all nodes in cluster
stateMu sync.RWMutex // XXX recheck: needed ? stateMu sync.RWMutex // XXX recheck: needed ?
nodeTab NodeTable nodeTab neo.NodeTable
partTab PartitionTable partTab neo.PartitionTable
clusterState ClusterState clusterState neo.ClusterState
// channels controlling main driver // channels controlling main driver
ctlStart chan chan error // request to start cluster ctlStart chan chan error // request to start cluster
...@@ -65,22 +66,22 @@ type Master struct { ...@@ -65,22 +66,22 @@ type Master struct {
// node connects // node connects
type nodeCome struct { type nodeCome struct {
link *NodeLink link *neo.NodeLink
idReq RequestIdentification // we received this identification request idReq neo.RequestIdentification // we received this identification request
idResp chan NEOPkt // what we reply (AcceptIdentification | Error) idResp chan neo.Pkt // what we reply (AcceptIdentification | Error)
} }
// node disconnects // node disconnects
type nodeLeave struct { type nodeLeave struct {
link *NodeLink // XXX better use uuid allocated on nodeCome ? link *neo.NodeLink // XXX better use uuid allocated on nodeCome ?
} }
// NewMaster TODO ... // NewMaster TODO ...
func NewMaster(clusterName string) *Master { func NewMaster(clusterName string) *Master {
m := &Master{clusterName: clusterName} m := &Master{clusterName: clusterName}
m.nodeUUID = m.allocUUID(MASTER) m.nodeUUID = m.allocUUID(neo.MASTER)
// TODO update nodeTab with self // TODO update nodeTab with self
m.clusterState = ClusterRecovering // XXX no elections - we are the only master m.clusterState = neo.ClusterRecovering // XXX no elections - we are the only master
go m.run(context.TODO()) // XXX ctx go m.run(context.TODO()) // XXX ctx
return m return m
...@@ -115,7 +116,7 @@ func (m *Master) Shutdown() error { ...@@ -115,7 +116,7 @@ func (m *Master) Shutdown() error {
// setClusterState sets .clusterState and notifies subscribers // setClusterState sets .clusterState and notifies subscribers
func (m *Master) setClusterState(state ClusterState) { func (m *Master) setClusterState(state neo.ClusterState) {
if state == m.clusterState { // <- XXX do we really need this ? if state == m.clusterState { // <- XXX do we really need this ?
return return
} }
...@@ -179,7 +180,7 @@ func (m *Master) recovery(ctx context.Context) (err error) { ...@@ -179,7 +180,7 @@ func (m *Master) recovery(ctx context.Context) (err error) {
fmt.Println("master: recovery") fmt.Println("master: recovery")
defer xerr.Context(&err, "master: recovery") defer xerr.Context(&err, "master: recovery")
m.setClusterState(ClusterRecovering) m.setClusterState(neo.ClusterRecovering)
rctx, rcancel := context.WithCancel(ctx) rctx, rcancel := context.WithCancel(ctx)
defer rcancel() defer rcancel()
...@@ -188,7 +189,7 @@ func (m *Master) recovery(ctx context.Context) (err error) { ...@@ -188,7 +189,7 @@ func (m *Master) recovery(ctx context.Context) (err error) {
// start recovery on all storages we are currently in touch with // start recovery on all storages we are currently in touch with
for _, stor := range m.nodeTab.StorageList() { for _, stor := range m.nodeTab.StorageList() {
if stor.NodeState > DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ? if stor.NodeState > neo.DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ?
inprogress++ inprogress++
go storCtlRecovery(rctx, stor.Link, recovery) go storCtlRecovery(rctx, stor.Link, recovery)
} }
...@@ -274,7 +275,7 @@ loop: ...@@ -274,7 +275,7 @@ loop:
// storRecovery is result of a storage node passing recovery phase // storRecovery is result of a storage node passing recovery phase
type storRecovery struct { type storRecovery struct {
partTab PartitionTable partTab neo.PartitionTable
// XXX + backup_tid, truncate_tid ? // XXX + backup_tid, truncate_tid ?
err error err error
...@@ -282,7 +283,7 @@ type storRecovery struct { ...@@ -282,7 +283,7 @@ type storRecovery struct {
// storCtlRecovery drives a storage node during cluster recovering state // storCtlRecovery drives a storage node during cluster recovering state
// it retrieves various ids and partition table from as stored on the storage // it retrieves various ids and partition table from as stored on the storage
func storCtlRecovery(ctx context.Context, link *NodeLink, res chan storRecovery) { func storCtlRecovery(ctx context.Context, link *neo.NodeLink, res chan storRecovery) {
var err error var err error
// XXX where this close link on error should be ? // XXX where this close link on error should be ?
defer func() { defer func() {
...@@ -309,30 +310,30 @@ func storCtlRecovery(ctx context.Context, link *NodeLink, res chan storRecovery) ...@@ -309,30 +310,30 @@ func storCtlRecovery(ctx context.Context, link *NodeLink, res chan storRecovery)
} }
// XXX cancel on ctx // XXX cancel on ctx
recovery := AnswerRecovery{} recovery := neo.AnswerRecovery{}
err = Ask(conn, &Recovery{}, &recovery) err = neo.Ask(conn, &neo.Recovery{}, &recovery)
if err != nil { if err != nil {
return return
} }
resp := AnswerPartitionTable{} resp := neo.AnswerPartitionTable{}
err = Ask(conn, &X_PartitionTable{}, &resp) err = neo.Ask(conn, &neo.X_PartitionTable{}, &resp)
if err != nil { if err != nil {
return return
} }
// reconstruct partition table from response // reconstruct partition table from response
pt := PartitionTable{} pt := neo.PartitionTable{}
pt.ptid = resp.PTid pt.ptid = resp.PTid
for _, row := range resp.RowList { for _, row := range resp.RowList {
i := row.Offset i := row.Offset
for i >= uint32(len(pt.ptTab)) { for i >= uint32(len(pt.ptTab)) {
pt.ptTab = append(pt.ptTab, []PartitionCell{}) pt.ptTab = append(pt.ptTab, []neo.PartitionCell{})
} }
//pt.ptTab[i] = append(pt.ptTab[i], row.CellList...) //pt.ptTab[i] = append(pt.ptTab[i], row.CellList...)
for _, cell := range row.CellList { for _, cell := range row.CellList {
pt.ptTab[i] = append(pt.ptTab[i], PartitionCell{ pt.ptTab[i] = append(pt.ptTab[i], neo.PartitionCell{
NodeUUID: cell.NodeUUID, NodeUUID: cell.NodeUUID,
CellState: cell.CellState, CellState: cell.CellState,
}) })
...@@ -366,7 +367,7 @@ func (m *Master) verify(ctx context.Context) (err error) { ...@@ -366,7 +367,7 @@ func (m *Master) verify(ctx context.Context) (err error) {
fmt.Println("master: verify") fmt.Println("master: verify")
defer xerr.Context(&err, "master: verify") defer xerr.Context(&err, "master: verify")
m.setClusterState(ClusterVerifying) m.setClusterState(neo.ClusterVerifying)
vctx, vcancel := context.WithCancel(ctx) vctx, vcancel := context.WithCancel(ctx)
defer vcancel() defer vcancel()
...@@ -381,7 +382,7 @@ func (m *Master) verify(ctx context.Context) (err error) { ...@@ -381,7 +382,7 @@ func (m *Master) verify(ctx context.Context) (err error) {
// start verification on all storages we are currently in touch with // start verification on all storages we are currently in touch with
for _, stor := range m.nodeTab.StorageList() { for _, stor := range m.nodeTab.StorageList() {
if stor.NodeState > DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ? if stor.NodeState > neo.DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ?
inprogress++ inprogress++
go storCtlVerify(vctx, stor.Link, verify) go storCtlVerify(vctx, stor.Link, verify)
} }
...@@ -468,12 +469,12 @@ loop: ...@@ -468,12 +469,12 @@ loop:
type storVerify struct { type storVerify struct {
lastOid zodb.Oid lastOid zodb.Oid
lastTid zodb.Tid lastTid zodb.Tid
link *NodeLink link *neo.NodeLink
err error err error
} }
// storCtlVerify drives a storage node during cluster verifying (= starting) state // storCtlVerify drives a storage node during cluster verifying (= starting) state
func storCtlVerify(ctx context.Context, link *NodeLink, res chan storVerify) { func storCtlVerify(ctx context.Context, link *neo.NodeLink, res chan storVerify) {
// XXX link.Close on err // XXX link.Close on err
// XXX cancel on ctx // XXX cancel on ctx
...@@ -488,8 +489,8 @@ func storCtlVerify(ctx context.Context, link *NodeLink, res chan storVerify) { ...@@ -488,8 +489,8 @@ func storCtlVerify(ctx context.Context, link *NodeLink, res chan storVerify) {
// FIXME stub // FIXME stub
conn, _ := link.NewConn() conn, _ := link.NewConn()
locked := AnswerLockedTransactions{} locked := neo.AnswerLockedTransactions{}
err = Ask(conn, &LockedTransactions{}, &locked) err = neo.Ask(conn, &neo.LockedTransactions{}, &locked)
if err != nil { if err != nil {
return return
} }
...@@ -500,8 +501,8 @@ func storCtlVerify(ctx context.Context, link *NodeLink, res chan storVerify) { ...@@ -500,8 +501,8 @@ func storCtlVerify(ctx context.Context, link *NodeLink, res chan storVerify) {
return return
} }
last := AnswerLastIDs{} last := neo.AnswerLastIDs{}
err = Ask(conn, &LastIDs{}, &last) err = neo.Ask(conn, &neo.LastIDs{}, &last)
if err != nil { if err != nil {
return return
} }
...@@ -573,7 +574,7 @@ loop: ...@@ -573,7 +574,7 @@ loop:
// accept processes identification request of just connected node and either accepts or declines it // accept processes identification request of just connected node and either accepts or declines it
// if node identification is accepted nodeTab is updated and corresponding node entry is returned // if node identification is accepted nodeTab is updated and corresponding node entry is returned
func (m *Master) accept(n nodeCome) (node *Node, ok bool) { func (m *Master) accept(n nodeCome) (node *neo.Node, ok bool) {
// XXX also verify ? : // XXX also verify ? :
// - NodeType valid // - NodeType valid
// - IdTimestamp ? // - IdTimestamp ?
...@@ -609,7 +610,7 @@ func (m *Master) accept(n nodeCome) (node *Node, ok bool) { ...@@ -609,7 +610,7 @@ func (m *Master) accept(n nodeCome) (node *Node, ok bool) {
n.idResp <- &AcceptIdentification{ n.idResp <- &AcceptIdentification{
NodeType: MASTER, NodeType: neo.MASTER,
MyNodeUUID: m.nodeUUID, MyNodeUUID: m.nodeUUID,
NumPartitions: 1, // FIXME hardcoded NumPartitions: 1, // FIXME hardcoded
NumReplicas: 1, // FIXME hardcoded NumReplicas: 1, // FIXME hardcoded
...@@ -642,7 +643,7 @@ func (m *Master) accept(n nodeCome) (node *Node, ok bool) { ...@@ -642,7 +643,7 @@ func (m *Master) accept(n nodeCome) (node *Node, ok bool) {
// allocUUID allocates new node uuid for a node of kind nodeType // allocUUID allocates new node uuid for a node of kind nodeType
// XXX it is bad idea for master to assign uuid to coming node // XXX it is bad idea for master to assign uuid to coming node
// -> better nodes generate really unique UUID themselves and always show with them // -> better nodes generate really unique UUID themselves and always show with them
func (m *Master) allocUUID(nodeType NodeType) NodeUUID { func (m *Master) allocUUID(nodeType neo.NodeType) neo.NodeUUID {
// see NodeUUID & NodeUUID.String for details // see NodeUUID & NodeUUID.String for details
// XXX better to keep this code near to ^^^ (e.g. attached to NodeType) // XXX better to keep this code near to ^^^ (e.g. attached to NodeType)
// XXX but since whole uuid assign idea is not good - let's keep it dirty here // XXX but since whole uuid assign idea is not good - let's keep it dirty here
...@@ -659,7 +660,7 @@ func (m *Master) allocUUID(nodeType NodeType) NodeUUID { ...@@ -659,7 +660,7 @@ func (m *Master) allocUUID(nodeType NodeType) NodeUUID {
// ServeLink serves incoming node-node link connection // ServeLink serves incoming node-node link connection
// XXX +error return? // XXX +error return?
func (m *Master) ServeLink(ctx context.Context, link *NodeLink) { func (m *Master) ServeLink(ctx context.Context, link *neo.NodeLink) {
logf := func(format string, argv ...interface{}) { logf := func(format string, argv ...interface{}) {
fmt.Printf("master: %s: " + format + "\n", append([]interface{}{link}, argv...)) fmt.Printf("master: %s: " + format + "\n", append([]interface{}{link}, argv...))
} }
...@@ -704,7 +705,7 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -704,7 +705,7 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) {
} }
// convey identification request to master // convey identification request to master
idRespCh := make(chan NEOPkt) idRespCh := make(chan neo.Pkt)
m.nodeCome <- nodeCome{link, idReq, idRespCh} m.nodeCome <- nodeCome{link, idReq, idRespCh}
idResp := <-idRespCh idResp := <-idRespCh
...@@ -760,7 +761,7 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -760,7 +761,7 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) {
m.stateMu.Unlock() m.stateMu.Unlock()
go func() { go func() {
var pkt NEOPkt var pkt neo.Pkt
for { for {
select { select {
...@@ -798,8 +799,8 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -798,8 +799,8 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) {
// ServeClient serves incoming connection on which peer identified itself as client // ServeClient serves incoming connection on which peer identified itself as client
// XXX +error return? // XXX +error return?
//func (m *Master) ServeClient(ctx context.Context, conn *Conn) { //func (m *Master) ServeClient(ctx context.Context, conn *neo.Conn) {
func (m *Master) ServeClient(ctx context.Context, link *NodeLink) { func (m *Master) ServeClient(ctx context.Context, link *neo.NodeLink) {
// TODO // TODO
} }
...@@ -839,7 +840,7 @@ type storageStopOperation struct { ...@@ -839,7 +840,7 @@ type storageStopOperation struct {
// with e.g. a command or request and expects corresponding answer // with e.g. a command or request and expects corresponding answer
// //
// XXX +error return? // XXX +error return?
func (m *Master) DriveStorage(ctx context.Context, link *NodeLink) { func (m *Master) DriveStorage(ctx context.Context, link *neo.NodeLink) {
// ? >UnfinishedTransactions // ? >UnfinishedTransactions
// ? <AnswerUnfinishedTransactions (none currently) // ? <AnswerUnfinishedTransactions (none currently)
...@@ -952,11 +953,11 @@ func (m *Master) DriveStorage(ctx context.Context, link *NodeLink) { ...@@ -952,11 +953,11 @@ func (m *Master) DriveStorage(ctx context.Context, link *NodeLink) {
// StopOperation PM -> S // StopOperation PM -> S
} }
func (m *Master) ServeAdmin(ctx context.Context, conn *Conn) { func (m *Master) ServeAdmin(ctx context.Context, conn *neo.Conn) {
// TODO // TODO
} }
func (m *Master) ServeMaster(ctx context.Context, conn *Conn) { func (m *Master) ServeMaster(ctx context.Context, conn *neo.Conn) {
// TODO (for elections) // TODO (for elections)
} }
......
...@@ -24,6 +24,8 @@ import ( ...@@ -24,6 +24,8 @@ import (
"net" "net"
"reflect" "reflect"
"../../neo"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
) )
...@@ -31,7 +33,7 @@ import ( ...@@ -31,7 +33,7 @@ import (
type Server interface { type Server interface {
// ServeLink serves already established nodelink (connection) in a blocking way. // ServeLink serves already established nodelink (connection) in a blocking way.
// ServeLink is usually run in separate goroutine // ServeLink is usually run in separate goroutine
ServeLink(ctx context.Context, link *NodeLink) ServeLink(ctx context.Context, link *neo.NodeLink)
} }
// Serve runs service on a listener // Serve runs service on a listener
...@@ -79,7 +81,7 @@ func Serve(ctx context.Context, l net.Listener, srv Server) error { ...@@ -79,7 +81,7 @@ func Serve(ctx context.Context, l net.Listener, srv Server) error {
// ListenAndServe listens on network address and then calls Serve to handle incoming connections // ListenAndServe listens on network address and then calls Serve to handle incoming connections
// XXX unused -> goes away ? // XXX unused -> goes away ?
func ListenAndServe(ctx context.Context, net Network, laddr string, srv Server) error { func ListenAndServe(ctx context.Context, net neo.Network, laddr string, srv Server) error {
l, err := net.Listen(laddr) l, err := net.Listen(laddr)
if err != nil { if err != nil {
return err return err
...@@ -95,7 +97,7 @@ func ListenAndServe(ctx context.Context, net Network, laddr string, srv Server) ...@@ -95,7 +97,7 @@ func ListenAndServe(ctx context.Context, net Network, laddr string, srv Server)
// it expects peer to send RequestIdentification packet and replies with AcceptIdentification if identification passes. // it expects peer to send RequestIdentification packet and replies with AcceptIdentification if identification passes.
// returns information about identified node or error. // returns information about identified node or error.
// XXX recheck identification logic here // XXX recheck identification logic here
func IdentifyPeer(link *NodeLink, myNodeType NodeType) (nodeInfo RequestIdentification /*TODO -> NodeInfo*/, err error) { func IdentifyPeer(link *neo.NodeLink, myNodeType neo.NodeType) (nodeInfo neo.RequestIdentification, err error) {
defer xerr.Contextf(&err, "%s: identify", link) defer xerr.Contextf(&err, "%s: identify", link)
// the first conn must come with RequestIdentification packet // the first conn must come with RequestIdentification packet
...@@ -139,7 +141,7 @@ func IdentifyPeer(link *NodeLink, myNodeType NodeType) (nodeInfo RequestIdentifi ...@@ -139,7 +141,7 @@ func IdentifyPeer(link *NodeLink, myNodeType NodeType) (nodeInfo RequestIdentifi
// IdentifyWith identifies local node with remote peer // IdentifyWith identifies local node with remote peer
// it also verifies peer's node type to what caller expects // it also verifies peer's node type to what caller expects
// XXX place != ok (this is client, not server ?) // XXX place != ok (this is client, not server ?)
func IdentifyWith(expectPeerType NodeType, link *NodeLink, myInfo NodeInfo, clusterName string) (accept *AcceptIdentification, err error) { func IdentifyWith(expectPeerType neo.NodeType, link *neo.NodeLink, myInfo neo.NodeInfo, clusterName string) (accept *neo.AcceptIdentification, err error) {
defer xerr.Contextf(&err, "%s: request identification", link) defer xerr.Contextf(&err, "%s: request identification", link)
conn, err := link.NewConn() conn, err := link.NewConn()
...@@ -153,7 +155,7 @@ func IdentifyWith(expectPeerType NodeType, link *NodeLink, myInfo NodeInfo, clus ...@@ -153,7 +155,7 @@ func IdentifyWith(expectPeerType NodeType, link *NodeLink, myInfo NodeInfo, clus
} }
}() }()
accept = &AcceptIdentification{} accept = &neo.AcceptIdentification{}
err = Ask(conn, &RequestIdentification{ err = Ask(conn, &RequestIdentification{
NodeType: myInfo.NodeType, NodeType: myInfo.NodeType,
NodeUUID: myInfo.NodeUUID, NodeUUID: myInfo.NodeUUID,
...@@ -178,7 +180,7 @@ func IdentifyWith(expectPeerType NodeType, link *NodeLink, myInfo NodeInfo, clus ...@@ -178,7 +180,7 @@ func IdentifyWith(expectPeerType NodeType, link *NodeLink, myInfo NodeInfo, clus
// XXX naming for RecvAndDecode and EncodeAndSend // XXX naming for RecvAndDecode and EncodeAndSend
// RecvAndDecode receives packet from conn and decodes it // RecvAndDecode receives packet from conn and decodes it
func RecvAndDecode(conn *Conn) (NEOPkt, error) { func RecvAndDecode(conn *neo.Conn) (neo.Pkt, error) {
pkt, err := conn.Recv() pkt, err := conn.Recv()
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -195,7 +197,7 @@ func RecvAndDecode(conn *Conn) (NEOPkt, error) { ...@@ -195,7 +197,7 @@ func RecvAndDecode(conn *Conn) (NEOPkt, error) {
} }
// TODO use free-list for decoded packets + when possible decode in-place // TODO use free-list for decoded packets + when possible decode in-place
pktObj := reflect.New(msgType).Interface().(NEOPkt) pktObj := reflect.New(msgType).Interface().(neo.Pkt)
_, err = pktObj.NEOPktDecode(pkt.Payload()) _, err = pktObj.NEOPktDecode(pkt.Payload())
if err != nil { if err != nil {
// XXX -> ProtoError ? // XXX -> ProtoError ?
...@@ -206,7 +208,7 @@ func RecvAndDecode(conn *Conn) (NEOPkt, error) { ...@@ -206,7 +208,7 @@ func RecvAndDecode(conn *Conn) (NEOPkt, error) {
} }
// EncodeAndSend encodes pkt and sends it to conn // EncodeAndSend encodes pkt and sends it to conn
func EncodeAndSend(conn *Conn, pkt NEOPkt) error { func EncodeAndSend(conn *neo.Conn, pkt neo.Pkt) error {
l := pkt.NEOPktEncodedLen() l := pkt.NEOPktEncodedLen()
buf := PktBuf{make([]byte, PktHeadLen + l)} // XXX -> freelist buf := PktBuf{make([]byte, PktHeadLen + l)} // XXX -> freelist
...@@ -222,7 +224,7 @@ func EncodeAndSend(conn *Conn, pkt NEOPkt) error { ...@@ -222,7 +224,7 @@ func EncodeAndSend(conn *Conn, pkt NEOPkt) error {
// Ask does simple request/response protocol exchange // Ask does simple request/response protocol exchange
// It expects the answer to be exactly of resp type and errors otherwise // It expects the answer to be exactly of resp type and errors otherwise
func Ask(conn *Conn, req NEOPkt, resp NEOPkt) error { func Ask(conn *neo.Conn, req neo.Pkt, resp neo.Pkt) error {
err := EncodeAndSend(conn, req) err := EncodeAndSend(conn, req)
if err != nil { if err != nil {
return err return err
...@@ -237,7 +239,7 @@ func Ask(conn *Conn, req NEOPkt, resp NEOPkt) error { ...@@ -237,7 +239,7 @@ func Ask(conn *Conn, req NEOPkt, resp NEOPkt) error {
// unexpected packet or packet with wrong header // unexpected packet or packet with wrong header
// XXX -> ConnError{Op: "decode"} ? // XXX -> ConnError{Op: "decode"} ?
type ProtoError struct { type ProtoError struct {
Conn *Conn Conn *neo.Conn
Err error Err error
} }
...@@ -247,7 +249,7 @@ func (e *ProtoError) Error() string { ...@@ -247,7 +249,7 @@ func (e *ProtoError) Error() string {
// Expect receives 1 packet and expects it to be exactly of msg type // Expect receives 1 packet and expects it to be exactly of msg type
// XXX naming (-> Recv1 ?) // XXX naming (-> Recv1 ?)
func Expect(conn *Conn, msg NEOPkt) (err error) { func Expect(conn *neo.Conn, msg neo.Pkt) (err error) {
pkt, err := conn.Recv() pkt, err := conn.Recv()
if err != nil { if err != nil {
return err return err
......
...@@ -28,8 +28,9 @@ import ( ...@@ -28,8 +28,9 @@ import (
"strings" "strings"
"time" "time"
"../zodb" "../../neo"
"../zodb/storage/fs1" "../../zodb"
"../../zodb/storage/fs1"
) )
// XXX fmt -> log // XXX fmt -> log
...@@ -38,10 +39,10 @@ import ( ...@@ -38,10 +39,10 @@ import (
type Storage struct { type Storage struct {
// XXX move -> nodeCommon? // XXX move -> nodeCommon?
// ---- 8< ---- // ---- 8< ----
myInfo NodeInfo // XXX -> only Address + NodeUUID ? myInfo neo.NodeInfo // XXX -> only Address + NodeUUID ?
clusterName string clusterName string
net Network // network we are sending/receiving on net neo.Network // network we are sending/receiving on
masterAddr string // address of master XXX -> Address ? masterAddr string // address of master XXX -> Address ?
// ---- 8< ---- // ---- 8< ----
...@@ -51,7 +52,7 @@ type Storage struct { ...@@ -51,7 +52,7 @@ type Storage struct {
// NewStorage creates new storage node that will listen on serveAddr and talk to master on masterAddr // NewStorage creates new storage node that will listen on serveAddr and talk to master on masterAddr
// The storage uses zstor as underlying backend for storing data. // The storage uses zstor as underlying backend for storing data.
// To actually start running the node - call Run. XXX text // To actually start running the node - call Run. XXX text
func NewStorage(cluster string, masterAddr string, serveAddr string, net Network, zstor zodb.IStorage) *Storage { func NewStorage(cluster string, masterAddr string, serveAddr string, net neo.Network, zstor zodb.IStorage) *Storage {
// convert serveAddr into neo format // convert serveAddr into neo format
addr, err := ParseAddress(serveAddr) addr, err := ParseAddress(serveAddr)
if err != nil { if err != nil {
...@@ -176,7 +177,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) error { ...@@ -176,7 +177,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) error {
// ServeLink serves incoming node-node link connection // ServeLink serves incoming node-node link connection
// XXX +error return? // XXX +error return?
func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) { func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
fmt.Printf("stor: %s: serving new node\n", link) fmt.Printf("stor: %s: serving new node\n", link)
// close link when either cancelling or returning (e.g. due to an error) // close link when either cancelling or returning (e.g. due to an error)
...@@ -203,7 +204,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -203,7 +204,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) {
return return
} }
var serveConn func(context.Context, *Conn) var serveConn func(context.Context, *neo.Conn)
switch nodeInfo.NodeType { switch nodeInfo.NodeType {
case CLIENT: case CLIENT:
serveConn = stor.ServeClient serveConn = stor.ServeClient
...@@ -233,22 +234,22 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -233,22 +234,22 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) {
// XXX move err{Encode,Decode} out of here // XXX move err{Encode,Decode} out of here
// errEncode translates an error into Error packet // errEncode translates an error into Error packet
func errEncode(err error) *Error { func errEncode(err error) *neo.Error {
switch err := err.(type) { switch err := err.(type) {
case *Error: case *Error:
return err return err
case *zodb.ErrXidMissing: case *zodb.ErrXidMissing:
// XXX abusing message for xid // XXX abusing message for xid
return &Error{Code: OID_NOT_FOUND, Message: err.Xid.String()} return &neo.Error{Code: OID_NOT_FOUND, Message: err.Xid.String()}
default: default:
return &Error{Code: NOT_READY /* XXX how to report 503? was BROKEN_NODE */, Message: err.Error()} return &neo.Error{Code: NOT_READY /* XXX how to report 503? was BROKEN_NODE */, Message: err.Error()}
} }
} }
// errDecode decodes error from Error packet // errDecode decodes error from Error packet
func errDecode(e *Error) error { func errDecode(e *neo.Error) error {
switch e.Code { switch e.Code {
case OID_NOT_FOUND: case OID_NOT_FOUND:
xid, err := zodb.ParseXid(e.Message) // XXX abusing message for xid xid, err := zodb.ParseXid(e.Message) // XXX abusing message for xid
...@@ -260,7 +261,7 @@ func errDecode(e *Error) error { ...@@ -260,7 +261,7 @@ func errDecode(e *Error) error {
return e return e
} }
func (stor *Storage) ServeMaster(ctx context.Context, conn *Conn) { func (stor *Storage) ServeMaster(ctx context.Context, conn *neo.Conn) {
// state changes: // state changes:
// //
...@@ -275,7 +276,7 @@ func (stor *Storage) ServeMaster(ctx context.Context, conn *Conn) { ...@@ -275,7 +276,7 @@ func (stor *Storage) ServeMaster(ctx context.Context, conn *Conn) {
// ServeClient serves incoming connection on which peer identified itself as client // ServeClient serves incoming connection on which peer identified itself as client
// XXX +error return? // XXX +error return?
func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) { func (stor *Storage) ServeClient(ctx context.Context, conn *neo.Conn) {
fmt.Printf("stor: %s: serving new client conn\n", conn) fmt.Printf("stor: %s: serving new client conn\n", conn)
// close connection when either cancelling or returning (e.g. due to an error) // close connection when either cancelling or returning (e.g. due to an error)
...@@ -312,7 +313,7 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) { ...@@ -312,7 +313,7 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) {
xid.TidBefore = true xid.TidBefore = true
} }
var reply NEOPkt var reply neo.Pkt
data, tid, err := stor.zstor.Load(xid) data, tid, err := stor.zstor.Load(xid)
if err != nil { if err != nil {
// TODO translate err to NEO protocol error codes // TODO translate err to NEO protocol error codes
...@@ -334,7 +335,7 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) { ...@@ -334,7 +335,7 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) {
EncodeAndSend(conn, reply) // XXX err EncodeAndSend(conn, reply) // XXX err
case *LastTransaction: case *LastTransaction:
var reply NEOPkt var reply neo.Pkt
lastTid, err := stor.zstor.LastTid() lastTid, err := stor.zstor.LastTid()
if err != nil { if err != nil {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment