Commit 2bc466f3 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 146534b0
...@@ -382,6 +382,8 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (data []byte, serial zo ...@@ -382,6 +382,8 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (data []byte, serial zo
if err != nil { if err != nil {
return nil, 0, err // XXX err ctx return nil, 0, err // XXX err ctx
} }
// FIXME ^^^ slink.CloseAccept after really dialed (not to deadlock if
// S decides to send us something)
req := neo.GetObject{Oid: xid.Oid} req := neo.GetObject{Oid: xid.Oid}
if xid.TidBefore { if xid.TidBefore {
......
...@@ -27,7 +27,7 @@ import ( ...@@ -27,7 +27,7 @@ import (
// XXX place=? -> methods of Error // XXX place=? -> methods of Error
// errEncode translates an error into Error packet // ErrEncode translates an error into Error packet.
// XXX more text describing relation with zodb errors // XXX more text describing relation with zodb errors
func ErrEncode(err error) *Error { func ErrEncode(err error) *Error {
switch err := err.(type) { switch err := err.(type) {
...@@ -43,7 +43,7 @@ func ErrEncode(err error) *Error { ...@@ -43,7 +43,7 @@ func ErrEncode(err error) *Error {
} }
// errDecode decodes error from Error packet // ErrDecode decodes error from Error packet.
// XXX more text describing relation with zodb errors // XXX more text describing relation with zodb errors
func ErrDecode(e *Error) error { func ErrDecode(e *Error) error {
switch e.Code { switch e.Code {
......
...@@ -26,6 +26,7 @@ package server ...@@ -26,6 +26,7 @@ package server
import ( import (
"bytes" "bytes"
"context" "context"
"crypto/sha1"
//"io" //"io"
"math" "math"
"net" "net"
...@@ -421,7 +422,7 @@ func TestMasterStorage(t *testing.T) { ...@@ -421,7 +422,7 @@ func TestMasterStorage(t *testing.T) {
xwait(wg) xwait(wg)
// C starts loading first object -> connects to S // C starts loading first object ...
wg = &xsync.WorkGroup{} wg = &xsync.WorkGroup{}
xid1 := zodb.Xid{Oid: 1, XTid: zodb.XTid{Tid: zodb.TidMax, TidBefore: true}} xid1 := zodb.Xid{Oid: 1, XTid: zodb.XTid{Tid: zodb.TidMax, TidBefore: true}}
data1, serial1, err := zstor.Load(bg, xid1) data1, serial1, err := zstor.Load(bg, xid1)
...@@ -436,6 +437,7 @@ func TestMasterStorage(t *testing.T) { ...@@ -436,6 +437,7 @@ func TestMasterStorage(t *testing.T) {
} }
}) })
// ... -> connects to S
tc.Expect(netconnect("c:2", "s:3", "s:1")) tc.Expect(netconnect("c:2", "s:3", "s:1"))
tc.Expect(conntx("c:2", "s:3", 1, &neo.RequestIdentification{ tc.Expect(conntx("c:2", "s:3", 1, &neo.RequestIdentification{
NodeType: neo.CLIENT, NodeType: neo.CLIENT,
...@@ -444,7 +446,6 @@ func TestMasterStorage(t *testing.T) { ...@@ -444,7 +446,6 @@ func TestMasterStorage(t *testing.T) {
ClusterName: "abc1", ClusterName: "abc1",
IdTimestamp: 0, // XXX ? IdTimestamp: 0, // XXX ?
})) }))
println("222")
tc.Expect(conntx("s:3", "c:2", 1, &neo.AcceptIdentification{ tc.Expect(conntx("s:3", "c:2", 1, &neo.AcceptIdentification{
NodeType: neo.STORAGE, NodeType: neo.STORAGE,
...@@ -454,8 +455,23 @@ func TestMasterStorage(t *testing.T) { ...@@ -454,8 +455,23 @@ func TestMasterStorage(t *testing.T) {
YourUUID: neo.UUID(neo.CLIENT, 1), YourUUID: neo.UUID(neo.CLIENT, 1),
})) }))
println("333") // ... -> GetObject(xid1)
tc.Expect(conntx("c:2", "s:3", 3, &neo.GetObject{
Oid: xid1.Oid,
Tid: xid1.Tid,
Serial: neo.INVALID_TID,
}))
tc.Expect(conntx("s:3", "c:2", 3, &neo.AnswerGetObject{
Oid: xid1.Oid,
Serial: serial1,
NextSerial: 0, // XXX
Compression: false,
Data: data1,
DataSerial: 0, // XXX
Checksum: sha1.Sum(data1),
}))
println("444")
......
...@@ -22,6 +22,7 @@ package server ...@@ -22,6 +22,7 @@ package server
import ( import (
"context" "context"
"crypto/sha1"
"fmt" "fmt"
"sync" "sync"
"time" "time"
...@@ -111,7 +112,7 @@ func (stor *Storage) Run(ctx context.Context) error { ...@@ -111,7 +112,7 @@ func (stor *Storage) Run(ctx context.Context) error {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
stor.serveLink(ctx, req, idReq) // XXX ignore err? stor.serveLink(ctx, req, idReq) // XXX ignore err? -> logged
}() }()
...@@ -394,7 +395,7 @@ func (stor *Storage) serveLink(ctx context.Context, req *neo.Request, idReq *neo ...@@ -394,7 +395,7 @@ func (stor *Storage) serveLink(ctx context.Context, req *neo.Request, idReq *neo
defer task.Runningf(&ctx, "serve %s", link)(&err) defer task.Runningf(&ctx, "serve %s", link)(&err)
defer xio.CloseWhenDone(ctx, link)() defer xio.CloseWhenDone(ctx, link)()
// handle identification // first process identification
idResp, ok := stor.identify(idReq) idResp, ok := stor.identify(idReq)
if !ok { if !ok {
reject(ctx, req, idResp) // XXX log? reject(ctx, req, idResp) // XXX log?
...@@ -449,13 +450,10 @@ func (stor *Storage) serveClient(ctx context.Context, req neo.Request) { ...@@ -449,13 +450,10 @@ func (stor *Storage) serveClient(ctx context.Context, req neo.Request) {
return return
} }
//lclose(ctx, conn)
// keep on going in the same goroutine to avoid goroutine creation overhead // keep on going in the same goroutine to avoid goroutine creation overhead
// TODO += timeout -> go away if inactive // TODO += timeout -> go away if inactive
req, err = link.Recv1() req, err = link.Recv1()
if err != nil { if err != nil {
// lclose(link) XXX ?
log.Error(ctx, err) log.Error(ctx, err)
return return
} }
...@@ -477,7 +475,7 @@ func (stor *Storage) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Ms ...@@ -477,7 +475,7 @@ func (stor *Storage) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Ms
data, tid, err := stor.zstor.Load(ctx, xid) data, tid, err := stor.zstor.Load(ctx, xid)
if err != nil { if err != nil {
// TODO translate err to NEO protocol error codes // translate err to NEO protocol error codes
return neo.ErrEncode(err) return neo.ErrEncode(err)
} }
...@@ -485,9 +483,9 @@ func (stor *Storage) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Ms ...@@ -485,9 +483,9 @@ func (stor *Storage) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Ms
Oid: xid.Oid, Oid: xid.Oid,
Serial: tid, Serial: tid,
Compression: false, Compression: false,
Data: data, Data: data,
// XXX .CheckSum Checksum: sha1.Sum(data), // XXX computing every time
// XXX .NextSerial // XXX .NextSerial
// XXX .DataSerial // XXX .DataSerial
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment