Commit 5f5d6516 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent dae00a2b
...@@ -34,7 +34,8 @@ class HandlerSwitcher(object): ...@@ -34,7 +34,8 @@ class HandlerSwitcher(object):
_next_timeout = None _next_timeout = None
_next_timeout_msg_id = None _next_timeout_msg_id = None
_next_on_timeout = None _next_on_timeout = None
_pending = ({}, None), _pending = ({}, None), # ( {msgid -> (answer_klass, timeout, on_timeout, kw)},
# handler )
def __init__(self, handler): def __init__(self, handler):
# pending handlers and related requests # pending handlers and related requests
......
...@@ -147,7 +147,7 @@ class ThreadedApplication(BaseApplication): ...@@ -147,7 +147,7 @@ class ThreadedApplication(BaseApplication):
# check fake packet # check fake packet
if qpacket is None: if qpacket is None:
raise ConnectionClosed raise ConnectionClosed
if msg_id == qpacket.getId(): if msg_id == qpacket.getId(): # NOTE selector on msg_id
if is_forgotten: if is_forgotten:
raise ValueError, 'ForgottenPacket for an ' \ raise ValueError, 'ForgottenPacket for an ' \
'explicitly expected packet.' 'explicitly expected packet.'
......
...@@ -530,7 +530,7 @@ class Application(BaseApplication): ...@@ -530,7 +530,7 @@ class Application(BaseApplication):
c = client_node.getConnection() c = client_node.getConnection()
if client_node is transaction_node: if client_node is transaction_node:
c.answer(Packets.AnswerTransactionFinished(ttid, tid), c.answer(Packets.AnswerTransactionFinished(ttid, tid),
msg_id=txn.getMessageId()) msg_id=txn.getMessageId()) # NOTE msgid: out-of-order answer
else: else:
# NOTE notifies clients sequentially & irregardless of whether client was subscribed # NOTE notifies clients sequentially & irregardless of whether client was subscribed
c.notify(invalidate_objects) c.notify(invalidate_objects)
......
...@@ -97,5 +97,5 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -97,5 +97,5 @@ class StorageServiceHandler(BaseServiceHandler):
if not uid_set: if not uid_set:
app.packing = None app.packing = None
if not client.isClosed(): if not client.isClosed():
client.answer(Packets.AnswerPack(True), msg_id=msg_id) client.answer(Packets.AnswerPack(True), msg_id=msg_id) # NOTE msg_id: out-of-order answer
...@@ -154,7 +154,7 @@ class StorageOperationHandler(EventHandler): ...@@ -154,7 +154,7 @@ class StorageOperationHandler(EventHandler):
def check(): def check():
r = app.dm.checkTIDRange(*args) r = app.dm.checkTIDRange(*args)
try: try:
conn.answer(Packets.AnswerCheckTIDRange(*r), msg_id) conn.answer(Packets.AnswerCheckTIDRange(*r), msg_id) # NOTE msg_id: out-of-order answer
except (weakref.ReferenceError, ConnectionClosed): except (weakref.ReferenceError, ConnectionClosed):
pass pass
yield yield
...@@ -170,7 +170,7 @@ class StorageOperationHandler(EventHandler): ...@@ -170,7 +170,7 @@ class StorageOperationHandler(EventHandler):
def check(): def check():
r = app.dm.checkSerialRange(*args) r = app.dm.checkSerialRange(*args)
try: try:
conn.answer(Packets.AnswerCheckSerialRange(*r), msg_id) conn.answer(Packets.AnswerCheckSerialRange(*r), msg_id) # NOTE msg_id: out-of-order answer
except (weakref.ReferenceError, ConnectionClosed): except (weakref.ReferenceError, ConnectionClosed):
pass pass
yield yield
...@@ -211,7 +211,7 @@ class StorageOperationHandler(EventHandler): ...@@ -211,7 +211,7 @@ class StorageOperationHandler(EventHandler):
tid, user, desc, ext, packed, ttid, oid_list)) tid, user, desc, ext, packed, ttid, oid_list))
yield yield
conn.answer(Packets.AnswerFetchTransactions( conn.answer(Packets.AnswerFetchTransactions(
pack_tid, next_tid, peer_tid_set), msg_id) pack_tid, next_tid, peer_tid_set), msg_id) # NOTE msg_id: out-of-order answer
yield yield
except (weakref.ReferenceError, ConnectionClosed): except (weakref.ReferenceError, ConnectionClosed):
pass pass
...@@ -253,7 +253,7 @@ class StorageOperationHandler(EventHandler): ...@@ -253,7 +253,7 @@ class StorageOperationHandler(EventHandler):
conn.notify(Packets.AddObject(oid, serial, *object[2:])) conn.notify(Packets.AddObject(oid, serial, *object[2:]))
yield yield
conn.answer(Packets.AnswerFetchObjects( conn.answer(Packets.AnswerFetchObjects(
pack_tid, next_tid, next_oid, object_dict), msg_id) pack_tid, next_tid, next_oid, object_dict), msg_id) # NOTE msg_id: out-of-order answer
yield yield
except (weakref.ReferenceError, ConnectionClosed): except (weakref.ReferenceError, ConnectionClosed):
pass pass
......
// TODO copyright / license
// TODO text
package neo
import (
"net"
)
// NodeLink is a node-node connection in NEO
// A node-node connection represents bidirectional symmetrical communication
// link in between 2 NEO nodes. The link provides service for packets
// exchange and for multiplexing several higher-level communication channels on
// top of node-node link.
//
// All packets are classified to be of one of the following kind:
// - notify: a packet is sent without expecting any reply
// - ask: a packet is sent and reply is expected
// - answer: a packet replying to previous ask
//
// At any time there can be several Asks packets issued by both nodes.
// For an Ask packet a single Answer reply is expected XXX vs protocol where there is one request and list of replies ?
//
// XXX -> multiple subconnection explicitly closed with ability to chat
// multiple packets without spawning goroutines? And only single answer
// expected implemented that after only ask-send / answer-receive the
// (sub-)connection is explicitly closed ?
//
// XXX it is maybe better to try to avoid multiplexing by hand and let the OS do it?
//
// A reply to particular Ask packet, once received, will be delivered to
// corresponding goroutine which originally issued Ask XXX this can be put into interface
//
// TODO text
//
// TODO goroutine guarantees
//
//
//
// XXX naming (-> Conn ?, PeerLink ?)
type NodeLink struct {
peerLink net.Conn // raw conn to peer
connTab map[uint32]XXX // msgid -> connection associated with msgid
}
// Conn is a sub-connection established over NodeLink
// TODO text
type Conn struct {
rxq chan Pkt // XXX chan &Pkt ?
}
// Send notify packet to peer
func (c *NodeLink) Notify(pkt XXX) error {
// TODO
}
// Send packet and wait for replied answer packet
func (c *NodeLink) Ask(pkt XXX) (answer Pkt, err error) {
// TODO
}
// TODO how handle receive
// -> TODO Answer
// handle incoming packets routing them to either appropriate
// already-established connections or to new serving goroutine
func (c *NodeLink) serveReceive() error {
for {
// receive 1 packet
pkt, err := c.recvPkt()
if err != nil {
panic(err) // XXX err
}
// if we don't yet have connection established for pkt.Id spawn connection-serving goroutine
// XXX connTab locking
// TODO also check for conn ready to handle new packets, e.g. marked with msgid = 0
// (to avoid spawning goroutine each time for new packet)
conn := c.connTab[pkt.Id]
if conn != nil {
go ...
}
// route packet to serving goroutine handler
conn.rxq <- pkt
}
}
// information about (received ?) packet
// XXX place?
type Pkt struct {
PktHead
Body []byte
}
// receive 1 packet from peer
func (c *NodeLink) recvPkt() (pkt Pkt, err error) {
// TODO organize rx buffers management (freelist etc)
// first read to read pkt header and hopefully up to page of data in 1 syscall
rxbuf := make([]byte, 4096)
n, err := io.ReadAtLeast(c.peerLink, rxbuf, PktHeadLen)
if err != nil {
panic(err) // XXX err
}
pkt.Id = binary.BigEndian.Uint32(rxbuf[0:]) // XXX -> PktHeader.Decode() ?
pkt.Code = binary.BigEndian.Uint16(rxbuf[4:])
pkt.Length = binary.BigEndian.Uint32(rxbuf[6:])
if pkt.Length < PktHeadLen {
panic("TODO pkt.Length < PktHeadLen") // XXX err (length is a whole packet len with header)
}
if pkt.Length > MAX_PACKET_SIZE {
panic("TODO message too big") // XXX err
}
if pkt.Length > uint32(len(rxbuf)) {
// grow rxbuf
rxbuf2 := make([]byte, pkt.Length)
copy(rxbuf2, rxbuf[:n])
rxbuf = rxbuf2
}
// read rest of pkt data, if we need to
_, err = io.ReadFull(c.peerLink, rxbuf[n:pkt.Length])
if err != nil {
panic(err) // XXX err
}
return pkt, nil
}
...@@ -41,36 +41,7 @@ func (stor *StorageApplication) ServeConn(ctx context.Context, conn net.Conn) { ...@@ -41,36 +41,7 @@ func (stor *StorageApplication) ServeConn(ctx context.Context, conn net.Conn) {
n, err := conn.Read(rxbuf.Bytes()) n, err := conn.Read(rxbuf.Bytes())
*/ */
// first read to read pkt header and hopefully up to page of data in 1 syscall recvPkt()
rxbuf := make([]byte, 4096)
n, err := io.ReadAtLeast(conn, rxbuf, PktHeadLen)
if err != nil {
panic(err) // XXX err
}
_/*id*/ = binary.BigEndian.Uint32(rxbuf[0:]) // XXX -> PktHeader.Decode() ?
_/*code*/= binary.BigEndian.Uint16(rxbuf[4:])
length := binary.BigEndian.Uint32(rxbuf[6:])
if length < PktHeadLen {
panic("TODO pkt.length < PktHeadLen") // XXX err (length is a whole packet len with header)
}
if length > MAX_PACKET_SIZE {
panic("TODO message too big") // XXX err
}
if length > uint32(len(rxbuf)) {
// grow rxbuf
rxbuf2 := make([]byte, length)
copy(rxbuf2, rxbuf[:n])
rxbuf = rxbuf2
}
// read rest of pkt data, if we need to
_, err = io.ReadFull(conn, rxbuf[n:length])
if err != nil {
panic(err) // XXX err
}
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment