Commit 63875e79 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 69a02de9
package proto
package neo
//package proto
/*
import (
. "../"
)
*/
const (
PROTOCOL_VERSION = 8
MIN_PACKET_SIZE = 10 // XXX link this to len(pkthead) ?
PktHeadLen = MIN_PACKET_SIZE // TODO link this to PktHead.Encode/Decode size
MAX_PACKET_SIZE = 0x4000000
RESPONSE_MASK = 0x800
......@@ -122,8 +126,7 @@ type RowList []struct {
// XXX link request <-> answer ?
// TODO ensure len(encoded packet header) == 10
// XXX -> PktHeader ?
type Packet struct {
type PktHead struct {
Id uint32
Code uint16 // XXX we don't need this as field - this is already encoded in type
Len uint32 // XXX we don't need this as field - only on the wire
......@@ -133,7 +136,7 @@ type Packet struct {
// General purpose notification (remote logging)
type Notify struct {
Packet
PktHead
Message string
}
......@@ -141,26 +144,26 @@ type Notify struct {
// any other message, even if such a message does not expect a reply
// usually. Any -> Any.
type Error struct {
Packet
PktHead
Code uint32 // PNumber
Message string
}
// Check if a peer is still alive. Any -> Any.
type Ping struct {
Packet
PktHead
// TODO _answer = PFEmpty
}
// Tell peer it can close the connection if it has finished with us. Any -> Any
type CloseClient struct {
Packet
PktHead
}
// Request a node identification. This must be the first packet for any
// connection. Any -> Any.
type RequestIdentification struct {
Packet
PktHead
ProtocolVersion uint32 // TODO py.PProtocol upon decoding checks for != PROTOCOL_VERSION
NodeType NodeType // XXX name
UUID UUID
......@@ -171,7 +174,7 @@ type RequestIdentification struct {
// XXX -> ReplyIdentification? RequestIdentification.Answer somehow ?
type AcceptIdentification struct {
Packet
PktHead
NodeType NodeType // XXX name
MyUUID UUID
NumPartitions uint32 // PNumber
......@@ -186,31 +189,31 @@ type AcceptIdentification struct {
// Ask current primary master's uuid. CTL -> A.
type PrimaryMaster struct {
Packet
PktHead
}
type AnswerPrimary struct {
Packet
PktHead
PrimaryUUID UUID
}
// Announce a primary master node election. PM -> SM.
type AnnouncePrimary struct {
Packet
PktHead
}
// Force a re-election of a primary master node. M -> M.
type ReelectPrimary struct {
Packet
PktHead
}
// Ask all data needed by master to recover. PM -> S, S -> PM.
type Recovery struct {
Packet
PktHead
}
type AnswerRecovery struct {
Packet
PktHead
PTid
BackupTID Tid
TruncateTID Tid
......@@ -219,11 +222,11 @@ type AnswerRecovery struct {
// Ask the last OID/TID so that a master can initialize its TransactionManager.
// PM -> S, S -> PM.
type LastIDs struct {
Packet
PktHead
}
type AnswerLastIDs struct {
Packet
PktHead
LastOID Oid
LastTID Tid
}
......@@ -231,11 +234,11 @@ type AnswerLastIDs struct {
// Ask the full partition table. PM -> S.
// Answer rows in a partition table. S -> PM.
type PartitionTable struct {
Packet
PktHead
}
type AnswerPartitionTable struct {
Packet
PktHead
PTid
RowList RowList
}
......@@ -243,7 +246,7 @@ type AnswerPartitionTable struct {
// Send rows in a partition table to update other nodes. PM -> S, C.
type NotifyPartitionTable struct {
Packet
PktHead
PTid
RowList RowList
}
......@@ -251,7 +254,7 @@ type NotifyPartitionTable struct {
// Notify a subset of a partition table. This is used to notify changes.
// PM -> S, C.
type PartitionChanges struct {
Packet
PktHead
PTid
CellList []struct {
......@@ -265,7 +268,7 @@ type PartitionChanges struct {
// Tell a storage nodes to start an operation. Until a storage node receives
// this message, it must not serve client nodes. PM -> S.
type StartOperation struct {
Packet
PktHead
// XXX: Is this boolean needed ? Maybe this
// can be deduced from cluster state.
Backup bool
......@@ -274,17 +277,17 @@ type StartOperation struct {
// Tell a storage node to stop an operation. Once a storage node receives
// this message, it must not serve client nodes. PM -> S.
type StopOperation struct {
Packet
PktHead
}
// Ask unfinished transactions S -> PM.
// Answer unfinished transactions PM -> S.
type UnfinishedTransactions struct {
Packet
PktHead
}
type AnswerUnfinishedTransactions struct {
Packet
PktHead
MaxTID Tid
TidList []struct{
UnfinishedTID Tid
......@@ -294,28 +297,28 @@ type AnswerUnfinishedTransactions struct {
// Ask locked transactions PM -> S.
// Answer locked transactions S -> PM.
type LockedTransactions struct {
Packet
PktHead
}
type AnswerLockedTransactions struct {
Packet
PktHead
TidDict map[Tid]Tid // ttid -> tid
}
// Return final tid if ttid has been committed. * -> S. C -> PM.
type FinalTID struct {
Packet
PktHead
TTID Tid
}
type AnswerFinalTID struct {
Packet
PktHead
Tid Tid
}
// Commit a transaction. PM -> S.
type ValidateTransaction struct {
Packet
PktHead
TTID Tid
Tid Tid
}
......@@ -324,26 +327,26 @@ type ValidateTransaction struct {
// Ask to begin a new transaction. C -> PM.
// Answer when a transaction begin, give a TID if necessary. PM -> C.
type BeginTransaction struct {
Packet
PktHead
Tid Tid
}
type AnswerBeginTransaction struct {
Packet
PktHead
Tid Tid
}
// Finish a transaction. C -> PM.
// Answer when a transaction is finished. PM -> C.
type FinishTransaction struct {
Packet
PktHead
Tid Tid
OIDList []Oid
CheckedList []Oid
}
type AnswerFinishTransaction struct {
Packet
PktHead
TTID Tid
Tid Tid
}
......@@ -351,7 +354,7 @@ type AnswerFinishTransaction struct {
// Notify that a transaction blocking a replication is now finished
// M -> S
type NotifyTransactionFinished struct {
Packet
PktHead
TTID Tid
MaxTID Tid
}
......@@ -359,7 +362,7 @@ type NotifyTransactionFinished struct {
// Lock information on a transaction. PM -> S.
// Notify information on a transaction locked. S -> PM.
type LockInformation struct {
Packet
PktHead
Ttid Tid
Tid Tid
}
......@@ -372,27 +375,27 @@ type AnswerLockInformation struct {
// Invalidate objects. PM -> C.
// XXX ask_finish_transaction ?
type InvalidateObjects struct {
Packet
PktHead
Tid Tid
OidList []Oid
}
// Unlock information on a transaction. PM -> S.
type UnlockInformation struct {
Packet
PktHead
TTID Tid
}
// Ask new object IDs. C -> PM.
// Answer new object IDs. PM -> C.
type GenerateOIDs struct {
Packet
PktHead
NumOIDs uint32 // PNumber
}
// XXX answer_new_oids ?
type AnswerGenerateOIDs struct {
Packet
PktHead
OidList []Oid
}
......@@ -404,7 +407,7 @@ type AnswerGenerateOIDs struct {
// if this serial is newer than the current transaction ID, a client
// node must not try to resolve the conflict. S -> C.
type StoreObject struct {
Packet
PktHead
Oid Oid
Serial Tid
Compression bool
......@@ -416,7 +419,7 @@ type StoreObject struct {
}
type AnswerStoreObject struct {
Packet
PktHead
Conflicting bool
Oid Oid
Serial Tid
......@@ -424,14 +427,14 @@ type AnswerStoreObject struct {
// Abort a transaction. C -> S, PM.
type AbortTransaction struct {
Packet
PktHead
Tid Tid
}
// Ask to store a transaction. C -> S.
// Answer if transaction has been stored. S -> C.
type StoreTransaction struct {
Packet
PktHead
Tid Tid
User string
Description string
......@@ -443,7 +446,7 @@ type StoreTransaction struct {
// Ask to store a transaction. C -> S.
// Answer if transaction has been stored. S -> C.
type VoteTransaction struct {
Packet
PktHead
Tid Tid
// TODO _answer = PFEmpty
}
......@@ -453,7 +456,7 @@ type VoteTransaction struct {
// a TID is specified, an object right before the TID will be returned. C -> S.
// Answer the requested object. S -> C.
type GetObject struct {
Packet
PktHead
Oid Oid
Serial Tid
Tid Tid
......@@ -461,7 +464,7 @@ type GetObject struct {
// XXX answer_object ?
type AnswerGetObject struct {
Packet
PktHead
Oid Oid
SerialStart Tid
SerialEnd Tid
......@@ -475,7 +478,7 @@ type AnswerGetObject struct {
// and the range is [first, last). C -> S.
// Answer the requested TIDs. S -> C.
type TIDList struct {
Packet
PktHead
First uint64 // PIndex XXX this is TID actually ? -> no it is offset in list
Last uint64 // PIndex ----//----
Partition uint32 // PNumber
......@@ -483,7 +486,7 @@ type TIDList struct {
// XXX answer_tids ?
type AnswerTIDList struct {
Packet
PktHead
TIDList []Tid
}
......@@ -491,7 +494,7 @@ type AnswerTIDList struct {
// C -> S.
// Answer the requested TIDs. S -> C
type TIDListFrom struct {
Packet
PktHead
MinTID Tid
MaxTID Tid
Length uint32 // PNumber
......@@ -500,19 +503,19 @@ type TIDListFrom struct {
// XXX answer_tids ?
type AnswerTIDListFrom struct {
Packet
PktHead
TidList []Tid
}
// Ask information about a transaction. Any -> S.
// Answer information (user, description) about a transaction. S -> Any.
type TransactionInformation struct {
Packet
PktHead
Tid Tid
}
type AnswerTransactionInformation struct {
Packet
PktHead
Tid Tid
User string
Description string
......@@ -525,14 +528,14 @@ type AnswerTransactionInformation struct {
// descending, and the range is [first, last]. C -> S.
// Answer history information (serial, size) for an object. S -> C.
type ObjectHistory struct {
Packet
PktHead
Oid Oid
First uint64 // PIndex XXX this is actually TID
Last uint64 // PIndex ----//----
}
type AnswerObjectHistory struct {
Packet
PktHead
Oid Oid
HistoryList []struct {
Serial Tid
......@@ -544,14 +547,14 @@ type AnswerObjectHistory struct {
// Ask information about partition
// Answer information about partition
type PartitionList struct {
Packet
PktHead
MinOffset uint32 // PNumber
MaxOffset uint32 // PNumber
UUID UUID
}
type AnswerPartitionList struct {
Packet
PktHead
PTid
RowList RowList
}
......@@ -559,18 +562,18 @@ type AnswerPartitionList struct {
// Ask information about nodes
// Answer information about nodes
type X_NodeList struct {
Packet
PktHead
NodeType
}
type AnswerNodeList struct {
Packet
PktHead
NodeList []NodeInfo
}
// Set the node state
type SetNodeState struct {
Packet
PktHead
UUID
NodeState
......@@ -579,7 +582,7 @@ type SetNodeState struct {
// Ask the primary to include some pending node in the partition table
type AddPendingNodes struct {
Packet
PktHead
UUIDList []UUID
// XXX _answer = Error
......@@ -587,7 +590,7 @@ type AddPendingNodes struct {
// Ask the primary to optimize the partition table. A -> PM.
type TweakPartitionTable struct {
Packet
PktHead
UUIDList []UUID
// XXX _answer = Error
......@@ -595,19 +598,19 @@ type TweakPartitionTable struct {
// Notify information about one or more nodes. PM -> Any.
type NotifyNodeInformation struct {
Packet
PktHead
NodeList []NodeInfo
}
// Ask node information
type NodeInformation struct {
Packet
PktHead
// XXX _answer = PFEmpty
}
// Set the cluster state
type SetClusterState struct {
Packet
PktHead
State ClusterState
// XXX _answer = Error
......@@ -615,14 +618,14 @@ type SetClusterState struct {
// Notify information about the cluster
type ClusterInformation struct {
Packet
PktHead
State ClusterState
}
// Ask state of the cluster
// Answer state of the cluster
type X_ClusterState struct { // XXX conflicts with ClusterState enum
Packet
PktHead
State ClusterState
}
......@@ -642,7 +645,7 @@ type X_ClusterState struct { // XXX conflicts with ClusterState enum
// If current_serial's data is current on storage.
// S -> C
type ObjectUndoSerial struct {
Packet
PktHead
Tid Tid
LTID Tid
UndoneTID Tid
......@@ -651,7 +654,7 @@ type ObjectUndoSerial struct {
// XXX answer_undo_transaction ?
type AnswerObjectUndoSerial struct {
Packet
PktHead
ObjectTIDDict map[Oid]struct {
CurrentSerial Tid
UndoSerial Tid
......@@ -663,13 +666,13 @@ type AnswerObjectUndoSerial struct {
// C -> S
// Answer whether a transaction holds the write lock for requested object.
type HasLock struct {
Packet
PktHead
Tid Tid
Oid Oid
}
type AnswerHasLock struct {
Packet
PktHead
Oid Oid
LockState LockState
}
......@@ -682,7 +685,7 @@ type AnswerHasLock struct {
// Same structure as AnswerStoreObject, to handle the same way, except there
// is nothing to invalidate in any client's cache.
type CheckCurrentSerial struct {
Packet
PktHead
Tid Tid
Serial Tid
Oid Oid
......@@ -702,12 +705,12 @@ type AnswerCheckCurrentSerial struct {
// S -> M
// M -> C
type Pack struct {
Packet
PktHead
Tid Tid
}
type AnswerPack struct {
Packet
PktHead
Status bool
}
......@@ -715,7 +718,7 @@ type AnswerPack struct {
// ctl -> A
// A -> M
type CheckReplicas struct {
Packet
PktHead
PartitionDict map[uint32/*PNumber*/]UUID // partition -> source
MinTID Tid
MaxTID Tid
......@@ -725,7 +728,7 @@ type CheckReplicas struct {
// M -> S
type CheckPartition struct {
Packet
PktHead
Partition uint32 // PNumber
Source struct {
UpstreamName string
......@@ -745,7 +748,7 @@ type CheckPartition struct {
// reference node.
// S -> S
type CheckTIDRange struct {
Packet
PktHead
Partition uint32 // PNumber
Length uint32 // PNumber
MinTID Tid
......@@ -753,7 +756,7 @@ type CheckTIDRange struct {
}
type AnswerCheckTIDRange struct {
Packet
PktHead
Count uint32 // PNumber
Checksum Checksum
MaxTID Tid
......@@ -768,7 +771,7 @@ type AnswerCheckTIDRange struct {
// reference node.
// S -> S
type CheckSerialRange struct {
Packet
PktHead
Partition uint32 // PNumber
Length uint32 // PNumber
MinTID Tid
......@@ -786,7 +789,7 @@ type AnswerCheckSerialRange struct {
// S -> M
type PartitionCorrupted struct {
Packet
PktHead
Partition uint32 // PNumber
CellList []UUID
}
......@@ -797,11 +800,11 @@ type PartitionCorrupted struct {
// Answer last committed TID.
// M -> C
type LastTransaction struct {
Packet
PktHead
}
type AnswerLastTransaction struct {
Packet
PktHead
Tid Tid
}
......@@ -809,7 +812,7 @@ type AnswerLastTransaction struct {
// Notify that node is ready to serve requests.
// S -> M
type NotifyReady struct {
Packet
PktHead
}
// replication
......
......@@ -4,8 +4,12 @@ package neo
import (
"context"
"encoding/binary"
"net"
"fmt"
"io"
//"../neo/proto"
)
// NEO Storage application
......@@ -14,46 +18,56 @@ import (
type StorageApplication struct {
}
/*
// XXX change to bytes.Buffer if we need to access it as I/O
type Buffer struct {
buf []byte
}
*/
func (stor *StorageApplication) ServeConn(ctx context.Context, conn net.Conn) {
fmt.Printf("stor: serving new client %s <-> %s\n", conn.LocalAddr(), conn.RemoteAddr())
//fmt.Fprintf(conn, "Hello up there, you address is %s\n", conn.RemoteAddr()) // XXX err
//conn.Close() // XXX err
/*
// TODO: use bytes.Buffer{}
// .Bytes() -> buf -> can grow up again up to buf[:cap(buf)]
// NewBuffer(buf) -> can use same buffer for later reading via bytes.Buffer
// TODO read PktHeader (fixed length) (-> length, PktType (by .code))
// TODO PktHeader
//rxbuf := bytes.Buffer{}
rxbuf := bytes.NewBuffer(make([]byte, 4096))
n, err := conn.Read(rxbuf.Bytes())
*/
// first read to read pkt header and hopefully up to page of data in 1 syscall
rxbuf := Buffer{make([]byte, 4096}
rxbuf := make([]byte, 4096)
n, err := io.ReadAtLeast(conn, rxbuf, PktHeadLen)
if err != nil {
panic(err) // TODO
panic(err) // XXX err
}
id := binary.BigEndian.Uint32(rxbuf[0:])
code := binary.BigEndian.Uint16(rxbuf[4:])
_/*id*/ = binary.BigEndian.Uint32(rxbuf[0:]) // XXX -> PktHeader.Decode() ?
_/*code*/= binary.BigEndian.Uint16(rxbuf[4:])
length := binary.BigEndian.Uint32(rxbuf[6:])
if length < PktHeadLen {
panic() // XXX err (length is a whole packet len with header)
panic("TODO pkt.length < PktHeadLen") // XXX err (length is a whole packet len with header)
}
if length > len(rxbuf) {
// TODO grow rxbuf
panic()
if length > uint32(len(rxbuf)) {
// grow rxbuf
rxbuf2 := make([]byte, length)
copy(rxbuf2, rxbuf[:n])
rxbuf = rxbuf2
}
// read rest of pkt data, if we need to
n2, err := io.ReadFull(conn, rxbuf[n:length])
// read first pkt chunk: header + some data (all in 1 read call)
rxl.N = 4096
n, err := rxbuf.ReadFrom(rxl)
_, err = io.ReadFull(conn, rxbuf[n:length])
if err != nil {
panic(err) // XXX err
}
}
......@@ -72,7 +86,6 @@ type Server interface {
// - for every accepted connection spawn srv.ServeConn() in separate goroutine.
//
// the listener is closed when Serve returns.
// XXX text
// XXX move -> generic place ?
func Serve(ctx context.Context, l net.Listener, srv Server) error {
fmt.Printf("stor: serving on %s ...\n", l.Addr())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment