Commit 363d5cdb authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 239567f0
...@@ -321,6 +321,49 @@ C -> Sv(participating for oidv & tidv) + M ...@@ -321,6 +321,49 @@ C -> Sv(participating for oidv & tidv) + M
AbortTransaction(ttid) AbortTransaction(ttid)
Client communication
--------------------
PACKET x #0x0001 RequestIdentification > None (127.0.0.1:26361)
PACKET x #0x0001 AnswerRequestIdentification < None (127.0.0.1:26361)
INFO x connected to a primary master node
INFO x Got a new UUID: C1
PACKET x #0x0000 NotifyNodeInformation < M1 (127.0.0.1:26361)
PACKET x ! C1 | CLIENT | | RUNNING | 2017-08-29 14:55:03.666895
PACKET x ! M1 | MASTER | 127.0.0.1:26361 | RUNNING | None
PACKET x ! S1 | STORAGE | 127.0.0.1:44529 | RUNNING | 2017-08-29 14:50:16.415306
INFO x Initializing from master
PACKET x #0x0003 PartitionTable > M1 (127.0.0.1:26361)
PACKET x #0x0003 AnswerPartitionTable < M1 (127.0.0.1:26361)
DEBUG x partition table loaded (ptid=1)
DEBUG x pt: node 0: S1, R
DEBUG x pt: 0: U
PACKET x #0x0005 LastTransaction > M1 (127.0.0.1:26361)
PACKET x #0x0005 AnswerLastTransaction < M1 (127.0.0.1:26361)
INFO x Connected and ready
PACKET x #0x0007 Ping > M1 (127.0.0.1:26361)
PACKET x #0x0007 AnswerPing < M1 (127.0.0.1:26361)
PACKET x #0x0001 RequestIdentification > S1 (127.0.0.1:44529)
PACKET x #0x0001 AnswerRequestIdentification < S1 (127.0.0.1:44529)
PACKET x #0x0003 GetObject > S1 (127.0.0.1:44529)
...
Py: out-of-order answers
------------------------
- AnswerTransactionFinished master.onTransactionCommitted
- AnswerPack master.StorageServiceHandler.answerPack
- AnswerCheckTIDRange storage.StorageOperationHandler .askCheckTIDRange
- AnswerCheckSerialRange ----//---- .askCheckSerialRange
- Error.Replication_Error ----//---- .askFetchTransactions
- AddTransaction ----//----
- AnswerFetchTransactions ----//----
- AddObject ----//---- .askFetchObjects
- AnswerFetchObjects ----//----
Tables Tables
------ ------
......
...@@ -26,9 +26,12 @@ import ( ...@@ -26,9 +26,12 @@ import (
"fmt" "fmt"
"math/rand" "math/rand"
"net/url" "net/url"
"time"
"lab.nexedi.com/kirr/neo/go/neo" "lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/xcommon/log"
"lab.nexedi.com/kirr/neo/go/xcommon/task"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet" "lab.nexedi.com/kirr/neo/go/xcommon/xnet"
) )
...@@ -48,7 +51,7 @@ func (c *Client) StorageName() string { ...@@ -48,7 +51,7 @@ func (c *Client) StorageName() string {
// NewClient creates new client node. // NewClient creates new client node.
// it will connect to master @masterAddr and identify with sepcified cluster name // it will connect to master @masterAddr and identify with sepcified cluster name
func NewClient(clusterName, masterAddr string, net xnet.Networker) (*Client, error) { func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client {
cli := &Client{ cli := &Client{
node: neo.NodeCommon{ node: neo.NodeCommon{
MyInfo: neo.NodeInfo{Type: neo.CLIENT, Addr: neo.Address{}}, MyInfo: neo.NodeInfo{Type: neo.CLIENT, Addr: neo.Address{}},
...@@ -61,9 +64,10 @@ func NewClient(clusterName, masterAddr string, net xnet.Networker) (*Client, err ...@@ -61,9 +64,10 @@ func NewClient(clusterName, masterAddr string, net xnet.Networker) (*Client, err
}, },
} }
// XXX -> talkMaster // spawn background process which performs master talk
cli.node.Dial(context.TODO(), neo.MASTER, masterAddr) go cli.talkMaster(context.TODO()) // XXX ctx = "client(?)"
panic("TODO")
return cli
} }
...@@ -75,6 +79,84 @@ func (c *Client) Close() error { ...@@ -75,6 +79,84 @@ func (c *Client) Close() error {
// return err // return err
} }
// --- connection with master ---
// talkMaster connects to master, announces self and receives notifications.
// it tries to persist master link reconnecting as needed.
//
// XXX C -> M for commit
//
// XXX always error (dup Storage.talkMaster) ?
func (c *Client) talkMaster(ctx context.Context) (err error) {
defer task.Runningf(&ctx, "talk master(%v)", c.node.MasterAddr)(&err)
// XXX dup wrt Storage.talkMaster
for {
err := c.talkMaster1(ctx)
log.Error(ctx, err)
// TODO if err = shutdown -> return
// exit on cancel / throttle reconnecting
select {
case <-ctx.Done():
return ctx.Err()
// XXX 1s hardcoded -> move out of here
case <-time.After(1*time.Second):
// ok
}
}
}
func (c *Client) talkMaster1(ctx context.Context) (err error) {
// XXX dup from Server.talkMaster1
// XXX put logging into Dial?
log.Info(ctx, "connecting ...")
Mconn, accept, err := stor.node.Dial(ctx, neo.MASTER, stor.node.MasterAddr)
if err != nil {
// FIXME it is not only identification - e.g. ECONNREFUSED
log.Info(ctx, "identification rejected") // XXX ok here? (err is logged above)
return err
}
log.Info(ctx, "identification accepted")
Mlink := Mconn.Link()
defer xio.CloseWhenDone(ctx, Mlink)()
// XXX .nodeTab.Reset()
Ask(partiotionTable)
Ask(lastTransaction)
for {
msg, err := Mconn.Recv()
if err != nil {
return err
}
switch msg.(type) {
default:
return fmt.Errorf("unexpected message: %T", msg)
case *neo.NotifyPartitionTable:
// TODO M sends whole PT
//case *neo.NotifyPartitionChanges:
// // TODO M sends δPT
case *neo.NotifyNodeInformation:
// TODO
case *neo.NotifyClusterState:
// TODO
}
}
// --- user API calls ---
func (c *Client) LastTid(ctx context.Context) (zodb.Tid, error) { func (c *Client) LastTid(ctx context.Context) (zodb.Tid, error) {
panic("TODO") panic("TODO")
/* /*
......
...@@ -141,12 +141,13 @@ func (stor *Storage) Run(ctx context.Context) error { ...@@ -141,12 +141,13 @@ func (stor *Storage) Run(ctx context.Context) error {
// --- connect to master and let it direct us --- // --- connect to master and let it direct us ---
// talkMaster connects to master, announces self and receives commands and notifications. // talkMaster connects to master, announces self and receives commands and notifications.
// it tries to persist master link reconnecting as needed // it tries to persist master link reconnecting as needed.
// //
// it always returns an error - either due to cancel or command from master to shutdown // it always returns an error - either due to cancel or command from master to shutdown
func (stor *Storage) talkMaster(ctx context.Context) (err error) { func (stor *Storage) talkMaster(ctx context.Context) (err error) {
defer task.Runningf(&ctx, "talk master(%v)", stor.node.MasterAddr)(&err) defer task.Runningf(&ctx, "talk master(%v)", stor.node.MasterAddr)(&err)
// XXX dup wrt Client.talkMaster
for { for {
err := stor.talkMaster1(ctx) err := stor.talkMaster1(ctx)
log.Error(ctx, err) log.Error(ctx, err)
...@@ -169,6 +170,7 @@ func (stor *Storage) talkMaster(ctx context.Context) (err error) { ...@@ -169,6 +170,7 @@ func (stor *Storage) talkMaster(ctx context.Context) (err error) {
// it returns error describing why such cycle had to finish // it returns error describing why such cycle had to finish
// XXX distinguish between temporary problems and non-temporary ones? // XXX distinguish between temporary problems and non-temporary ones?
func (stor *Storage) talkMaster1(ctx context.Context) (err error) { func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// XXX dup in Client.talkMaster1
// XXX put logging into Dial? // XXX put logging into Dial?
log.Info(ctx, "connecting ...") log.Info(ctx, "connecting ...")
Mconn, accept, err := stor.node.Dial(ctx, neo.MASTER, stor.node.MasterAddr) Mconn, accept, err := stor.node.Dial(ctx, neo.MASTER, stor.node.MasterAddr)
...@@ -182,6 +184,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) { ...@@ -182,6 +184,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
Mlink := Mconn.Link() Mlink := Mconn.Link()
// close Mlink on return / cancel // close Mlink on return / cancel
// XXX -> defer xio.CloseWhenDone(ctx, Mlink)()
retch := make(chan struct{}) retch := make(chan struct{})
defer func() { defer func() {
err2 := Mlink.Close() err2 := Mlink.Close()
...@@ -344,16 +347,18 @@ func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err err ...@@ -344,16 +347,18 @@ func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err err
err = Mconn.Send(&neo.AnswerLastIDs{LastTid: lastTid, LastOid: lastOid}) err = Mconn.Send(&neo.AnswerLastIDs{LastTid: lastTid, LastOid: lastOid})
case *neo.NotifyPartitionTable: case *neo.NotifyPartitionTable:
// TODO save locally what M told us // TODO M sends us whole PT -> save locally
case *neo.NotifyPartitionChanges:
case *neo.NotifyClusterState: // TODO M sends us δPT -> save locally?
// TODO .clusterState = ... XXX what to do with it?
case *neo.NotifyNodeInformation: case *neo.NotifyNodeInformation:
// XXX check for myUUID and consider it a command (like neo/py) does? // XXX check for myUUID and consider it a command (like neo/py) does?
// TODO update .nodeTab // TODO update .nodeTab
case *neo.NotifyClusterState:
// TODO .clusterState = ... XXX what to do with it?
} }
// XXX move Mconn.Send here and ^^^ only prepare reply? // XXX move Mconn.Send here and ^^^ only prepare reply?
......
...@@ -220,6 +220,7 @@ class Application(ThreadedApplication): ...@@ -220,6 +220,7 @@ class Application(ThreadedApplication):
"Too many connection failures to the primary master") "Too many connection failures to the primary master")
logging.info('Connected to %s', self.primary_master_node) logging.info('Connected to %s', self.primary_master_node)
try: try:
# NOTE
# Request identification and required informations to be # Request identification and required informations to be
# operational. Might raise ConnectionClosed so that the new # operational. Might raise ConnectionClosed so that the new
# primary can be looked-up again. # primary can be looked-up again.
......
...@@ -12,12 +12,15 @@ from logging import getLogger, DEBUG, INFO ...@@ -12,12 +12,15 @@ from logging import getLogger, DEBUG, INFO
from neo.lib import logging from neo.lib import logging
def main(): def main():
getLogger().setLevel(DEBUG) #getLogger().setLevel(DEBUG)
logging.backlog(max_size=None, max_packet=None) # log everything & without bufferring
master = subprocess.check_output("neoctl -a 127.0.0.1:5551 print node |grep MASTER |awk '{print $5}'", shell=True) master = subprocess.check_output("neoctl -a 127.0.0.1:5551 print node |grep MASTER |awk '{print $5}'", shell=True)
print "master:", `master` print "master:", `master`
kw = { kw = {
'master_nodes': master, 'master_nodes': master,
'name': 'neo1', 'name': 'neo1',
'logfile': 'x.log',
} }
stor = Storage(**kw) stor = Storage(**kw)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment