Commit 172e8831 authored by Kirill Smelkov

.

parent 71a10f5b
...@@ -768,8 +768,8 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) { ...@@ -768,8 +768,8 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
// first read to read pkt header and hopefully up to page of data in 1 syscall // first read to read pkt header and hopefully up to page of data in 1 syscall
pkt := &PktBuf{make([]byte, 4096)} pkt := &PktBuf{make([]byte, 4096)}
// TODO reenable, but NOTE next packet can be also prefetched here -> use buffering ? // TODO reenable, but NOTE next packet can be also prefetched here -> use buffering ?
//n, err := io.ReadAtLeast(nl.peerLink, pkt.Data, PktHeadLen) //n, err := io.ReadAtLeast(nl.peerLink, pkt.Data, pktHeaderLen)
n, err := io.ReadFull(nl.peerLink, pkt.Data[:PktHeadLen]) n, err := io.ReadFull(nl.peerLink, pkt.Data[:pktHeaderLen])
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -777,8 +777,8 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) { ...@@ -777,8 +777,8 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
pkth := pkt.Header() pkth := pkt.Header()
// XXX -> better PktHeader.Decode() ? // XXX -> better PktHeader.Decode() ?
pktLen := PktHeadLen + ntoh32(pkth.MsgLen) // .MsgLen is payload-only length without header pktLen := pktHeaderLen + ntoh32(pkth.MsgLen) // .MsgLen is payload-only length without header
if pktLen > MAX_PACKET_SIZE { if pktLen > pktMaxSize {
return nil, ErrPktTooBig return nil, ErrPktTooBig
} }
...@@ -815,7 +815,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) { ...@@ -815,7 +815,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
// On success raw connection is returned wrapped into NodeLink. // On success raw connection is returned wrapped into NodeLink.
// On error raw connection is closed. // On error raw connection is closed.
func Handshake(ctx context.Context, conn net.Conn, role LinkRole) (nl *NodeLink, err error) { func Handshake(ctx context.Context, conn net.Conn, role LinkRole) (nl *NodeLink, err error) {
err = handshake(ctx, conn, PROTOCOL_VERSION) err = handshake(ctx, conn, ProtocolVersion)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -1160,7 +1160,7 @@ func (c *Conn) Send(msg Msg) error { ...@@ -1160,7 +1160,7 @@ func (c *Conn) Send(msg Msg) error {
traceConnSendPre(c, msg) traceConnSendPre(c, msg)
l := msg.neoMsgEncodedLen() l := msg.neoMsgEncodedLen()
buf := PktBuf{make([]byte, PktHeadLen+l)} // TODO -> freelist buf := PktBuf{make([]byte, pktHeaderLen+l)} // TODO -> freelist
h := buf.Header() h := buf.Header()
// h.ConnId will be set by conn.Send // h.ConnId will be set by conn.Send
......
...@@ -94,7 +94,7 @@ func xconnError(err error) error { ...@@ -94,7 +94,7 @@ func xconnError(err error) error {
// Prepare PktBuf with content // Prepare PktBuf with content
func _mkpkt(connid uint32, msgcode uint16, payload []byte) *PktBuf { func _mkpkt(connid uint32, msgcode uint16, payload []byte) *PktBuf {
pkt := &PktBuf{make([]byte, PktHeadLen + len(payload))} pkt := &PktBuf{make([]byte, pktHeaderLen + len(payload))}
h := pkt.Header() h := pkt.Header()
h.ConnId = hton32(connid) h.ConnId = hton32(connid)
h.MsgCode = hton16(msgcode) h.MsgCode = hton16(msgcode)
......
...@@ -35,30 +35,22 @@ type PktBuf struct { ...@@ -35,30 +35,22 @@ type PktBuf struct {
Data []byte // whole packet data including all headers XXX -> Buf ? Data []byte // whole packet data including all headers XXX -> Buf ?
} }
// PktHead represents header of a raw packet
// XXX naming -> PktHeader ?
type PktHead struct {
ConnId be32 // NOTE is .msgid in py
MsgCode be16 // payload message code
MsgLen be32 // payload message length (excluding packet header)
}
// Header returns pointer to packet header // Header returns pointer to packet header
func (pkt *PktBuf) Header() *PktHead { func (pkt *PktBuf) Header() *PktHeader {
// XXX check len(Data) < PktHead ? -> no, Data has to be allocated with cap >= PktHeadLen // XXX check len(Data) < PktHeader ? -> no, Data has to be allocated with cap >= pktHeaderLen
return (*PktHead)(unsafe.Pointer(&pkt.Data[0])) return (*PktHeader)(unsafe.Pointer(&pkt.Data[0]))
} }
// Payload returns []byte representing packet payload // Payload returns []byte representing packet payload
func (pkt *PktBuf) Payload() []byte { func (pkt *PktBuf) Payload() []byte {
return pkt.Data[PktHeadLen:] return pkt.Data[pktHeaderLen:]
} }
// String dumps a packet in human-readable form // String dumps a packet in human-readable form
func (pkt *PktBuf) String() string { func (pkt *PktBuf) String() string {
if len(pkt.Data) < PktHeadLen { if len(pkt.Data) < pktHeaderLen {
return fmt.Sprintf("(! < PktHeadLen) % x", pkt.Data) return fmt.Sprintf("(! < pktHeaderLen) % x", pkt.Data)
} }
h := pkt.Header() h := pkt.Header()
...@@ -92,8 +84,8 @@ func (pkt *PktBuf) String() string { ...@@ -92,8 +84,8 @@ func (pkt *PktBuf) String() string {
// Dump dumps a packet in raw form // Dump dumps a packet in raw form
func (pkt *PktBuf) Dump() string { func (pkt *PktBuf) Dump() string {
if len(pkt.Data) < PktHeadLen { if len(pkt.Data) < pktHeaderLen {
return fmt.Sprintf("(! < PktHeadLen) % x", pkt.Data) return fmt.Sprintf("(! < pktHeaderLen) % x", pkt.Data)
} }
h := pkt.Header() h := pkt.Header()
......
...@@ -16,8 +16,6 @@ ...@@ -16,8 +16,6 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
//go:generate sh -c "go run protogen.go >zproto-marshal.go"
package neo package neo
// protocol definition // protocol definition
...@@ -25,18 +23,23 @@ package neo ...@@ -25,18 +23,23 @@ package neo
// In particular every type that is included in a message is defined here as well. // In particular every type that is included in a message is defined here as well.
// //
// By default a structure defined in this file becomes a separate network message. // By default a structure defined in this file becomes a separate network message.
// If a structure is defined only to represent basic type that is included in // If a structure is defined only to represent basic type that is e.g. included in
// several messages and does not itself denote a separate message, its // several messages and does not itself denote a separate message, its
// definition is prefixed with `//neo:proto typeonly` comment. // definition is prefixed with `//neo:proto typeonly` comment.
// //
// The order of message definitions is significant - messages are assigned // The order of message definitions is significant - messages are assigned
// message IDs in the same order they are defined. // message codes in the same order they are defined.
// //
// For compatibility with neo/py a message has its ID assigned with "answer" // For compatibility with neo/py a message has its code assigned with "answer"
// bit set if either message name starts with "Answer" or message definition is // bit set if either message name starts with "Answer" or message definition is
// prefixed with `//neo:proto answer` comment. // prefixed with `//neo:proto answer` comment.
//
// Packet structure and messages are bit-to-bit compatible with neo/py (see protocol.py).
//
// The code to marshal/unmarshal messages is generated by protogen.go .
//go:generate sh -c "go run protogen.go >zproto-marshal.go"
// XXX bit-to-bit compatible with protocol.py
// TODO regroup messages definitions to stay more close to 1 communication topic // TODO regroup messages definitions to stay more close to 1 communication topic
// TODO document protocol itself better (who sends who what with which semantic) // TODO document protocol itself better (who sends who what with which semantic)
...@@ -46,7 +49,6 @@ package neo ...@@ -46,7 +49,6 @@ package neo
// //
// TODO work this out // TODO work this out
// XXX move imports out of here
import ( import (
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
...@@ -59,17 +61,28 @@ const ( ...@@ -59,17 +61,28 @@ const (
// The protocol version must be increased whenever upgrading a node may require // The protocol version must be increased whenever upgrading a node may require
// to upgrade other nodes. It is encoded as a 4-bytes big-endian integer and // to upgrade other nodes. It is encoded as a 4-bytes big-endian integer and
// the high order byte 0 is different from TLS Handshake (0x16). // the high order byte 0 is different from TLS Handshake (0x16).
PROTOCOL_VERSION = 1 ProtocolVersion = 1
// XXX ENCODED_VERSION ?
PktHeadLen = 10 // XXX unsafe.Sizeof(PktHead{}) give _typed_ constant (uintptr) // length of packet header
// TODO link this to PktHead.Encode/Decode size ? XXX -> pkt.go ? pktHeaderLen = 10 // = unsafe.Sizeof(PktHeader{}), but latter gives typed constant (uintptr)
MAX_PACKET_SIZE = 0x4000000 // we are not accepting packets larger than pktMaxSize.
// in particular this avoids out-of-memory error on packets with corrupt message len.
pktMaxSize = 0x4000000
answerBit = 0x8000 // answerBit is set in message code in answer messages for compatibility with neo/py
answerBit = 0x8000
) )
// PktHeader represents header of a raw packet.
//
//neo:proto typeonly
type PktHeader struct {
ConnId be32 // NOTE is .msgid in py
MsgCode be16 // payload message code
MsgLen be32 // payload message length (excluding packet header)
}
type ErrorCode uint32 type ErrorCode uint32
const ( const (
ACK ErrorCode = iota ACK ErrorCode = iota
...@@ -102,7 +115,7 @@ const ( ...@@ -102,7 +115,7 @@ const (
// - replay the transaction log, in case of unclean shutdown; // - replay the transaction log, in case of unclean shutdown;
// - and actually truncate the DB if the user asked to do so. // - and actually truncate the DB if the user asked to do so.
// Then, the cluster either goes to ClusterRunning or STARTING_BACKUP state. // Then, the cluster either goes to ClusterRunning or STARTING_BACKUP state.
ClusterVerifying // XXX = ClusterStarting ClusterVerifying
// Normal operation. The DB is read-writable by clients. // Normal operation. The DB is read-writable by clients.
ClusterRunning ClusterRunning
// Transient state to shutdown the whole cluster. // Transient state to shutdown the whole cluster.
...@@ -177,7 +190,7 @@ type NodeUUID int32 ...@@ -177,7 +190,7 @@ type NodeUUID int32
// TODO NodeType -> base NodeUUID // TODO NodeType -> base NodeUUID
// ErrDecodeOverflow is the error returned by neoMsgDecode when decoding hit buffer overflow // ErrDecodeOverflow is the error returned by neoMsgDecode when decoding hits buffer overflow
var ErrDecodeOverflow = errors.New("decode: bufer overflow") var ErrDecodeOverflow = errors.New("decode: bufer overflow")
// Msg is the interface implemented by all NEO messages. // Msg is the interface implemented by all NEO messages.
...@@ -217,7 +230,7 @@ func (a *Address) neoEncodedLen() int { ...@@ -217,7 +230,7 @@ func (a *Address) neoEncodedLen() int {
func (a *Address) neoEncode(b []byte) int { func (a *Address) neoEncode(b []byte) int {
n := string_neoEncode(a.Host, b[0:]) n := string_neoEncode(a.Host, b[0:])
if a.Host != "" { if a.Host != "" {
BigEndian.PutUint16(b[n:], a.Port) binary.BigEndian.PutUint16(b[n:], a.Port)
n += 2 n += 2
} }
return n return n
...@@ -226,7 +239,7 @@ func (a *Address) neoEncode(b []byte) int { ...@@ -226,7 +239,7 @@ func (a *Address) neoEncode(b []byte) int {
func (a *Address) neoDecode(b []byte) int { func (a *Address) neoDecode(b []byte) int {
n := string_neoDecode(&a.Host, b) n := string_neoDecode(&a.Host, b)
if a.Host != "" { if a.Host != "" {
a.Port = BigEndian.Uint16(b[n:]) a.Port = binary.BigEndian.Uint16(b[n:])
n += 2 n += 2
} else { } else {
a.Port = 0 a.Port = 0
...@@ -702,7 +715,7 @@ type SetNodeState struct { ...@@ -702,7 +715,7 @@ type SetNodeState struct {
NodeUUID NodeUUID
NodeState NodeState
// XXX _answer = Error ? // XXX _answer = Error
} }
// Ask the primary to include some pending node in the partition table // Ask the primary to include some pending node in the partition table
...@@ -719,13 +732,6 @@ type TweakPartitionTable struct { ...@@ -719,13 +732,6 @@ type TweakPartitionTable struct {
// XXX _answer = Error // XXX _answer = Error
} }
/*
// Ask node information
type NodeInformation struct {
// XXX _answer = PFEmpty
}
*/
// Set the cluster state // Set the cluster state
type SetClusterState struct { type SetClusterState struct {
State ClusterState State ClusterState
...@@ -1026,5 +1032,5 @@ type AddObject struct { ...@@ -1026,5 +1032,5 @@ type AddObject struct {
type Truncate struct { type Truncate struct {
Tid zodb.Tid Tid zodb.Tid
// XXX _answer = Error ? // XXX _answer = Error
} }
...@@ -64,9 +64,12 @@ func u64(v uint64) string { ...@@ -64,9 +64,12 @@ func u64(v uint64) string {
} }
func TestPktHeader(t *testing.T) { func TestPktHeader(t *testing.T) {
// make sure PktHeader is really packed // make sure PktHeader is really packed and its size matches pktHeaderLen
if unsafe.Sizeof(PktHead{}) != 10 { if unsafe.Sizeof(PktHeader{}) != 10 {
t.Fatalf("sizeof(PktHead) = %v ; want 10", unsafe.Sizeof(PktHead{})) t.Fatalf("sizeof(PktHeader) = %v ; want 10", unsafe.Sizeof(PktHeader{}))
}
if unsafe.Sizeof(PktHeader{}) != pktHeaderLen {
t.Fatalf("sizeof(PktHeader) = %v ; want %v", unsafe.Sizeof(PktHeader{}), pktHeaderLen)
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment