Commit e4023e90 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 632042db
......@@ -1919,8 +1919,8 @@ func (p *AnswerGetObject) NEOEncodedLen() int {
func (p *AnswerGetObject) NEOEncode(data []byte) {
binary.BigEndian.PutUint64(data[0:], uint64(p.Oid))
binary.BigEndian.PutUint64(data[8:], uint64(p.SerialStart))
binary.BigEndian.PutUint64(data[16:], uint64(p.SerialEnd))
binary.BigEndian.PutUint64(data[8:], uint64(p.Serial))
binary.BigEndian.PutUint64(data[16:], uint64(p.NextSerial))
(data[24:])[0] = bool2byte(p.Compression)
copy(data[25:], p.Checksum[:])
{
......@@ -1939,8 +1939,8 @@ func (p *AnswerGetObject) NEODecode(data []byte) (int, error) {
goto overflow
}
p.Oid = zodb.Oid(binary.BigEndian.Uint64(data[0:]))
p.SerialStart = zodb.Tid(binary.BigEndian.Uint64(data[8:]))
p.SerialEnd = zodb.Tid(binary.BigEndian.Uint64(data[16:]))
p.Serial = zodb.Tid(binary.BigEndian.Uint64(data[8:]))
p.NextSerial = zodb.Tid(binary.BigEndian.Uint64(data[16:]))
p.Compression = byte2bool((data[24:])[0])
copy(p.Checksum[:], data[25:45])
{
......
......@@ -218,6 +218,7 @@ type Notify struct {
// usually. Any -> Any.
type Error struct {
Code uint32 // PNumber
//Code ErrorCode // PNumber
Message string
}
......@@ -537,8 +538,8 @@ type GetObject struct {
// XXX answer_object ?
type AnswerGetObject struct {
Oid zodb.Oid
SerialStart zodb.Tid
SerialEnd zodb.Tid
Serial zodb.Tid // XXX strictly is SerialStart/SerialEnd in proto.py
NextSerial zodb.Tid // XXX but there it is out of sync
Compression bool
Checksum Checksum
Data []byte // TODO separately (for writev)
......
......@@ -11,18 +11,22 @@
// See COPYING file for full licensing terms.
// TODO text
// XXX move to separate "storage" package ?
package neo
import (
"context"
"fmt"
"./zodb"
)
// NEO Storage application
// XXX naming
type StorageApplication struct {
type Storage struct {
zstor zodb.IStorage // underlying ZODB storage XXX temp ?
}
......@@ -33,7 +37,7 @@ type Buffer struct {
}
*/
func (stor *StorageApplication) ServeLink(ctx context.Context, link *NodeLink) {
func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) {
fmt.Printf("stor: serving new node %s <-> %s\n", link.peerLink.LocalAddr(), link.peerLink.RemoteAddr())
/*
......@@ -92,3 +96,81 @@ func (stor *StorageApplication) ServeLink(ctx context.Context, link *NodeLink) {
//recvPkt()
}
type StorageClientHandler struct {
stor *Storage
}
// XXX stub
// XXX move me out of here
func RecvAndDecode(conn *Conn) (interface{}, error) {
pkt, err := conn.Recv()
if err != nil {
return nil, err
}
// TODO decode pkt
return pkt, nil
}
func EncodeAndSend(conn *Conn, pkt NEOEncoder) error {
// TODO encode pkt
l := pkt.NEOEncodedLen()
buf := PktBuf{make([]byte, PktHeadLen + l)} // XXX -> freelist
pkt.NEOEncode(buf.Payload())
return conn.Send(&buf) // XXX why pointer?
}
// XXX naming for RecvAndDecode and EncodeAndSend
func (ch *StorageClientHandler) ServeConn(ctx context.Context, conn *Conn) {
// TODO ctx.Done -> close conn
defer conn.Close() // XXX err
pkt, err := RecvAndDecode(conn)
if err != nil {
return // XXX log / err / send error before closing
}
switch pkt := pkt.(type) {
case *GetObject:
xid := zodb.Xid{Oid: pkt.Oid}
if pkt.Serial != INVALID_TID {
xid.Tid = pkt.Serial
xid.TidBefore = false
} else {
xid.Tid = pkt.Tid
xid.TidBefore = true
}
data, tid, err := ch.stor.zstor.Load(xid)
if err != nil {
// TODO translate err to NEO protocol error codes
errPkt := Error{Code: 0, Message: err.Error()}
EncodeAndSend(conn, &errPkt) // XXX err
} else {
answer := AnswerGetObject{
Oid: xid.Oid,
Serial: tid,
Compression: false,
Data: data,
// XXX .CheckSum
// XXX .NextSerial
// XXX .DataSerial
}
EncodeAndSend(conn, &answer) // XXX err
}
case *LastTransaction:
// ----//---- for zstor.LastTid()
case *ObjectHistory:
//case *StoreObject:
}
//pkt.Put(...)
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment