Commit 7a98ef72 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 9a4004fe
......@@ -18,6 +18,7 @@
// See https://www.nexedi.com/licensing for rationale and options.
package neo
// client node
// XXX old: Package client provides ZODB storage interface for accessing NEO cluster.
......@@ -43,20 +44,20 @@ import (
"lab.nexedi.com/kirr/neo/go/xcommon/xio"
)
// Client talks to NEO cluster and exposes access to it via ZODB interfaces.
// Client is NEO node that talks to NEO cluster and exposes access to it via ZODB interfaces.
type Client struct {
node *NodeApp
talkMasterCancel func()
// link to master - established and maintained by talkMaster.
// users retrieve it via masterLink.
// users retrieve it via masterLink().
mlinkMu sync.Mutex
mlink *neonet.NodeLink
mlinkReady chan struct{} // reinitialized at each new talk cycle
// operational state in node is maintained by recvMaster.
// users retrieve it via withOperational.
// users retrieve it via withOperational().
//
// NOTE being operational means:
// - link to master established and is ok
......@@ -237,7 +238,7 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
return err
}
// XXX vvv dup from Server.talkMaster1 ?
// FIXME vvv dup from Storage.talkMaster1
// XXX -> node.Dial ?
if accept.YourUUID != c.node.MyInfo.UUID {
......
......@@ -49,7 +49,7 @@ type NodeApp struct {
ClusterName string
Net xnet.Networker // network AP we are sending/receiving on
MasterAddr string // address of master XXX -> Address ?
MasterAddr string // address of current master XXX put under StateMu ?
StateMu sync.RWMutex // <- XXX just embed?
NodeTab *NodeTable // information about nodes in the cluster
......@@ -69,7 +69,7 @@ func NewNodeApp(net xnet.Networker, typ proto.NodeType, clusterName, masterAddr,
}
app := &NodeApp{
MyInfo: proto.NodeInfo{Type: typ, Addr: addr, IdTime: proto.IdTimeNone},
MyInfo: proto.NodeInfo{Type: typ, Addr: addr, UUID: 0, IdTime: proto.IdTimeNone},
ClusterName: clusterName,
Net: net,
MasterAddr: masterAddr,
......@@ -86,6 +86,7 @@ func NewNodeApp(net xnet.Networker, typ proto.NodeType, clusterName, masterAddr,
// Dial connects to another node in the cluster.
//
// It handshakes, requests identification and checks peer type. If successful returned are:
//
// - established link
// - accept identification reply
//
......
......@@ -347,7 +347,7 @@ func (p *Node) dial(ctx context.Context) (_ *neonet.NodeLink, err error) {
err = fmt.Errorf("connected, but peer gives us uuid %v (our is %v)", accept.YourUUID, app.MyInfo.UUID)
case !(accept.NumPartitions == 1 && accept.NumReplicas == 1):
err = fmt.Errorf("connected but TODO peer works with ! 1x1 partition table.")
err = fmt.Errorf("connected but TODO peer works with !1x1 partition table.")
}
if err != nil {
......
......@@ -32,7 +32,7 @@ import (
//
// It is
//
// Oid -> []Storage # XXX actually oid -> []uuid
// oid -> []uuid
//
// mapping associating object id with list of storage nodes on where data for
// this oid should be written-to/loaded-from. This mapping is organized as follows:
......
......@@ -239,6 +239,8 @@ const (
// order bit is really important and the 31 other bits could be random.
// Extra namespace information and non-randomness of 3 LOB help to read logs.
//
// 0 is invalid NodeUUID XXX correct?
//
// TODO -> back to 16-bytes randomly generated UUID
type NodeUUID int32
......
......@@ -67,6 +67,7 @@ func Serve(ctx context.Context, l *neo.Listener, srv Server) error {
}
*/
/*
// FIXME kill vvv
// ----------------------------------------
......@@ -81,7 +82,7 @@ func IdentifyPeer(ctx context.Context, link *neonet.NodeLink, myNodeType proto.N
defer xerr.Contextf(&err, "%s: identify", link)
// the first conn must come with RequestIdentification packet
conn, err := link.Accept(/*ctx*/)
conn, err := link.Accept() //+ctx
if err != nil {
return nodeInfo, err
}
......@@ -117,6 +118,7 @@ func IdentifyPeer(ctx context.Context, link *neonet.NodeLink, myNodeType proto.N
return req, nil
}
*/
// ----------------------------------------
......
......@@ -183,7 +183,7 @@ func (stor *Storage) talkMaster(ctx context.Context) (err error) {
// it returns error describing why such cycle had to finish.
// XXX distinguish between temporary problems and non-temporary ones?
func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// XXX dup in Client.talkMaster1 ?
// FIXME dup in Client.talkMaster1
mlink, accept, err := stor.node.Dial(ctx, proto.MASTER, stor.node.MasterAddr)
if err != nil {
return err
......@@ -316,7 +316,7 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) erro
// handling transaction commit (with master) and syncing data with other
// storage nodes (XXX correct?).
//
// it always returns with an error describing why serve has to be stopped -
// it always returns with an error describing why serve had to be stopped -
// either due to master commanding us to stop, or context cancel or some other
// error.
func (stor *Storage) m1serve(ctx context.Context, reqStart *neonet.Request) (err error) {
......@@ -420,7 +420,7 @@ func (stor *Storage) withWhileOperational(ctx context.Context) (context.Context,
}
// serveLink serves incoming node-node link connection
// serveLink serves incoming node-node link connection.
func (stor *Storage) serveLink(ctx context.Context, req *neonet.Request, idReq *proto.RequestIdentification) (err error) {
link := req.Link()
defer task.Runningf(&ctx, "serve %s", link)(&err)
......@@ -526,7 +526,7 @@ func (stor *Storage) serveClient1(ctx context.Context, req proto.Msg) (resp prot
xid.At = before2At(req.Tid)
}
resp, err := stor.back.Load(ctx, xid)
obj, err := stor.back.Load(ctx, xid)
if err != nil {
// translate err to NEO protocol error codes
e := err.(*zodb.OpError) // XXX move this to ErrEncode?
......@@ -536,7 +536,7 @@ func (stor *Storage) serveClient1(ctx context.Context, req proto.Msg) (resp prot
// compatibility with py side:
// for loadSerial - check we have exact hit - else "nodata"
if req.Serial != proto.INVALID_TID {
if resp.Serial != req.Serial {
if obj.Serial != req.Serial {
return &proto.Error{
Code: proto.OID_NOT_FOUND,
Message: fmt.Sprintf("%s: no data with serial %s", xid.Oid, req.Serial),
......@@ -544,7 +544,7 @@ func (stor *Storage) serveClient1(ctx context.Context, req proto.Msg) (resp prot
}
}
return resp
return obj
case *proto.LastTransaction:
lastTid, err := stor.back.LastTid(ctx)
......
......@@ -162,13 +162,19 @@ func (b *Backend) LastOid(ctx context.Context) (zodb.Oid, error) {
panic("TODO")
}
func (b *Backend) Load(ctx context.Context, xid zodb.Xid) (*proto.AnswerObject, error) {
// XXX err ctx zodb.OpError{URL: b.url, Op: "load", Err: ...}
func (b *Backend) Load(ctx context.Context, xid zodb.Xid) (_ *proto.AnswerObject, err error) {
defer func() {
if err != nil {
err = &zodb.OpError{URL: b.url, Op: "load", Err: err}
}
}()
obj := &proto.AnswerObject{Oid: xid.Oid}
var data sql.RawBytes
// XXX pid = getReadablePartition (= oid % Np, raise if pid not readable)
err := b.query1(ctx,
// XXX pid = getReadablePartition (= oid % Np; error if pid not readable)
err = b.query1(ctx,
"SELECT tid, compression, data.hash, value, value_tid" +
" FROM obj LEFT JOIN data ON obj.data_id = data.id" +
" WHERE partition=? AND oid=? AND tid<=?" +
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment