Commit dae00a2b authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 63875e79
...@@ -224,9 +224,11 @@ class NEOLogger(Logger): ...@@ -224,9 +224,11 @@ class NEOLogger(Logger):
peer = '%s %s (%s:%u)' % ('>' if r.outgoing else '<', peer = '%s %s (%s:%u)' % ('>' if r.outgoing else '<',
uuid_str(r.uuid), ip, port) uuid_str(r.uuid), ip, port)
msg = r.msg msg = r.msg
"""
pktcls = protocol.StaticRegistry[r.code] pktcls = protocol.StaticRegistry[r.code]
print 'PACKET %s %s\t%s\t%s\t%s %s' % (r.created, r._name, r.msg_id, print 'PACKET %s %s\t%s\t%s\t%s %s' % (r.created, r._name, r.msg_id,
pktcls.__name__, peer, r.pkt.decode()) pktcls.__name__, peer, r.pkt.decode())
"""
if msg is not None: if msg is not None:
msg = buffer(msg) msg = buffer(msg)
self._db.execute("INSERT INTO packet VALUES (NULL,?,?,?,?,?,?)", self._db.execute("INSERT INTO packet VALUES (NULL,?,?,?,?,?,?)",
...@@ -269,8 +271,8 @@ class NEOLogger(Logger): ...@@ -269,8 +271,8 @@ class NEOLogger(Logger):
self.parent.callHandlers(record) self.parent.callHandlers(record)
def packet(self, connection, packet, outgoing): def packet(self, connection, packet, outgoing):
return #if True or self._db is not None:
if True or self._db is not None: if self._db is not None:
body = packet._body body = packet._body
if self._max_packet and self._max_packet < len(body): if self._max_packet and self._max_packet < len(body):
body = None body = None
......
...@@ -750,7 +750,7 @@ class Recovery(Packet): ...@@ -750,7 +750,7 @@ class Recovery(Packet):
""" """
_answer = PStruct('answer_recovery', _answer = PStruct('answer_recovery',
PPTID('ptid'), PPTID('ptid'),
PTID('backup_tid'), # NOTE PTID('backup_tid'),
PTID('truncate_tid'), PTID('truncate_tid'),
) )
......
...@@ -204,7 +204,7 @@ class ReadBuffer(object): ...@@ -204,7 +204,7 @@ class ReadBuffer(object):
keep, let = last_chunk[:to_read], last_chunk[to_read:] keep, let = last_chunk[:to_read], last_chunk[to_read:]
self.content.appendleft(let) self.content.appendleft(let)
chunk_list[-1] = keep chunk_list[-1] = keep
# join all chunks (one copy) # join all chunks (one copy) // XXX only 1 chunk -> no copy at all
data = ''.join(chunk_list) data = ''.join(chunk_list)
assert len(data) == size assert len(data) == size
return data return data
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import inspect, random, signal, sys import inspect, random, signal, sys
from logging import getLogger, INFO from logging import getLogger, INFO, DEBUG
from optparse import OptionParser from optparse import OptionParser
from neo.lib import logging from neo.lib import logging
from neo.tests import functional from neo.tests import functional
...@@ -46,7 +46,7 @@ def main(): ...@@ -46,7 +46,7 @@ def main():
options, args = parser.parse_args() options, args = parser.parse_args()
if options.seed: if options.seed:
functional.random = random.Random(options.seed) functional.random = random.Random(options.seed)
getLogger().setLevel(INFO) getLogger().setLevel(DEBUG)
cluster = functional.NEOCluster(args, **{x: getattr(options, x) cluster = functional.NEOCluster(args, **{x: getattr(options, x)
for x, _ in option_list}) for x, _ in option_list})
try: try:
......
...@@ -270,6 +270,56 @@ NodeInformation ...@@ -270,6 +270,56 @@ NodeInformation
TODO TODO
Storage communications
----------------------
* bootstrap (connect to primary master):
>M RequestIdentification
<M AcceptIdentification (AnswerRequestIdentification)
* init (under master command go through recovery / verification)
(BaseMasterHandler)
<M AskRecovery
>M AnswerRecovery (ptid, backup_tid, truncate_tid)
<M AskPartitionTable
>M AnswerPartitionTable (ptid, []PtRow)
# neoctl start
<M NotifyNodeInformation (S1.state=RUNNING)
# S: "I was told I'm RUNNING"
<M NotifyClusterInformation (state=VERIFYING)
<M NotifyNodeInformation (S1.state=RUNNING) # again, why?
# S: "I was told I'm RUNNING"
<M NotifyPartitionTable (ptid=1, `node 0: S1, R`)
# S saves PT info locally
# M asks about unfinished transactions
<M AskLockedTransactions
>M AnswerLockedTransactions {} ttid -> tid # in example we have empty
<M AskLastIDs
>M AnswerLastIDs (last_oid, last_tid)
<M NotifyClusterInformation (state=RUNNING)
<M StartOperation
>M NotifyReady
--- also in base/init handlers:
<M StopOperation
<M AskFinalTID
# >M answerUnfinishedTransactions (only upon S request)
# <M ReelectPrimary ?
<M Truncate
<M Validatetransactions
* operation (serve clients):
Tables Tables
......
...@@ -55,6 +55,9 @@ func (stor *StorageApplication) ServeConn(ctx context.Context, conn net.Conn) { ...@@ -55,6 +55,9 @@ func (stor *StorageApplication) ServeConn(ctx context.Context, conn net.Conn) {
if length < PktHeadLen { if length < PktHeadLen {
panic("TODO pkt.length < PktHeadLen") // XXX err (length is a whole packet len with header) panic("TODO pkt.length < PktHeadLen") // XXX err (length is a whole packet len with header)
} }
if length > MAX_PACKET_SIZE {
panic("TODO message too big") // XXX err
}
if length > uint32(len(rxbuf)) { if length > uint32(len(rxbuf)) {
// grow rxbuf // grow rxbuf
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment