Commit 379cc163 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 59333e47
......@@ -403,6 +403,7 @@ func (c *Conn) errRecvShutdown() error {
// recvPkt receives raw packet from connection
func (c *Conn) recvPkt() (*PktBuf, error) {
select {
// XXX maybe possible to detect "down" by seeing c.rxq is closed?
case <-c.down:
return nil, c.err("recv", c.errRecvShutdown())
......
......@@ -250,7 +250,7 @@ func TestMasterStorage(t *testing.T) {
tc.Expect(netconnect("s:2", "m:2", "m:1"))
tc.Expect(conntx("s:2", "m:2", 1, &neo.RequestIdentification{
NodeType: neo.STORAGE,
NodeUUID: 0,
UUID: 0,
Address: xnaddr("s:1"),
ClusterName: "abc1",
IdTimestamp: 0,
......@@ -260,10 +260,10 @@ func TestMasterStorage(t *testing.T) {
tc.Expect(conntx("m:2", "s:2", 1, &neo.AcceptIdentification{
NodeType: neo.MASTER,
MyNodeUUID: neo.UUID(neo.MASTER, 1),
MyUUID: neo.UUID(neo.MASTER, 1),
NumPartitions: 1,
NumReplicas: 1,
YourNodeUUID: neo.UUID(neo.STORAGE, 1),
YourUUID: neo.UUID(neo.STORAGE, 1),
}))
// TODO test ID rejects (uuid already registered, ...)
......@@ -313,8 +313,8 @@ func TestMasterStorage(t *testing.T) {
TidDict: nil, // map[zodb.Tid]zodb.Tid{},
}))
lastOid, err1 := zstor.LastOid()
lastTid, err2 := zstor.LastTid()
lastOid, err1 := zstor.LastOid(context.TODO())
lastTid, err2 := zstor.LastTid(context.TODO())
exc.Raiseif(xerr.Merge(err1, err2))
tc.Expect(conntx("m:2", "s:2", 1, &neo.LastIDs{}))
tc.Expect(conntx("s:2", "m:2", 1, &neo.AnswerLastIDs{
......
......@@ -47,21 +47,7 @@ type Server interface {
// the listener is closed when Serve returns.
func Serve(ctx context.Context, l *neo.Listener, srv Server) error {
fmt.Printf("xxx: serving on %s ...\n", l.Addr()) // XXX 'xxx' -> ?
// close listener when either cancelling or returning (e.g. due to an error)
// ( when cancelling - listener close will signal to all accepts to
// terminate with an error )
// XXX dup -> utility
retch := make(chan struct{})
defer func() { close(retch) }()
go func() {
select {
case <-ctx.Done():
// XXX err = cancelled
case <-retch:
}
l.Close() // XXX err
}()
defer xio.CloseWhenDone(ctx, l)()
// main Accept -> ServeLink loop
for {
......
......@@ -32,6 +32,7 @@ import (
"lab.nexedi.com/kirr/neo/go/xcommon/task"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet"
"lab.nexedi.com/kirr/neo/go/xcommon/xcontext"
"lab.nexedi.com/kirr/neo/go/xcommon/xio"
"lab.nexedi.com/kirr/go123/xerr"
)
......@@ -410,26 +411,9 @@ func (stor *Storage) m1serve(ctx context.Context, Mconn *neo.Conn) (err error) {
// --- serve incoming connections from other nodes ---
// ServeLink serves incoming node-node link connection
// XXX +error return?
func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
log.Infof(ctx, "%s: serving new node", link) // XXX -> running?
// close link when either cancelling or returning (e.g. due to an error)
// ( when cancelling - link.Close will signal to all current IO to
// terminate with an error )
// XXX dup -> utility
retch := make(chan struct{})
defer func() { close(retch) }()
go func() {
select {
case <-ctx.Done():
// XXX tell peers we are shutting down?
// XXX ret err = ctx.Err()
case <-retch:
}
log.Info(ctx, "%v: closing link", link)
link.Close() // XXX err
}()
func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) (err error) {
defer task.Runningf(&ctx, "serve %s", link)(&err)
defer xio.CloseWhenDone(ctx, link)()
// XXX only accept clients
// XXX only accept when operational (?)
......@@ -463,6 +447,8 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
}
// TODO wait all spawned serveConn
return nil
}
// withWhileOperational derives new context from ctx which will be cancelled, when either
......
......@@ -37,7 +37,7 @@ import (
// call cancel as soon as the operations running in this Context complete.
//
// XXX let Merge do only merge, not create another cancel; optimize it for
// cases when a source context is not cancellable
// cases when a source context is not cancellable
func Merge(ctx1, ctx2 context.Context) (context.Context, context.CancelFunc) {
mc := &mergeCtx{
ctx1: ctx1,
......@@ -170,3 +170,29 @@ func Canceled(err error) bool {
return false
}
// WhenDone arranges f to be called either when ctx is cancelled or surrounding
// function returns.
//
// To work as intended it should be called under defer like this:
//
// func myfunc(ctx, ...) {
// defer xcontext.WhenDone(ctx, func() { ... })()
func WhenDone(ctx context.Context, f func()) func() {
done := make(chan struct{})
go func() {
select {
case <-ctx.Done():
// ok
case <-done:
// ok
}
f()
}()
return func() {
close(done)
}
}
......@@ -4,10 +4,14 @@
package xio
import (
"context"
"fmt"
"io"
"net"
"os"
"lab.nexedi.com/kirr/neo/go/xcommon/log"
"lab.nexedi.com/kirr/neo/go/xcommon/xcontext"
)
// XXX interface for a Reader/Writer which can report position
......@@ -82,3 +86,22 @@ func Name(f interface {}) string {
return fmt.Sprintf("%#v", f)
}
}
// CloseWhenDone arranges for c to be closed either when ctx is cancelled or
// surrounding function returns.
//
// To work as intended it should be called under defer like this:
//
// func myfunc(ctx, ...) {
// defer xio.CloseWhenDone(ctx, c)()
//
// The error - if c.Close() returns with any - is logged.
func CloseWhenDone(ctx context.Context, c io.Closer) func() {
return xcontext.WhenDone(ctx, func() {
err := c.Close()
if err != nil {
log.Error(ctx, err)
}
})
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment