Commit c5278b55 authored by Kirill Smelkov's avatar Kirill Smelkov

X sync proto.go to recent py changes; Pkt.Len now describes only payload len (see a60e36e8)

TODO handshake for NodeLink to first check protocol versions
parent f873e151
......@@ -502,7 +502,6 @@ var ErrPktTooBig = errors.New("packet too big")
// rx error, if any, is returned as is and is analyzed in serveRecv
func (nl *NodeLink) recvPkt() (*PktBuf, error) {
// TODO organize rx buffers management (freelist etc)
// TODO cleanup lots of ntoh32(...)
// first read to read pkt header and hopefully up to page of data in 1 syscall
pkt := &PktBuf{make([]byte, 4096)}
......@@ -516,22 +515,20 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
pkth := pkt.Header()
// XXX -> better PktHeader.Decode() ?
if ntoh32(pkth.Len) < PktHeadLen {
return nil, ErrPktTooSmall // length is a whole packet len with header
}
if ntoh32(pkth.Len) > MAX_PACKET_SIZE {
pktLen := PktHeadLen + ntoh32(pkth.MsgLen) // .MsgLen is payload-only length without header
if pktLen > MAX_PACKET_SIZE {
return nil, ErrPktTooBig
}
// XXX -> pkt.Data = xbytes.Resize32(pkt.Data[:n], ntoh32(pkth.Len))
if ntoh32(pkth.Len) > uint32(cap(pkt.Data)) {
// XXX -> pkt.Data = xbytes.Resize32(pkt.Data[:n], pktLen)
if pktLen > uint32(cap(pkt.Data)) {
// grow rxbuf
rxbuf2 := make([]byte, ntoh32(pkth.Len))
rxbuf2 := make([]byte, pktLen)
copy(rxbuf2, pkt.Data[:n])
pkt.Data = rxbuf2
}
// cut .Data len to length of packet
pkt.Data = pkt.Data[:ntoh32(pkth.Len)]
pkt.Data = pkt.Data[:pktLen]
// read rest of pkt data, if we need to
if n < len(pkt.Data) {
......
......@@ -105,7 +105,7 @@ func _mkpkt(connid uint32, msgcode uint16, payload []byte) *PktBuf {
h := pkt.Header()
h.ConnId = hton32(connid)
h.MsgCode = hton16(msgcode)
h.Len = hton32(PktHeadLen + uint32(len(payload)))
h.MsgLen = hton32(uint32(len(payload)))
copy(pkt.Payload(), payload)
return pkt
}
......@@ -126,8 +126,8 @@ func xverifyPkt(pkt *PktBuf, connid uint32, msgcode uint16, payload []byte) {
if ntoh16(h.MsgCode) != msgcode {
errv.Appendf("header: unexpected msgcode %v (want %v)", ntoh16(h.MsgCode), msgcode)
}
if ntoh32(h.Len) != uint32(PktHeadLen + len(payload)) {
errv.Appendf("header: unexpected length %v (want %v)", ntoh32(h.Len), PktHeadLen + len(payload))
if ntoh32(h.MsgLen) != uint32(len(payload)) {
errv.Appendf("header: unexpected msglen %v (want %v)", ntoh32(h.MsgLen), len(payload))
}
if !bytes.Equal(pkt.Payload(), payload) {
errv.Appendf("payload differ")
......
......@@ -29,8 +29,8 @@ type PktBuf struct {
// XXX naming -> PktHeader ?
type PktHead struct {
ConnId be32 // NOTE is .msgid in py
MsgCode be16
Len be32 // whole packet length (including header)
MsgCode be16 // payload message code
MsgLen be32 // payload message length (excluding packet header)
}
// Get pointer to packet header
......@@ -62,7 +62,7 @@ func (pkt *PktBuf) String() string {
s += fmt.Sprintf(" %s", msgType)
}
s += fmt.Sprintf(" #%d | ", ntoh32(h.Len))
s += fmt.Sprintf(" #%d | ", ntoh32(h.MsgLen))
s += fmt.Sprintf("% x", pkt.Payload()) // XXX better decode
return s
......
This diff is collapsed.
......@@ -4,9 +4,9 @@ package neo
import "fmt"
const _ErrorCode_name = "ACKNOT_READYOID_NOT_FOUNDTID_NOT_FOUNDOID_DOES_NOT_EXISTPROTOCOL_ERRORBROKEN_NODEREPLICATION_ERRORCHECKING_ERRORBACKEND_NOT_IMPLEMENTEDNON_READABLE_CELLREAD_ONLY_ACCESSINCOMPLETE_TRANSACTION"
const _ErrorCode_name = "ACKNOT_READYOID_NOT_FOUNDTID_NOT_FOUNDOID_DOES_NOT_EXISTPROTOCOL_ERRORREPLICATION_ERRORCHECKING_ERRORBACKEND_NOT_IMPLEMENTEDNON_READABLE_CELLREAD_ONLY_ACCESSINCOMPLETE_TRANSACTION"
var _ErrorCode_index = [...]uint8{0, 3, 12, 25, 38, 56, 70, 81, 98, 112, 135, 152, 168, 190}
var _ErrorCode_index = [...]uint8{0, 3, 12, 25, 38, 56, 70, 87, 101, 124, 141, 157, 179}
func (i ErrorCode) String() string {
if i >= ErrorCode(len(_ErrorCode_index)-1) {
......
......@@ -19,10 +19,15 @@ import (
)
const (
PROTOCOL_VERSION = 12
// The protocol version must be increased whenever upgrading a node may require
// to upgrade other nodes. It is encoded as a 4-bytes big-endian integer and
// the high order byte 0 is different from TLS Handshake (0x16).
PROTOCOL_VERSION = 1
// XXX ENCODED_VERSION ?
PktHeadLen = 10 // XXX unsafe.Sizeof(PktHead{}) give _typed_ constant (uintptr)
// TODO link this to PktHead.Encode/Decode size ? XXX -> pkt.go ?
MIN_PACKET_SIZE = 10 // XXX unsafe.Sizeof(PktHead{}) give _typed_ constant (uintptr)
PktHeadLen = MIN_PACKET_SIZE // TODO link this to PktHead.Encode/Decode size ? XXX -> pkt.go ?
MAX_PACKET_SIZE = 0x4000000
RESPONSE_MASK = 0x8000
......@@ -37,7 +42,6 @@ const (
TID_NOT_FOUND
OID_DOES_NOT_EXIST
PROTOCOL_ERROR
BROKEN_NODE
REPLICATION_ERROR
CHECKING_ERROR
BACKEND_NOT_IMPLEMENTED
......@@ -89,13 +93,10 @@ const (
type NodeState int32
const (
RUNNING NodeState = iota //short: R // XXX tag prefix name ?
TEMPORARILY_DOWN //short: T
UNKNOWN NodeState = iota //short: U // XXX tag prefix name ?
DOWN //short: D
BROKEN //short: B
HIDDEN //short: H
RUNNING //short: R
PENDING //short: P
UNKNOWN //short: U
)
type CellState int32
......@@ -251,11 +252,6 @@ type RowInfo struct {
// General purpose notification (remote logging)
type Notify struct {
Message string
}
// Error is a special type of message, because this can be sent against
// any other message, even if such a message does not expect a reply
// usually. Any -> Any.
......@@ -277,7 +273,6 @@ type CloseClient struct {
// Request a node identification. This must be the first packet for any
// connection. Any -> Any.
type RequestIdentification struct {
ProtocolVersion uint32 // TODO py.PProtocol upon decoding checks for != PROTOCOL_VERSION
NodeType NodeType // XXX name
NodeID NodeID
Address Address // where requesting node is also accepting connections
......@@ -292,11 +287,6 @@ type AcceptIdentification struct {
NumPartitions uint32 // PNumber
NumReplicas uint32 // PNumber
YourNodeID NodeID
Primary Address
KnownMasterList []struct {
Address
NodeID NodeID
}
}
// Ask current primary master's uuid. CTL -> A.
......@@ -307,12 +297,12 @@ type AnswerPrimary struct {
PrimaryNodeID NodeID
}
// Announce a primary master node election. PM -> SM.
type AnnouncePrimary struct {
}
// Force a re-election of a primary master node. M -> M.
type ReelectPrimary struct {
// Send list of known master nodes. SM -> Any.
type NotPrimaryMaster struct {
Primary NodeID // XXX PSignedNull in py
KnownMasterList []struct {
Address
}
}
// Ask all data needed by master to recover. PM -> S, S -> PM.
......
......@@ -214,13 +214,12 @@ func RecvAndDecode(conn *Conn) (NEOEncoder, error) { // XXX NEOEncoder -> interf
// EncodeAndSend encodes pkt and send it to conn
func EncodeAndSend(conn *Conn, pkt NEOEncoder) error {
msgCode, l := pkt.NEOEncodedInfo()
l += PktHeadLen
buf := PktBuf{make([]byte, l)} // XXX -> freelist
buf := PktBuf{make([]byte, PktHeadLen + l)} // XXX -> freelist
h := buf.Header()
// h.ConnId will be set by conn.Send
h.MsgCode = hton16(msgCode)
h.Len = hton32(uint32(l)) // XXX casting: think again
h.MsgLen = hton32(uint32(l)) // XXX casting: think again
pkt.NEOEncode(buf.Payload())
......
......@@ -113,7 +113,7 @@ func errEncode(err error) *Error {
return &Error{Code: OID_NOT_FOUND, Message: err.Xid.String()}
default:
return &Error{Code: BROKEN_NODE, Message: err.Error()}
return &Error{Code: NOT_READY /* XXX how to report 503? was BROKEN_NODE */, Message: err.Error()}
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment