Commit f11bafed authored by Grégory Wisniewski's avatar Grégory Wisniewski

First modification of packet management. Encoders are no more instance methods

but module methods that return Packet instances. Next operation is to set msg_id
related to connection only. Updated tests follow in next commit.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@481 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 58edc07e
...@@ -23,6 +23,7 @@ from neo.protocol import INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \ ...@@ -23,6 +23,7 @@ from neo.protocol import INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \
ADMIN_NODE_TYPE ADMIN_NODE_TYPE
from neo.node import MasterNode, StorageNode, ClientNode from neo.node import MasterNode, StorageNode, ClientNode
from neo.connection import ClientConnection from neo.connection import ClientConnection
from neo import protocol
from neo.protocol import Packet from neo.protocol import Packet
from neo.pt import PartitionTable from neo.pt import PartitionTable
from neo.exception import PrimaryFailure from neo.exception import PrimaryFailure
...@@ -56,10 +57,8 @@ class MonitoringEventHandler(BaseEventHandler): ...@@ -56,10 +57,8 @@ class MonitoringEventHandler(BaseEventHandler):
# Should not happen. # Should not happen.
raise RuntimeError('connection completed while not trying to connect') raise RuntimeError('connection completed while not trying to connect')
p = Packet() p = protocol.requestNodeIdentification(conn.getNextId(), ADMIN_NODE_TYPE,
msg_id = conn.getNextId() app.uuid, app.server[0], app.server[1], app.name)
p.requestNodeIdentification(msg_id, ADMIN_NODE_TYPE, app.uuid,
app.server[0], app.server[1], app.name)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
EventHandler.connectionCompleted(self, conn) EventHandler.connectionCompleted(self, conn)
...@@ -173,8 +172,8 @@ class MonitoringEventHandler(BaseEventHandler): ...@@ -173,8 +172,8 @@ class MonitoringEventHandler(BaseEventHandler):
app.uuid = your_uuid app.uuid = your_uuid
# Ask a primary master. # Ask a primary master.
msg_id = conn.getNextId() p = protocol.askPrimaryMaster(conn.getNextId())
conn.addPacket(Packet().askPrimaryMaster(msg_id)) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
......
...@@ -27,6 +27,7 @@ from time import sleep ...@@ -27,6 +27,7 @@ from time import sleep
from neo.client.mq import MQ from neo.client.mq import MQ
from neo.node import NodeManager, MasterNode, StorageNode from neo.node import NodeManager, MasterNode, StorageNode
from neo.connection import MTClientConnection from neo.connection import MTClientConnection
from neo import protocol
from neo.protocol import Packet, INVALID_UUID, INVALID_TID, INVALID_PARTITION, \ from neo.protocol import Packet, INVALID_UUID, INVALID_TID, INVALID_PARTITION, \
INVALID_PTID, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \ INVALID_PTID, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
RUNNING_STATE, TEMPORARILY_DOWN_STATE, \ RUNNING_STATE, TEMPORARILY_DOWN_STATE, \
...@@ -85,10 +86,8 @@ class ConnectionPool(object): ...@@ -85,10 +86,8 @@ class ConnectionPool(object):
return None return None
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = protocol.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE,
p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, app.uuid, addr[0], addr[1], app.name)
app.uuid, addr[0],
addr[1], app.name)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
app.dispatcher.register(conn, msg_id, app.getQueue()) app.dispatcher.register(conn, msg_id, app.getQueue())
...@@ -305,8 +304,7 @@ class Application(object): ...@@ -305,8 +304,7 @@ class Application(object):
conn.lock() conn.lock()
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = protocol.askNewOIDs(msg_id, 25)
p.askNewOIDs(msg_id, 25)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
...@@ -365,8 +363,7 @@ class Application(object): ...@@ -365,8 +363,7 @@ class Application(object):
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = protocol.askObject(msg_id, oid, serial, tid)
p.askObject(msg_id, oid, serial, tid)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
...@@ -471,8 +468,7 @@ class Application(object): ...@@ -471,8 +468,7 @@ class Application(object):
conn.lock() conn.lock()
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = protocol.askNewTID(msg_id)
p.askNewTID(msg_id)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
...@@ -516,9 +512,8 @@ class Application(object): ...@@ -516,9 +512,8 @@ class Application(object):
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = protocol.askStoreObject(msg_id, oid, serial, 1,
p.askStoreObject(msg_id, oid, serial, 1, checksum, compressed_data, self.local_var.tid)
checksum, compressed_data, self.local_var.tid)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
...@@ -568,9 +563,8 @@ class Application(object): ...@@ -568,9 +563,8 @@ class Application(object):
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = protocol.askStoreTransaction(msg_id, self.local_var.tid,
p.askStoreTransaction(msg_id, self.local_var.tid, user, desc, ext, user, desc, ext, oid_list)
oid_list)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
...@@ -616,7 +610,7 @@ class Application(object): ...@@ -616,7 +610,7 @@ class Application(object):
if conn is None: if conn is None:
continue continue
try: try:
conn.addPacket(Packet().abortTransaction(conn.getNextId(), self.local_var.tid)) conn.addPacket(protocol.abortTransaction(conn.getNextId(), self.local_var.tid))
finally: finally:
conn.unlock() conn.unlock()
...@@ -624,7 +618,7 @@ class Application(object): ...@@ -624,7 +618,7 @@ class Application(object):
conn = self.master_conn conn = self.master_conn
conn.lock() conn.lock()
try: try:
conn.addPacket(Packet().abortTransaction(conn.getNextId(), self.local_var.tid)) conn.addPacket(protocol.abortTransaction(conn.getNextId(), self.local_var.tid))
finally: finally:
conn.unlock() conn.unlock()
...@@ -646,8 +640,7 @@ class Application(object): ...@@ -646,8 +640,7 @@ class Application(object):
conn.lock() conn.lock()
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = protocol.finishTransaction(msg_id, oid_list, self.local_var.tid)
p.finishTransaction(msg_id, oid_list, self.local_var.tid)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id, additional_timeout = 300) conn.expectMessage(msg_id, additional_timeout = 300)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
...@@ -693,8 +686,7 @@ class Application(object): ...@@ -693,8 +686,7 @@ class Application(object):
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = protocol.askTransactionInformation(msg_id, transaction_id)
p.askTransactionInformation(msg_id, transaction_id)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
...@@ -770,8 +762,7 @@ class Application(object): ...@@ -770,8 +762,7 @@ class Application(object):
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = protocol.askTIDs(msg_id, first, last, INVALID_PARTITION)
p.askTIDs(msg_id, first, last, INVALID_PARTITION)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
...@@ -809,8 +800,7 @@ class Application(object): ...@@ -809,8 +800,7 @@ class Application(object):
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = protocol.askTransactionInformation(msg_id, tid)
p.askTransactionInformation(msg_id, tid)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
...@@ -865,8 +855,7 @@ class Application(object): ...@@ -865,8 +855,7 @@ class Application(object):
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = protocol.askObjectHistory(msg_id, oid, 0, length)
p.askObjectHistory(msg_id, oid, 0, length)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
...@@ -906,8 +895,7 @@ class Application(object): ...@@ -906,8 +895,7 @@ class Application(object):
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = protocol.askTransactionInformation(msg_id, serial)
p.askTransactionInformation(msg_id, serial)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
...@@ -989,9 +977,8 @@ class Application(object): ...@@ -989,9 +977,8 @@ class Application(object):
conn.lock() conn.lock()
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = protocol.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE,
p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, self.uuid, self.uuid, '0.0.0.0', 0, self.name)
'0.0.0.0', 0, self.name)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
......
...@@ -19,6 +19,7 @@ import logging ...@@ -19,6 +19,7 @@ import logging
from neo.handler import EventHandler from neo.handler import EventHandler
from neo.connection import MTClientConnection from neo.connection import MTClientConnection
from neo import protocol
from neo.protocol import Packet, \ from neo.protocol import Packet, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \ MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
INVALID_UUID, RUNNING_STATE, TEMPORARILY_DOWN_STATE, \ INVALID_UUID, RUNNING_STATE, TEMPORARILY_DOWN_STATE, \
...@@ -78,12 +79,10 @@ class BaseClientEventHandler(EventHandler): ...@@ -78,12 +79,10 @@ class BaseClientEventHandler(EventHandler):
if conn is not None: if conn is not None:
conn.lock() conn.lock()
try: try:
msg_id = conn.getNextId()
p = Packet()
ip_address, port = node.getServer() ip_address, port = node.getServer()
node_list = [(STORAGE_NODE_TYPE, ip_address, port, node_list = [(STORAGE_NODE_TYPE, ip_address, port,
node.getUUID(), state)] node.getUUID(), state)]
p.notifyNodeInformation(msg_id, node_list) p = protocol.notifyNodeInformation(conn.getNextId(), node_list)
conn.addPacket(p) conn.addPacket(p)
finally: finally:
conn.unlock() conn.unlock()
...@@ -151,8 +150,7 @@ class PrimaryBoostrapEventHandler(BaseClientEventHandler): ...@@ -151,8 +150,7 @@ class PrimaryBoostrapEventHandler(BaseClientEventHandler):
conn.lock() conn.lock()
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = protocol.askPrimaryMaster(msg_id)
p.askPrimaryMaster(msg_id)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, app.getQueue()) self.dispatcher.register(conn, msg_id, app.getQueue())
......
...@@ -20,6 +20,7 @@ from neo.locking import RLock ...@@ -20,6 +20,7 @@ from neo.locking import RLock
import sys import sys
import traceback import traceback
from neo import protocol
from neo.protocol import Packet, ProtocolError from neo.protocol import Packet, ProtocolError
from neo.event import IdleEvent from neo.event import IdleEvent
from neo.connector import ConnectorTryAgainException, ConnectorInProgressException from neo.connector import ConnectorTryAgainException, ConnectorInProgressException
...@@ -286,7 +287,7 @@ class Connection(BaseConnection): ...@@ -286,7 +287,7 @@ class Connection(BaseConnection):
self.write_buf += packet.encode() self.write_buf += packet.encode()
except ProtocolError, m: except ProtocolError, m:
logging.critical('trying to send a too big message') logging.critical('trying to send a too big message')
return self.addPacket(packet.internalError(packet.getId(), m[0])) return self.addPacket(protocol.internalError(packet.getId(), m[0]))
# If this is the first time, enable polling for writing. # If this is the first time, enable polling for writing.
if self.write_buf: if self.write_buf:
......
...@@ -19,6 +19,7 @@ import logging ...@@ -19,6 +19,7 @@ import logging
from select import select from select import select
from time import time from time import time
from neo import protocol
from neo.protocol import Packet from neo.protocol import Packet
from neo.epoll import Epoll from neo.epoll import Epoll
...@@ -65,7 +66,7 @@ class IdleEvent(object): ...@@ -65,7 +66,7 @@ class IdleEvent(object):
logging.info('sending a ping to %s:%d', logging.info('sending a ping to %s:%d',
*(conn.getAddress())) *(conn.getAddress()))
msg_id = conn.getNextId() msg_id = conn.getNextId()
conn.addPacket(Packet().ping(msg_id)) conn.addPacket(protocol.ping(msg_id))
conn.expectMessage(msg_id, 5, 0) conn.expectMessage(msg_id, 5, 0)
else: else:
conn.expectMessage(self._id, self._additional_timeout, 0) conn.expectMessage(self._id, self._additional_timeout, 0)
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
import logging import logging
from neo import protocol
from neo.protocol import Packet, ProtocolError from neo.protocol import Packet, ProtocolError
from neo.connection import ServerConnection from neo.connection import ServerConnection
...@@ -82,7 +83,7 @@ class EventHandler(object): ...@@ -82,7 +83,7 @@ class EventHandler(object):
logging.info('malformed packet %x from %s:%d: %s', logging.info('malformed packet %x from %s:%d: %s',
packet.getType(), conn.getAddress()[0], packet.getType(), conn.getAddress()[0],
conn.getAddress()[1], error_message) conn.getAddress()[1], error_message)
conn.addPacket(Packet().protocolError(packet.getId(), error_message)) conn.addPacket(protocol.protocolError(packet.getId(), error_message))
conn.abort() conn.abort()
self.peerBroken(conn) self.peerBroken(conn)
...@@ -109,7 +110,7 @@ class EventHandler(object): ...@@ -109,7 +110,7 @@ class EventHandler(object):
else: else:
message = 'unexpected packet: ' + message message = 'unexpected packet: ' + message
logging.info('%s', message) logging.info('%s', message)
conn.addPacket(Packet().protocolError(packet.getId(), message)) conn.addPacket(protocol.protocolError(packet.getId(), message))
conn.abort() conn.abort()
self.peerBroken(conn) self.peerBroken(conn)
...@@ -133,7 +134,7 @@ class EventHandler(object): ...@@ -133,7 +134,7 @@ class EventHandler(object):
def handlePing(self, conn, packet): def handlePing(self, conn, packet):
logging.info('got a ping packet; am I overloaded?') logging.info('got a ping packet; am I overloaded?')
conn.addPacket(Packet().pong(packet.getId())) conn.addPacket(protocol.pong(packet.getId()))
def handlePong(self, conn, packet): def handlePong(self, conn, packet):
pass pass
......
...@@ -21,6 +21,7 @@ from time import time, gmtime ...@@ -21,6 +21,7 @@ from time import time, gmtime
from struct import pack, unpack from struct import pack, unpack
from neo.config import ConfigurationManager from neo.config import ConfigurationManager
from neo import protocol
from neo.protocol import Packet, \ from neo.protocol import Packet, \
RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, \ RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, \
INVALID_UUID, INVALID_OID, INVALID_TID, INVALID_PTID, \ INVALID_UUID, INVALID_OID, INVALID_TID, INVALID_PTID, \
...@@ -188,7 +189,7 @@ class Application(object): ...@@ -188,7 +189,7 @@ class Application(object):
logging.info('I am the primary, so sending an announcement') logging.info('I am the primary, so sending an announcement')
for conn in em.getConnectionList(): for conn in em.getConnectionList():
if isinstance(conn, ClientConnection): if isinstance(conn, ClientConnection):
p = Packet().announcePrimaryMaster(conn.getNextId()) p = protocol.announcePrimaryMaster(conn.getNextId())
conn.addPacket(p) conn.addPacket(p)
conn.abort() conn.abort()
closed = False closed = False
...@@ -238,7 +239,7 @@ class Application(object): ...@@ -238,7 +239,7 @@ class Application(object):
# Ask all connected nodes to reelect a single primary master. # Ask all connected nodes to reelect a single primary master.
for conn in em.getConnectionList(): for conn in em.getConnectionList():
if isinstance(conn, ClientConnection): if isinstance(conn, ClientConnection):
conn.addPacket(Packet().reelectPrimaryMaster(conn.getNextId())) conn.addPacket(protocol.reelectPrimaryMaster(conn.getNextId()))
conn.abort() conn.abort()
# Wait until the connections are closed. # Wait until the connections are closed.
...@@ -293,16 +294,14 @@ class Application(object): ...@@ -293,16 +294,14 @@ class Application(object):
if c.getUUID() is not None: if c.getUUID() is not None:
n = self.nm.getNodeByUUID(c.getUUID()) n = self.nm.getNodeByUUID(c.getUUID())
if n.getNodeType() in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE): if n.getNodeType() in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
p = Packet()
node_list = [(node_type, ip_address, port, uuid, state)] node_list = [(node_type, ip_address, port, uuid, state)]
p.notifyNodeInformation(c.getNextId(), node_list) p = protocol.notifyNodeInformation(c.getNextId(), node_list)
c.addPacket(p) c.addPacket(p)
elif node.getNodeType() in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE): elif node.getNodeType() in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE):
for c in self.em.getConnectionList(): for c in self.em.getConnectionList():
if c.getUUID() is not None: if c.getUUID() is not None:
p = Packet()
node_list = [(node_type, ip_address, port, uuid, state)] node_list = [(node_type, ip_address, port, uuid, state)]
p.notifyNodeInformation(c.getNextId(), node_list) p = protocol.notifyNodeInformation(c.getNextId(), node_list)
c.addPacket(p) c.addPacket(p)
elif node.getNodeType() != ADMIN_NODE_TYPE: elif node.getNodeType() != ADMIN_NODE_TYPE:
raise RuntimeError('unknown node type') raise RuntimeError('unknown node type')
...@@ -319,8 +318,7 @@ class Application(object): ...@@ -319,8 +318,7 @@ class Application(object):
start = 0 start = 0
while size: while size:
amt = min(10000, size) amt = min(10000, size)
p = Packet() p = protocol.notifyPartitionChanges(c.getNextId(), ptid,
p.notifyPartitionChanges(c.getNextId(), ptid,
cell_list[start:start+amt]) cell_list[start:start+amt])
c.addPacket(p) c.addPacket(p)
size -= amt size -= amt
...@@ -356,9 +354,8 @@ class Application(object): ...@@ -356,9 +354,8 @@ class Application(object):
node = nm.getNodeByUUID(uuid) node = nm.getNodeByUUID(uuid)
if node.getNodeType() == STORAGE_NODE_TYPE \ if node.getNodeType() == STORAGE_NODE_TYPE \
and node.getState() == RUNNING_STATE: and node.getState() == RUNNING_STATE:
p = Packet()
msg_id = conn.getNextId() msg_id = conn.getNextId()
p.askLastIDs(msg_id) p = protocol.askLastIDs(msg_id)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
...@@ -401,8 +398,7 @@ class Application(object): ...@@ -401,8 +398,7 @@ class Application(object):
while size: while size:
amt = min(1000, size) amt = min(1000, size)
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = protocol.askPartitionTable(msg_id, range(start, start + amt))
p.askPartitionTable(msg_id, range(start, start + amt))
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
size -= amt size -= amt
...@@ -459,9 +455,8 @@ class Application(object): ...@@ -459,9 +455,8 @@ class Application(object):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid in transaction_uuid_list: if uuid in transaction_uuid_list:
self.asking_uuid_dict[uuid] = False self.asking_uuid_dict[uuid] = False
p = Packet()
msg_id = conn.getNextId() msg_id = conn.getNextId()
p.askTransactionInformation(msg_id, tid) p = protocol.askTransactionInformation(msg_id, tid)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
if len(self.asking_uuid_dict) == 0: if len(self.asking_uuid_dict) == 0:
...@@ -493,9 +488,8 @@ class Application(object): ...@@ -493,9 +488,8 @@ class Application(object):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid in object_uuid_list: if uuid in object_uuid_list:
self.asking_uuid_dict[uuid] = False self.asking_uuid_dict[uuid] = False
p = Packet()
msg_id = conn.getNextId() msg_id = conn.getNextId()
p.askObjectPresent(msg_id, oid, tid) p = protocol.askObjectPresent(msg_id, oid, tid)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
...@@ -540,14 +534,12 @@ class Application(object): ...@@ -540,14 +534,12 @@ class Application(object):
for offset in xrange(self.num_partitions): for offset in xrange(self.num_partitions):
row_list.append((offset, self.pt.getRow(offset))) row_list.append((offset, self.pt.getRow(offset)))
if len(row_list) == 1000: if len(row_list) == 1000:
p = Packet() p = protocol.sendPartitionTable(conn.getNextId(),
p.sendPartitionTable(conn.getNextId(),
self.lptid, row_list) self.lptid, row_list)
conn.addPacket(p) conn.addPacket(p)
del row_list[:] del row_list[:]
if len(row_list) != 0: if len(row_list) != 0:
p = Packet() p = protocol.sendPartitionTable(conn.getNextId(),
p.sendPartitionTable(conn.getNextId(),
self.lptid, row_list) self.lptid, row_list)
conn.addPacket(p) conn.addPacket(p)
...@@ -575,9 +567,8 @@ class Application(object): ...@@ -575,9 +567,8 @@ class Application(object):
node = nm.getNodeByUUID(uuid) node = nm.getNodeByUUID(uuid)
if node.getNodeType() == STORAGE_NODE_TYPE: if node.getNodeType() == STORAGE_NODE_TYPE:
self.asking_uuid_dict[uuid] = False self.asking_uuid_dict[uuid] = False
p = Packet()
msg_id = conn.getNextId() msg_id = conn.getNextId()
p.askUnfinishedTransactions(msg_id) p = protocol.askUnfinishedTransactions(msg_id)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
...@@ -600,15 +591,13 @@ class Application(object): ...@@ -600,15 +591,13 @@ class Application(object):
if uuid is not None: if uuid is not None:
node = nm.getNodeByUUID(uuid) node = nm.getNodeByUUID(uuid)
if node.getNodeType() == STORAGE_NODE_TYPE: if node.getNodeType() == STORAGE_NODE_TYPE:
p = Packet() p = protocol.deleteTransaction(conn.getNextId(), tid)
p.deleteTransaction(conn.getNextId(), tid)
conn.addPacket(p) conn.addPacket(p)
else: else:
for conn in em.getConnectionList(): for conn in em.getConnectionList():
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid in uuid_set: if uuid in uuid_set:
p = Packet() p = protocol.commitTransaction(conn.getNextId(), tid)
p.commitTransaction(conn.getNextId(), tid)
conn.addPacket(p) conn.addPacket(p)
# If possible, send the packets now. # If possible, send the packets now.
...@@ -655,7 +644,7 @@ class Application(object): ...@@ -655,7 +644,7 @@ class Application(object):
if uuid is not None: if uuid is not None:
node = nm.getNodeByUUID(uuid) node = nm.getNodeByUUID(uuid)
if node.getNodeType() == STORAGE_NODE_TYPE: if node.getNodeType() == STORAGE_NODE_TYPE:
conn.addPacket(Packet().startOperation(conn.getNextId())) conn.addPacket(protocol.startOperation(conn.getNextId()))
# Now everything is passive. # Now everything is passive.
expiration = 10 expiration = 10
...@@ -693,7 +682,7 @@ class Application(object): ...@@ -693,7 +682,7 @@ class Application(object):
if uuid is not None: if uuid is not None:
node = nm.getNodeByUUID(uuid) node = nm.getNodeByUUID(uuid)
if node.getNodeType() in (STORAGE_NODE_TYPE, CLIENT_NODE_TYPE): if node.getNodeType() in (STORAGE_NODE_TYPE, CLIENT_NODE_TYPE):
conn.addPacket(Packet().stopOperation(conn.getNextId())) conn.addPacket(protocol.stopOperation(conn.getNextId()))
if node.getNodeType() == CLIENT_NODE_TYPE: if node.getNodeType() == CLIENT_NODE_TYPE:
conn.abort() conn.abort()
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
import logging import logging
from neo import protocol
from neo.protocol import MASTER_NODE_TYPE, \ from neo.protocol import MASTER_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \ RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \
DOWN_STATE, ADMIN_NODE_TYPE DOWN_STATE, ADMIN_NODE_TYPE
...@@ -39,9 +40,8 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -39,9 +40,8 @@ class ElectionEventHandler(MasterEventHandler):
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
app = self.app app = self.app
# Request a node idenfitication. # Request a node idenfitication.
p = Packet()
msg_id = conn.getNextId() msg_id = conn.getNextId()
p.requestNodeIdentification(msg_id, MASTER_NODE_TYPE, app.uuid, p = protocol.requestNodeIdentification(msg_id, MASTER_NODE_TYPE, app.uuid,
app.server[0], app.server[1], app.name) app.server[0], app.server[1], app.name)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
...@@ -122,7 +122,7 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -122,7 +122,7 @@ class ElectionEventHandler(MasterEventHandler):
# Ask a primary master. # Ask a primary master.
msg_id = conn.getNextId() msg_id = conn.getNextId()
conn.addPacket(Packet().askPrimaryMaster(msg_id)) conn.addPacket(protocol.askPrimaryMaster(msg_id))
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
else: else:
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
...@@ -183,12 +183,12 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -183,12 +183,12 @@ class ElectionEventHandler(MasterEventHandler):
app = self.app app = self.app
if node_type != MASTER_NODE_TYPE: if node_type != MASTER_NODE_TYPE:
logging.info('reject a connection from a non-master') logging.info('reject a connection from a non-master')
conn.addPacket(Packet().notReady(packet.getId(), 'retry later')) conn.addPacket(protocol.notReady(packet.getId(), 'retry later'))
conn.abort() conn.abort()
return return
if name != app.name: if name != app.name:
logging.error('reject an alien cluster') logging.error('reject an alien cluster')
conn.addPacket(Packet().protocolError(packet.getId(), conn.addPacket(protocol.protocolError(packet.getId(),
'invalid cluster name')) 'invalid cluster name'))
conn.abort() conn.abort()
return return
...@@ -203,8 +203,7 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -203,8 +203,7 @@ class ElectionEventHandler(MasterEventHandler):
# If this node is broken, reject it. # If this node is broken, reject it.
if node.getUUID() == uuid: if node.getUUID() == uuid:
if node.getState() == BROKEN_STATE: if node.getState() == BROKEN_STATE:
p = Packet() p = protocol.brokenNodeDisallowedError(packet.getId(), 'go away')
p.brokenNodeDisallowedError(packet.getId(), 'go away')
conn.addPacket(p) conn.addPacket(p)
conn.abort() conn.abort()
return return
...@@ -216,8 +215,7 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -216,8 +215,7 @@ class ElectionEventHandler(MasterEventHandler):
node.setUUID(uuid) node.setUUID(uuid)
conn.setUUID(uuid) conn.setUUID(uuid)
p = Packet() p = protocol.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE,
p.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1], app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas, app.num_partitions, app.num_replicas,
uuid) uuid)
...@@ -248,8 +246,7 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -248,8 +246,7 @@ class ElectionEventHandler(MasterEventHandler):
continue continue
info = n.getServer() + (n.getUUID() or INVALID_UUID,) info = n.getServer() + (n.getUUID() or INVALID_UUID,)
known_master_list.append(info) known_master_list.append(info)
p = Packet() p = protocol.answerPrimaryMaster(packet.getId(), primary_uuid, known_master_list)
p.answerPrimaryMaster(packet.getId(), primary_uuid, known_master_list)
conn.addPacket(p) conn.addPacket(p)
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
import logging import logging
from neo import protocol
from neo.protocol import MASTER_NODE_TYPE, \ from neo.protocol import MASTER_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \ RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, ADMIN_NODE_TYPE STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, ADMIN_NODE_TYPE
...@@ -67,12 +68,12 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -67,12 +68,12 @@ class RecoveryEventHandler(MasterEventHandler):
app = self.app app = self.app
if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE): if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
logging.info('reject a connection from a client') logging.info('reject a connection from a client')
conn.addPacket(Packet().notReady(packet.getId(), 'retry later')) conn.addPacket(protocol.notReady(packet.getId(), 'retry later'))
conn.abort() conn.abort()
return return
if name != app.name: if name != app.name:
logging.error('reject an alien cluster') logging.error('reject an alien cluster')
conn.addPacket(Packet().protocolError(packet.getId(), conn.addPacket(protocol.protocolError(packet.getId(),
'invalid cluster name')) 'invalid cluster name'))
conn.abort() conn.abort()
return return
...@@ -117,7 +118,7 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -117,7 +118,7 @@ class RecoveryEventHandler(MasterEventHandler):
if node.getNodeType() != MASTER_NODE_TYPE or node_type != MASTER_NODE_TYPE: if node.getNodeType() != MASTER_NODE_TYPE or node_type != MASTER_NODE_TYPE:
# Error. This node uses the same server address as a master # Error. This node uses the same server address as a master
# node. # node.
conn.addPacket(Packet().protocolError(packet.getId(), conn.addPacket(protocol.protocolError(packet.getId(),
'invalid server address')) 'invalid server address'))
conn.abort() conn.abort()
return return
...@@ -130,7 +131,7 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -130,7 +131,7 @@ class RecoveryEventHandler(MasterEventHandler):
# This node has a different UUID. # This node has a different UUID.
if node.getState() == RUNNING_STATE: if node.getState() == RUNNING_STATE:
# If it is still running, reject this node. # If it is still running, reject this node.
conn.addPacket(Packet().protocolError(packet.getId(), conn.addPacket(protocol.protocolError(packet.getId(),
'invalid server address')) 'invalid server address'))
conn.abort() conn.abort()
return return
...@@ -149,7 +150,7 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -149,7 +150,7 @@ class RecoveryEventHandler(MasterEventHandler):
if node.getState() == RUNNING_STATE: if node.getState() == RUNNING_STATE:
# If it is still running, reject this node. # If it is still running, reject this node.
conn.addPacket(Packet().protocolError(packet.getId(), conn.addPacket(protocol.protocolError(packet.getId(),
'invalid server address')) 'invalid server address'))
conn.abort() conn.abort()
return return
...@@ -165,8 +166,7 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -165,8 +166,7 @@ class RecoveryEventHandler(MasterEventHandler):
# If this node is broken, reject it. Otherwise, assume that it is # If this node is broken, reject it. Otherwise, assume that it is
# working again. # working again.
if node.getState() == BROKEN_STATE: if node.getState() == BROKEN_STATE:
p = Packet() p = protocol.brokenNodeDisallowedError(packet.getId(), 'go away')
p.brokenNodeDisallowedError(packet.getId(), 'go away')
conn.addPacket(p) conn.addPacket(p)
conn.abort() conn.abort()
return return
...@@ -177,8 +177,7 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -177,8 +177,7 @@ class RecoveryEventHandler(MasterEventHandler):
conn.setUUID(uuid) conn.setUUID(uuid)
p = Packet() p = protocol.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE,
p.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1], app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas, uuid) app.num_partitions, app.num_replicas, uuid)
conn.addPacket(p) conn.addPacket(p)
...@@ -196,8 +195,7 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -196,8 +195,7 @@ class RecoveryEventHandler(MasterEventHandler):
# Merely tell the peer that I am the primary master node. # Merely tell the peer that I am the primary master node.
# It is not necessary to send known master nodes, because # It is not necessary to send known master nodes, because
# I must send all node information immediately. # I must send all node information immediately.
p = Packet() p = protocol.answerPrimaryMaster(packet.getId(), app.uuid, [])
p.answerPrimaryMaster(packet.getId(), app.uuid, [])
conn.addPacket(p) conn.addPacket(p)
# Send the information. # Send the information.
...@@ -211,20 +209,17 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -211,20 +209,17 @@ class RecoveryEventHandler(MasterEventHandler):
n.getUUID() or INVALID_UUID, n.getState())) n.getUUID() or INVALID_UUID, n.getState()))
if len(node_list) == 10000: if len(node_list) == 10000:
# Ugly, but it is necessary to split a packet, if it is too big. # Ugly, but it is necessary to split a packet, if it is too big.
p = Packet() p = protocol.notifyNodeInformation(conn.getNextId(), node_list)
p.notifyNodeInformation(conn.getNextId(), node_list)
conn.addPacket(p) conn.addPacket(p)
del node_list[:] del node_list[:]
p = Packet() p = protocol.notifyNodeInformation(conn.getNextId(), node_list)
p.notifyNodeInformation(conn.getNextId(), node_list)
conn.addPacket(p) conn.addPacket(p)
# If this is a storage node, ask the last IDs. # If this is a storage node, ask the last IDs.
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() == STORAGE_NODE_TYPE: if node.getNodeType() == STORAGE_NODE_TYPE:
p = Packet()
msg_id = conn.getNextId() msg_id = conn.getNextId()
p.askLastIDs(msg_id) p = protocol.askLastIDs(msg_id)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
elif node.getNodeType() == ADMIN_NODE_TYPE and app.lptid != INVALID_PTID: elif node.getNodeType() == ADMIN_NODE_TYPE and app.lptid != INVALID_PTID:
...@@ -232,16 +227,15 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -232,16 +227,15 @@ class RecoveryEventHandler(MasterEventHandler):
logging.info('sending partition table %s to %s' % (dump(app.lptid), logging.info('sending partition table %s to %s' % (dump(app.lptid),
conn.getAddress())) conn.getAddress()))
# Split the packet if too huge. # Split the packet if too huge.
p = Packet()
row_list = [] row_list = []
for offset in xrange(app.num_partitions): for offset in xrange(app.num_partitions):
row_list.append((offset, app.pt.getRow(offset))) row_list.append((offset, app.pt.getRow(offset)))
if len(row_list) == 1000: if len(row_list) == 1000:
p.sendPartitionTable(conn.getNextId(), app.lptid, row_list) p = protocol.sendPartitionTable(conn.getNextId(), app.lptid, row_list)
conn.addPacket(p) conn.addPacket(p)
del row_list[:] del row_list[:]
if len(row_list) != 0: if len(row_list) != 0:
p.sendPartitionTable(conn.getNextId(), app.lptid, row_list) p = protocol.sendPartitionTable(conn.getNextId(), app.lptid, row_list)
conn.addPacket(p) conn.addPacket(p)
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
import logging import logging
from neo import protocol
from neo.protocol import MASTER_NODE_TYPE, \ from neo.protocol import MASTER_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \ RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \
DOWN_STATE, ADMIN_NODE_TYPE DOWN_STATE, ADMIN_NODE_TYPE
...@@ -62,7 +63,7 @@ class SecondaryEventHandler(MasterEventHandler): ...@@ -62,7 +63,7 @@ class SecondaryEventHandler(MasterEventHandler):
app = self.app app = self.app
if name != app.name: if name != app.name:
logging.error('reject an alien cluster') logging.error('reject an alien cluster')
conn.addPacket(Packet().protocolError(packet.getId(), conn.addPacket(protocol.protocolError(packet.getId(),
'invalid cluster name')) 'invalid cluster name'))
conn.abort() conn.abort()
return return
...@@ -80,8 +81,7 @@ class SecondaryEventHandler(MasterEventHandler): ...@@ -80,8 +81,7 @@ class SecondaryEventHandler(MasterEventHandler):
conn.setUUID(uuid) conn.setUUID(uuid)
p = Packet() p = protocol.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE,
p.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1], app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas, app.num_partitions, app.num_replicas,
uuid) uuid)
...@@ -108,8 +108,7 @@ class SecondaryEventHandler(MasterEventHandler): ...@@ -108,8 +108,7 @@ class SecondaryEventHandler(MasterEventHandler):
info = n.getServer() + (n.getUUID() or INVALID_UUID,) info = n.getServer() + (n.getUUID() or INVALID_UUID,)
known_master_list.append(info) known_master_list.append(info)
p = Packet() p = protocol.answerPrimaryMaster(packet.getId(), primary_uuid, known_master_list)
p.answerPrimaryMaster(packet.getId(), primary_uuid, known_master_list)
conn.addPacket(p) conn.addPacket(p)
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
import logging import logging
from copy import copy from copy import copy
from neo import protocol
from neo.protocol import MASTER_NODE_TYPE, CLIENT_NODE_TYPE, \ from neo.protocol import MASTER_NODE_TYPE, CLIENT_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \ RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
UP_TO_DATE_STATE, FEEDING_STATE, DISCARDED_STATE, \ UP_TO_DATE_STATE, FEEDING_STATE, DISCARDED_STATE, \
...@@ -152,7 +153,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -152,7 +153,7 @@ class ServiceEventHandler(MasterEventHandler):
app = self.app app = self.app
if name != app.name: if name != app.name:
logging.error('reject an alien cluster') logging.error('reject an alien cluster')
conn.addPacket(Packet().protocolError(packet.getId(), conn.addPacket(protocol.protocolError(packet.getId(),
'invalid cluster name')) 'invalid cluster name'))
conn.abort() conn.abort()
return return
...@@ -201,8 +202,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -201,8 +202,7 @@ class ServiceEventHandler(MasterEventHandler):
or node_type != MASTER_NODE_TYPE: or node_type != MASTER_NODE_TYPE:
# Error. This node uses the same server address as # Error. This node uses the same server address as
# a master node. # a master node.
p = Packet() p = protocol.protocolError(packet.getId(),
p.protocolError(packet.getId(),
'invalid server address') 'invalid server address')
conn.addPacket(p) conn.addPacket(p)
conn.abort() conn.abort()
...@@ -217,8 +217,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -217,8 +217,7 @@ class ServiceEventHandler(MasterEventHandler):
# This node has a different UUID. # This node has a different UUID.
if node.getState() == RUNNING_STATE: if node.getState() == RUNNING_STATE:
# If it is still running, reject this node. # If it is still running, reject this node.
p = Packet() p = protocol.protocolError(packet.getId(),
p.protocolError(packet.getId(),
'invalid server address') 'invalid server address')
conn.addPacket(p) conn.addPacket(p)
conn.abort() conn.abort()
...@@ -247,8 +246,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -247,8 +246,7 @@ class ServiceEventHandler(MasterEventHandler):
# This node has a different server address. # This node has a different server address.
if node.getState() == RUNNING_STATE: if node.getState() == RUNNING_STATE:
# If it is still running, reject this node. # If it is still running, reject this node.
p = Packet() p = protocol.protocolError(packet.getId(),
p.protocolError(packet.getId(),
'invalid server address') 'invalid server address')
conn.addPacket(p) conn.addPacket(p)
conn.abort() conn.abort()
...@@ -271,8 +269,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -271,8 +269,7 @@ class ServiceEventHandler(MasterEventHandler):
# If this node is broken, reject it. Otherwise, assume that # If this node is broken, reject it. Otherwise, assume that
# it is working again. # it is working again.
if node.getState() == BROKEN_STATE: if node.getState() == BROKEN_STATE:
p = Packet() p = protocol.brokenNodeDisallowedError(packet.getId(), 'go away')
p.brokenNodeDisallowedError(packet.getId(), 'go away')
conn.addPacket(p) conn.addPacket(p)
conn.abort() conn.abort()
return return
...@@ -300,8 +297,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -300,8 +297,7 @@ class ServiceEventHandler(MasterEventHandler):
ptid = app.getNextPartitionTableID() ptid = app.getNextPartitionTableID()
app.broadcastPartitionChanges(ptid, cell_list) app.broadcastPartitionChanges(ptid, cell_list)
p = Packet() p = protocol.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE,
p.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1], app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas, uuid) app.num_partitions, app.num_replicas, uuid)
conn.addPacket(p) conn.addPacket(p)
...@@ -319,8 +315,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -319,8 +315,7 @@ class ServiceEventHandler(MasterEventHandler):
# Merely tell the peer that I am the primary master node. # Merely tell the peer that I am the primary master node.
# It is not necessary to send known master nodes, because # It is not necessary to send known master nodes, because
# I must send all node information immediately. # I must send all node information immediately.
p = Packet() p = protocol.answerPrimaryMaster(packet.getId(), app.uuid, [])
p.answerPrimaryMaster(packet.getId(), app.uuid, [])
conn.addPacket(p) conn.addPacket(p)
# Send the information. # Send the information.
...@@ -336,12 +331,10 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -336,12 +331,10 @@ class ServiceEventHandler(MasterEventHandler):
n.getUUID() or INVALID_UUID, n.getState())) n.getUUID() or INVALID_UUID, n.getState()))
if len(node_list) == 10000: if len(node_list) == 10000:
# Ugly, but it is necessary to split a packet, if it is too big. # Ugly, but it is necessary to split a packet, if it is too big.
p = Packet() p = protocol.notifyNodeInformation(conn.getNextId(), node_list)
p.notifyNodeInformation(conn.getNextId(), node_list)
conn.addPacket(p) conn.addPacket(p)
del node_list[:] del node_list[:]
p = Packet() p = protocol.notifyNodeInformation(conn.getNextId(), node_list)
p.notifyNodeInformation(conn.getNextId(), node_list)
conn.addPacket(p) conn.addPacket(p)
# If this is a storage node or a client node or an admin node, send the partition table. # If this is a storage node or a client node or an admin node, send the partition table.
...@@ -350,21 +343,20 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -350,21 +343,20 @@ class ServiceEventHandler(MasterEventHandler):
logging.info('sending partition table to %s:%d', logging.info('sending partition table to %s:%d',
*(conn.getAddress())) *(conn.getAddress()))
# Split the packet if too huge. # Split the packet if too huge.
p = Packet()
row_list = [] row_list = []
for offset in xrange(app.num_partitions): for offset in xrange(app.num_partitions):
row_list.append((offset, app.pt.getRow(offset))) row_list.append((offset, app.pt.getRow(offset)))
if len(row_list) == 1000: if len(row_list) == 1000:
p.sendPartitionTable(conn.getNextId(), app.lptid, row_list) p = protocol.sendPartitionTable(conn.getNextId(), app.lptid, row_list)
conn.addPacket(p) conn.addPacket(p)
del row_list[:] del row_list[:]
if len(row_list) != 0: if len(row_list) != 0:
p.sendPartitionTable(conn.getNextId(), app.lptid, row_list) p = protocol.sendPartitionTable(conn.getNextId(), app.lptid, row_list)
conn.addPacket(p) conn.addPacket(p)
# If this is a storage node, ask it to start. # If this is a storage node, ask it to start.
if node.getNodeType() == STORAGE_NODE_TYPE: if node.getNodeType() == STORAGE_NODE_TYPE:
conn.addPacket(Packet().startOperation(conn.getNextId())) conn.addPacket(protocol.startOperation(conn.getNextId()))
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
uuid = conn.getUUID() uuid = conn.getUUID()
...@@ -474,7 +466,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -474,7 +466,7 @@ class ServiceEventHandler(MasterEventHandler):
return return
tid = app.getNextTID() tid = app.getNextTID()
app.finishing_transaction_dict[tid] = FinishingTransaction(conn) app.finishing_transaction_dict[tid] = FinishingTransaction(conn)
conn.addPacket(Packet().answerNewTID(packet.getId(), tid)) conn.addPacket(protocol.answerNewTID(packet.getId(), tid))
def handleAskNewOIDs(self, conn, packet, num_oids): def handleAskNewOIDs(self, conn, packet, num_oids):
uuid = conn.getUUID() uuid = conn.getUUID()
...@@ -490,7 +482,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -490,7 +482,7 @@ class ServiceEventHandler(MasterEventHandler):
return return
oid_list = app.getNewOIDList(num_oids) oid_list = app.getNewOIDList(num_oids)
conn.addPacket(Packet().answerNewOIDs(packet.getId(), oid_list)) conn.addPacket(protocol.answerNewOIDs(packet.getId(), oid_list))
def handleFinishTransaction(self, conn, packet, oid_list, tid): def handleFinishTransaction(self, conn, packet, oid_list, tid):
uuid = conn.getUUID() uuid = conn.getUUID()
...@@ -527,7 +519,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -527,7 +519,7 @@ class ServiceEventHandler(MasterEventHandler):
for c in app.em.getConnectionList(): for c in app.em.getConnectionList():
if c.getUUID() in uuid_set: if c.getUUID() in uuid_set:
msg_id = c.getNextId() msg_id = c.getNextId()
c.addPacket(Packet().lockInformation(msg_id, tid)) c.addPacket(protocol.lockInformation(msg_id, tid))
c.expectMessage(msg_id, timeout = 60) c.expectMessage(msg_id, timeout = 60)
try: try:
...@@ -569,20 +561,19 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -569,20 +561,19 @@ class ServiceEventHandler(MasterEventHandler):
for c in app.em.getConnectionList(): for c in app.em.getConnectionList():
uuid = c.getUUID() uuid = c.getUUID()
if uuid is not None: if uuid is not None:
p = Packet()
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() == CLIENT_NODE_TYPE: if node.getNodeType() == CLIENT_NODE_TYPE:
if c is t.getConnection(): if c is t.getConnection():
p.notifyTransactionFinished(t.getMessageId(), p = protocol.notifyTransactionFinished(
tid) t.getMessageId(), tid)
c.addPacket(p) c.addPacket(p)
else: else:
p.invalidateObjects(c.getNextId(), p = protocol.invalidateObjects(c.getNextId(),
t.getOIDList(), tid) t.getOIDList(), tid)
c.addPacket(p) c.addPacket(p)
elif node.getNodeType() == STORAGE_NODE_TYPE: elif node.getNodeType() == STORAGE_NODE_TYPE:
if uuid in t.getUUIDSet(): if uuid in t.getUUIDSet():
p.unlockInformation(c.getNextId(), tid) p = protocol.unlockInformation(c.getNextId(), tid)
c.addPacket(p) c.addPacket(p)
del app.finishing_transaction_dict[tid] del app.finishing_transaction_dict[tid]
except KeyError: except KeyError:
...@@ -615,8 +606,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -615,8 +606,7 @@ class ServiceEventHandler(MasterEventHandler):
return return
app = self.app app = self.app
p = Packet() p = protocol.answerLastIDs(packet.getId(), app.loid, app.ltid, app.lptid)
p.answerLastIDs(packet.getId(), app.loid, app.ltid, app.lptid)
conn.addPacket(p) conn.addPacket(p)
def handleAskUnfinishedTransactions(self, conn, packet): def handleAskUnfinishedTransactions(self, conn, packet):
...@@ -626,8 +616,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -626,8 +616,7 @@ class ServiceEventHandler(MasterEventHandler):
return return
app = self.app app = self.app
p = Packet() p = protocol.answerUnfinishedTransactions(packet.getId(),
p.answerUnfinishedTransactions(packet.getId(),
app.finishing_transaction_dict.keys()) app.finishing_transaction_dict.keys())
conn.addPacket(p) conn.addPacket(p)
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
import logging import logging
from neo import protocol
from neo.protocol import MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \ from neo.protocol import MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \ RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
ADMIN_NODE_TYPE ADMIN_NODE_TYPE
...@@ -91,12 +92,12 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -91,12 +92,12 @@ class VerificationEventHandler(MasterEventHandler):
app = self.app app = self.app
if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE): if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
logging.info('reject a connection from a client') logging.info('reject a connection from a client')
conn.addPacket(Packet().notReady(packet.getId(), 'retry later')) conn.addPacket(protocol.notReady(packet.getId(), 'retry later'))
conn.abort() conn.abort()
return return
if name != app.name: if name != app.name:
logging.error('reject an alien cluster') logging.error('reject an alien cluster')
conn.addPacket(Packet().protocolError(packet.getId(), conn.addPacket(protocol.protocolError(packet.getId(),
'invalid cluster name')) 'invalid cluster name'))
conn.abort() conn.abort()
return return
...@@ -141,7 +142,7 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -141,7 +142,7 @@ class VerificationEventHandler(MasterEventHandler):
if node.getNodeType() != MASTER_NODE_TYPE or node_type != MASTER_NODE_TYPE: if node.getNodeType() != MASTER_NODE_TYPE or node_type != MASTER_NODE_TYPE:
# Error. This node uses the same server address as a master # Error. This node uses the same server address as a master
# node. # node.
conn.addPacket(Packet().protocolError(packet.getId(), conn.addPacket(protocol.protocolError(packet.getId(),
'invalid server address')) 'invalid server address'))
conn.abort() conn.abort()
return return
...@@ -154,7 +155,7 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -154,7 +155,7 @@ class VerificationEventHandler(MasterEventHandler):
# This node has a different UUID. # This node has a different UUID.
if node.getState() == RUNNING_STATE: if node.getState() == RUNNING_STATE:
# If it is still running, reject this node. # If it is still running, reject this node.
conn.addPacket(Packet().protocolError(packet.getId(), conn.addPacket(protocol.protocolError(packet.getId(),
'invalid server address')) 'invalid server address'))
conn.abort() conn.abort()
return return
...@@ -172,7 +173,7 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -172,7 +173,7 @@ class VerificationEventHandler(MasterEventHandler):
# This node has a different server address. # This node has a different server address.
if node.getState() == RUNNING_STATE: if node.getState() == RUNNING_STATE:
# If it is still running, reject this node. # If it is still running, reject this node.
conn.addPacket(Packet().protocolError(packet.getId(), conn.addPacket(protocol.protocolError(packet.getId(),
'invalid server address')) 'invalid server address'))
conn.abort() conn.abort()
return return
...@@ -188,8 +189,7 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -188,8 +189,7 @@ class VerificationEventHandler(MasterEventHandler):
# If this node is broken, reject it. Otherwise, assume that it is # If this node is broken, reject it. Otherwise, assume that it is
# working again. # working again.
if node.getState() == BROKEN_STATE: if node.getState() == BROKEN_STATE:
p = Packet() p = protocol.brokenNodeDisallowedError(packet.getId(), 'go away')
p.brokenNodeDisallowedError(packet.getId(), 'go away')
conn.addPacket(p) conn.addPacket(p)
conn.abort() conn.abort()
return return
...@@ -200,8 +200,7 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -200,8 +200,7 @@ class VerificationEventHandler(MasterEventHandler):
conn.setUUID(uuid) conn.setUUID(uuid)
p = Packet() p = protocol.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE,
p.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1], app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas, uuid) app.num_partitions, app.num_replicas, uuid)
conn.addPacket(p) conn.addPacket(p)
...@@ -219,8 +218,7 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -219,8 +218,7 @@ class VerificationEventHandler(MasterEventHandler):
# Merely tell the peer that I am the primary master node. # Merely tell the peer that I am the primary master node.
# It is not necessary to send known master nodes, because # It is not necessary to send known master nodes, because
# I must send all node information immediately. # I must send all node information immediately.
p = Packet() p = protocol.answerPrimaryMaster(packet.getId(), app.uuid, [])
p.answerPrimaryMaster(packet.getId(), app.uuid, [])
conn.addPacket(p) conn.addPacket(p)
# Send the information. # Send the information.
...@@ -234,28 +232,25 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -234,28 +232,25 @@ class VerificationEventHandler(MasterEventHandler):
n.getUUID() or INVALID_UUID, n.getState())) n.getUUID() or INVALID_UUID, n.getState()))
if len(node_list) == 10000: if len(node_list) == 10000:
# Ugly, but it is necessary to split a packet, if it is too big. # Ugly, but it is necessary to split a packet, if it is too big.
p = Packet() p = protocol.notifyNodeInformation(conn.getNextId(), node_list)
p.notifyNodeInformation(conn.getNextId(), node_list)
conn.addPacket(p) conn.addPacket(p)
del node_list[:] del node_list[:]
p = Packet() p = protocol.notifyNodeInformation(conn.getNextId(), node_list)
p.notifyNodeInformation(conn.getNextId(), node_list)
conn.addPacket(p) conn.addPacket(p)
# If this is a storage node or an admin node, send the partition table. # If this is a storage node or an admin node, send the partition table.
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() in (STORAGE_NODE_TYPE, ADMIN_NODE_TYPE): if node.getNodeType() in (STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
# Split the packet if too huge. # Split the packet if too huge.
p = Packet()
row_list = [] row_list = []
for offset in xrange(app.num_partitions): for offset in xrange(app.num_partitions):
row_list.append((offset, app.pt.getRow(offset))) row_list.append((offset, app.pt.getRow(offset)))
if len(row_list) == 1000: if len(row_list) == 1000:
p.sendPartitionTable(conn.getNextId(), app.lptid, row_list) p = protocol.sendPartitionTable(conn.getNextId(), app.lptid, row_list)
conn.addPacket(p) conn.addPacket(p)
del row_list[:] del row_list[:]
if len(row_list) != 0: if len(row_list) != 0:
p.sendPartitionTable(conn.getNextId(), app.lptid, row_list) p = protocol.sendPartitionTable(conn.getNextId(), app.lptid, row_list)
conn.addPacket(p) conn.addPacket(p)
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
......
...@@ -348,7 +348,7 @@ class Packet(object): ...@@ -348,7 +348,7 @@ class Packet(object):
return None return None
return cls(msg_id, msg_type, msg[PACKET_HEADER_SIZE:msg_len]) return cls(msg_id, msg_type, msg[PACKET_HEADER_SIZE:msg_len])
def __init__(self, msg_id = None, msg_type = None, body = None): def __init__(self, msg_id, msg_type, body=''):
self._id = msg_id self._id = msg_id
self._type = msg_type self._type = msg_type
self._body = body self._body = body
...@@ -374,381 +374,6 @@ class Packet(object): ...@@ -374,381 +374,6 @@ class Packet(object):
__str__ = encode __str__ = encode
def error(self, msg_id, error_code, error_message):
self._id = msg_id
self._type = ERROR
self._body = pack('!HL', error_code, len(error_message)) + error_message
return self
def protocolError(self, msg_id, error_message):
return self.error(msg_id, PROTOCOL_ERROR_CODE, 'protocol error: ' + error_message)
def internalError(self, msg_id, error_message):
return self.error(msg_id, INTERNAL_ERROR_CODE, 'internal error: ' + error_message)
def notReady(self, msg_id, error_message):
return self.error(msg_id, NOT_READY_CODE, 'not ready: ' + error_message)
def brokenNodeDisallowedError(self, msg_id, error_message):
return self.error(msg_id, BROKEN_NODE_DISALLOWED_CODE,
'broken node disallowed error: ' + error_message)
def oidNotFound(self, msg_id, error_message):
return self.error(msg_id, OID_NOT_FOUND_CODE,
'oid not found: ' + error_message)
def tidNotFound(self, msg_id, error_message):
return self.error(msg_id, TID_NOT_FOUND_CODE,
'tid not found: ' + error_message)
def ping(self, msg_id):
self._id = msg_id
self._type = PING
self._body = ''
return self
def pong(self, msg_id):
self._id = msg_id
self._type = PONG
self._body = ''
return self
def requestNodeIdentification(self, msg_id, node_type, uuid, ip_address, port, name):
self._id = msg_id
self._type = REQUEST_NODE_IDENTIFICATION
self._body = pack('!LLH16s4sHL', PROTOCOL_VERSION[0], PROTOCOL_VERSION[1],
node_type, uuid, inet_aton(ip_address), port, len(name)) + name
return self
def acceptNodeIdentification(self, msg_id, node_type, uuid, ip_address,
port, num_partitions, num_replicas, your_uuid):
self._id = msg_id
self._type = ACCEPT_NODE_IDENTIFICATION
self._body = pack('!H16s4sHLL16s', node_type, uuid,
inet_aton(ip_address), port,
num_partitions, num_replicas, your_uuid)
return self
def askPrimaryMaster(self, msg_id):
self._id = msg_id
self._type = ASK_PRIMARY_MASTER
self._body = ''
return self
def answerPrimaryMaster(self, msg_id, primary_uuid, known_master_list):
self._id = msg_id
self._type = ANSWER_PRIMARY_MASTER
body = [primary_uuid, pack('!L', len(known_master_list))]
for master in known_master_list:
body.append(pack('!4sH16s', inet_aton(master[0]), master[1], master[2]))
self._body = ''.join(body)
return self
def announcePrimaryMaster(self, msg_id):
self._id = msg_id
self._type = ANNOUNCE_PRIMARY_MASTER
self._body = ''
return self
def reelectPrimaryMaster(self, msg_id):
self._id = msg_id
self._type = REELECT_PRIMARY_MASTER
self._body = ''
return self
def notifyNodeInformation(self, msg_id, node_list):
self._id = msg_id
self._type = NOTIFY_NODE_INFORMATION
body = [pack('!L', len(node_list))]
for node_type, ip_address, port, uuid, state in node_list:
body.append(pack('!H4sH16sH', node_type, inet_aton(ip_address), port,
uuid, state))
self._body = ''.join(body)
return self
def askLastIDs(self, msg_id):
self._id = msg_id
self._type = ASK_LAST_IDS
self._body = ''
return self
def answerLastIDs(self, msg_id, loid, ltid, lptid):
self._id = msg_id
self._type = ANSWER_LAST_IDS
self._body = loid + ltid + lptid
return self
def askPartitionTable(self, msg_id, offset_list):
self._id = msg_id
self._type = ASK_PARTITION_TABLE
body = [pack('!L', len(offset_list))]
for offset in offset_list:
body.append(pack('!L', offset))
self._body = ''.join(body)
return self
def answerPartitionTable(self, msg_id, ptid, row_list):
self._id = msg_id
self._type = ANSWER_PARTITION_TABLE
body = [pack('!8sL', ptid, len(row_list))]
for offset, cell_list in row_list:
body.append(pack('!LL', offset, len(cell_list)))
for uuid, state in cell_list:
body.append(pack('!16sH', uuid, state))
self._body = ''.join(body)
return self
def sendPartitionTable(self, msg_id, ptid, row_list):
self._id = msg_id
self._type = SEND_PARTITION_TABLE
body = [pack('!8sL', ptid, len(row_list))]
for offset, cell_list in row_list:
body.append(pack('!LL', offset, len(cell_list)))
for uuid, state in cell_list:
body.append(pack('!16sH', uuid, state))
self._body = ''.join(body)
return self
def notifyPartitionChanges(self, msg_id, ptid, cell_list):
self._id = msg_id
self._type = NOTIFY_PARTITION_CHANGES
body = [pack('!8sL', ptid, len(cell_list))]
for offset, uuid, state in cell_list:
body.append(pack('!L16sH', offset, uuid, state))
self._body = ''.join(body)
return self
def startOperation(self, msg_id):
self._id = msg_id
self._type = START_OPERATION
self._body = ''
return self
def stopOperation(self, msg_id):
self._id = msg_id
self._type = STOP_OPERATION
self._body = ''
return self
def askUnfinishedTransactions(self, msg_id):
self._id = msg_id
self._type = ASK_UNFINISHED_TRANSACTIONS
self._body = ''
return self
def answerUnfinishedTransactions(self, msg_id, tid_list):
self._id = msg_id
self._type = ANSWER_UNFINISHED_TRANSACTIONS
body = [pack('!L', len(tid_list))]
body.extend(tid_list)
self._body = ''.join(body)
return self
def askObjectPresent(self, msg_id, oid, tid):
self._id = msg_id
self._type = ASK_OBJECT_PRESENT
self._body = oid + tid
return self
def answerObjectPresent(self, msg_id, oid, tid):
self._id = msg_id
self._type = ANSWER_OBJECT_PRESENT
self._body = oid + tid
return self
def deleteTransaction(self, msg_id, tid):
self._id = msg_id
self._type = DELETE_TRANSACTION
self._body = tid
return self
def commitTransaction(self, msg_id, tid):
self._id = msg_id
self._type = COMMIT_TRANSACTION
self._body = tid
return self
def askNewTID(self, msg_id):
self._id = msg_id
self._type = ASK_NEW_TID
self._body = ''
return self
def answerNewTID(self, msg_id, tid):
self._id = msg_id
self._type = ANSWER_NEW_TID
self._body = tid
return self
def askNewOIDs(self, msg_id, num_oids):
self._id = msg_id
self._type = ASK_NEW_OIDS
self._body = pack('!H', num_oids)
return self
def answerNewOIDs(self, msg_id, oid_list):
self._id = msg_id
self._type = ANSWER_NEW_OIDS
body = [pack('!H', len(oid_list))]
body.extend(oid_list)
self._body = ''.join(body)
return self
def finishTransaction(self, msg_id, oid_list, tid):
self._id = msg_id
self._type = FINISH_TRANSACTION
body = [pack('!8sL', tid, len(oid_list))]
body.extend(oid_list)
self._body = ''.join(body)
return self
def notifyTransactionFinished(self, msg_id, tid):
self._id = msg_id
self._type = NOTIFY_TRANSACTION_FINISHED
self._body = tid
return self
def lockInformation(self, msg_id, tid):
self._id = msg_id
self._type = LOCK_INFORMATION
self._body = tid
return self
def notifyInformationLocked(self, msg_id, tid):
self._id = msg_id
self._type = NOTIFY_INFORMATION_LOCKED
self._body = tid
return self
def invalidateObjects(self, msg_id, oid_list, tid):
self._id = msg_id
self._type = INVALIDATE_OBJECTS
body = [pack('!8sL', tid, len(oid_list))]
body.extend(oid_list)
self._body = ''.join(body)
return self
def unlockInformation(self, msg_id, tid):
self._id = msg_id
self._type = UNLOCK_INFORMATION
self._body = tid
return self
def abortTransaction(self, msg_id, tid):
self._id = msg_id
self._type = ABORT_TRANSACTION
self._body = tid
return self
def askStoreTransaction(self, msg_id, tid, user, desc, ext, oid_list):
self._id = msg_id
self._type = ASK_STORE_TRANSACTION
user_len = len(user)
desc_len = len(desc)
ext_len = len(ext)
body = [pack('!8sLHHH', tid, len(oid_list), user_len, desc_len, ext_len)]
body.append(user)
body.append(desc)
body.append(ext)
body.extend(oid_list)
self._body = ''.join(body)
return self
def answerStoreTransaction(self, msg_id, tid):
self._id = msg_id
self._type = ANSWER_STORE_TRANSACTION
self._body = tid
return self
def askStoreObject(self, msg_id, oid, serial, compression, checksum, data, tid):
self._id = msg_id
self._type = ASK_STORE_OBJECT
self._body = pack('!8s8s8sBLL', oid, serial, tid, compression,
checksum, len(data)) + data
return self
def answerStoreObject(self, msg_id, conflicting, oid, serial):
self._id = msg_id
self._type = ANSWER_STORE_OBJECT
self._body = pack('!B8s8s', conflicting, oid, serial)
return self
def askObject(self, msg_id, oid, serial, tid):
self._id = msg_id
self._type = ASK_OBJECT
self._body = pack('!8s8s8s', oid, serial, tid)
return self
def answerObject(self, msg_id, oid, serial_start, serial_end, compression,
checksum, data):
self._id = msg_id
self._type = ANSWER_OBJECT
self._body = pack('!8s8s8sBLL', oid, serial_start, serial_end,
compression, checksum, len(data)) + data
return self
def askTIDs(self, msg_id, first, last, partition):
self._id = msg_id
self._type = ASK_TIDS
self._body = pack('!QQL', first, last, partition)
return self
def answerTIDs(self, msg_id, tid_list):
self._id = msg_id
self._type = ANSWER_TIDS
body = [pack('!L', len(tid_list))]
body.extend(tid_list)
self._body = ''.join(body)
return self
def askTransactionInformation(self, msg_id, tid):
self._id = msg_id
self._type = ASK_TRANSACTION_INFORMATION
self._body = pack('!8s', tid)
return self
def answerTransactionInformation(self, msg_id, tid, user, desc, ext,
oid_list):
self._id = msg_id
self._type = ANSWER_TRANSACTION_INFORMATION
body = [pack('!8sHHHL', tid, len(user), len(desc), len(ext),
len(oid_list))]
body.append(user)
body.append(desc)
body.append(ext)
body.extend(oid_list)
self._body = ''.join(body)
return self
def askObjectHistory(self, msg_id, oid, first, last):
self._id = msg_id
self._type = ASK_OBJECT_HISTORY
self._body = pack('!8sQQ', oid, first, last)
return self
def answerObjectHistory(self, msg_id, oid, history_list):
self._id = msg_id
self._type = ANSWER_OBJECT_HISTORY
body = [pack('!8sL', oid, len(history_list))]
# history_list is a list of tuple (serial, size)
for history_tuple in history_list:
body.append(pack('!8sL', history_tuple[0], history_tuple[1]))
self._body = ''.join(body)
return self
def askOIDs(self, msg_id, first, last, partition):
self._id = msg_id
self._type = ASK_OIDS
self._body = pack('!QQL', first, last, partition)
return self
def answerOIDs(self, msg_id, oid_list):
self._id = msg_id
self._type = ANSWER_OIDS
body = [pack('!L', len(oid_list))]
body.extend(oid_list)
self._body = ''.join(body)
return self
# Decoders. # Decoders.
def decode(self): def decode(self):
...@@ -1244,3 +869,250 @@ class Packet(object): ...@@ -1244,3 +869,250 @@ class Packet(object):
return (oid_list,) return (oid_list,)
decode_table[ANSWER_OIDS] = _decodeAnswerOIDs decode_table[ANSWER_OIDS] = _decodeAnswerOIDs
# Packet constructors
def _error(msg_id, error_code, error_message):
body = pack('!HL', error_code, len(error_message)) + error_message
return Packet(msg_id, ERROR, body)
def protocolError(msg_id, error_message):
return _error(msg_id, PROTOCOL_ERROR_CODE, 'protocol error: ' + error_message)
def internalError(msg_id, error_message):
return _error(msg_id, INTERNAL_ERROR_CODE, 'internal error: ' + error_message)
def notReady(msg_id, error_message):
return _error(msg_id, NOT_READY_CODE, 'not ready: ' + error_message)
def brokenNodeDisallowedError(msg_id, error_message):
return _error(msg_id, BROKEN_NODE_DISALLOWED_CODE,
'broken node disallowed error: ' + error_message)
def oidNotFound(msg_id, error_message):
return _error(msg_id, OID_NOT_FOUND_CODE, 'oid not found: ' + error_message)
def tidNotFound(msg_id, error_message):
return _error(msg_id, TID_NOT_FOUND_CODE, 'tid not found: ' + error_message)
def ping(msg_id):
return Packet(msg_id, PING)
def pong(msg_id):
return Packet(msg_id, PONG)
def requestNodeIdentification(msg_id, node_type, uuid, ip_address, port, name):
body = pack('!LLH16s4sHL', PROTOCOL_VERSION[0], PROTOCOL_VERSION[1],
node_type, uuid, inet_aton(ip_address), port, len(name)) + name
return Packet(msg_id, REQUEST_NODE_IDENTIFICATION, body)
def acceptNodeIdentification(msg_id, node_type, uuid, ip_address,
port, num_partitions, num_replicas, your_uuid):
body = pack('!H16s4sHLL16s', node_type, uuid,
inet_aton(ip_address), port,
num_partitions, num_replicas, your_uuid)
return Packet(msg_id, ACCEPT_NODE_IDENTIFICATION, body)
def askPrimaryMaster(msg_id):
return Packet(msg_id, ASK_PRIMARY_MASTER)
def answerPrimaryMaster(msg_id, primary_uuid, known_master_list):
body = [primary_uuid, pack('!L', len(known_master_list))]
for master in known_master_list:
body.append(pack('!4sH16s', inet_aton(master[0]), master[1], master[2]))
body = ''.join(body)
return Packet(msg_id, ANSWER_PRIMARY_MASTER, body)
def announcePrimaryMaster(msg_id):
return Packet(msg_id, ANNOUNCE_PRIMARY_MASTER)
def reelectPrimaryMaster(msg_id):
return Packet(msg_id, REELECT_PRIMARY_MASTER)
def notifyNodeInformation(msg_id, node_list):
body = [pack('!L', len(node_list))]
for node_type, ip_address, port, uuid, state in node_list:
body.append(pack('!H4sH16sH', node_type, inet_aton(ip_address), port,
uuid, state))
body = ''.join(body)
return Packet(msg_id, NOTIFY_NODE_INFORMATION, body)
def askLastIDs(msg_id):
return Packet(msg_id, ASK_LAST_IDS)
def answerLastIDs(msg_id, loid, ltid, lptid):
return Packet(msg_id, ANSWER_LAST_IDS, loid + ltid + lptid)
def askPartitionTable(msg_id, offset_list):
body = [pack('!L', len(offset_list))]
for offset in offset_list:
body.append(pack('!L', offset))
body = ''.join(body)
return Packet(msg_id, ASK_PARTITION_TABLE, body)
def answerPartitionTable(msg_id, ptid, row_list):
body = [pack('!8sL', ptid, len(row_list))]
for offset, cell_list in row_list:
body.append(pack('!LL', offset, len(cell_list)))
for uuid, state in cell_list:
body.append(pack('!16sH', uuid, state))
body = ''.join(body)
return Packet(msg_id, ANSWER_PARTITION_TABLE, body)
def sendPartitionTable(msg_id, ptid, row_list):
body = [pack('!8sL', ptid, len(row_list))]
for offset, cell_list in row_list:
body.append(pack('!LL', offset, len(cell_list)))
for uuid, state in cell_list:
body.append(pack('!16sH', uuid, state))
body = ''.join(body)
return Packet(msg_id, SEND_PARTITION_TABLE, body)
def notifyPartitionChanges(msg_id, ptid, cell_list):
body = [pack('!8sL', ptid, len(cell_list))]
for offset, uuid, state in cell_list:
body.append(pack('!L16sH', offset, uuid, state))
body = ''.join(body)
return Packet(msg_id, NOTIFY_PARTITION_CHANGES, body)
def startOperation(msg_id):
return Packet(msg_id, START_OPERATION)
def stopOperation(msg_id):
return Packet(msg_id, STOP_OPERATION)
def askUnfinishedTransactions(msg_id):
return Packet(msg_id, ASK_UNFINISHED_TRANSACTIONS)
def answerUnfinishedTransactions(msg_id, tid_list):
body = [pack('!L', len(tid_list))]
body.extend(tid_list)
body = ''.join(body)
return Packet(msg_id, ANSWER_UNFINISHED_TRANSACTIONS, body)
def askObjectPresent(msg_id, oid, tid):
return Packet(msg_id, ASK_OBJECT_PRESENT, oid + tid)
def answerObjectPresent(msg_id, oid, tid):
return Packet(msg_id, ANSWER_OBJECT_PRESENT, oid + tid)
def deleteTransaction(msg_id, tid):
return Packet(msg_id, DELETE_TRANSACTION, tid)
def commitTransaction(msg_id, tid):
return Packet(msg_id, COMMIT_TRANSACTION, tid)
def askNewTID(msg_id):
return Packet(msg_id, ASK_NEW_TID)
def answerNewTID(msg_id, tid):
return Packet(msg_id, ANSWER_NEW_TID, tid)
def askNewOIDs(msg_id, num_oids):
return Packet(msg_id, ASK_NEW_OIDS, pack('!H', num_oids))
def answerNewOIDs(msg_id, oid_list):
body = [pack('!H', len(oid_list))]
body.extend(oid_list)
body = ''.join(body)
return Packet(msg_id, ANSWER_NEW_OIDS, body)
def finishTransaction(msg_id, oid_list, tid):
body = [pack('!8sL', tid, len(oid_list))]
body.extend(oid_list)
body = ''.join(body)
return Packet(msg_id, FINISH_TRANSACTION, body)
def notifyTransactionFinished(msg_id, tid):
return Packet(msg_id, NOTIFY_TRANSACTION_FINISHED, tid)
def lockInformation(msg_id, tid):
return Packet(msg_id, LOCK_INFORMATION, tid)
def notifyInformationLocked(msg_id, tid):
return Packet(msg_id, NOTIFY_INFORMATION_LOCKED, tid)
def invalidateObjects(msg_id, oid_list, tid):
body = [pack('!8sL', tid, len(oid_list))]
body.extend(oid_list)
body = ''.join(body)
return Packet(msg_id, INVALIDATE_OBJECTS, body)
def unlockInformation(msg_id, tid):
return Packet(msg_id, UNLOCK_INFORMATION, tid)
def abortTransaction(msg_id, tid):
return Packet(msg_id, ABORT_TRANSACTION, tid)
def askStoreTransaction(msg_id, tid, user, desc, ext, oid_list):
user_len = len(user)
desc_len = len(desc)
ext_len = len(ext)
body = [pack('!8sLHHH', tid, len(oid_list), len(user), len(desc), len(ext))]
body.append(user)
body.append(desc)
body.append(ext)
body.extend(oid_list)
body = ''.join(body)
return Packet(msg_id, ASK_STORE_TRANSACTION, body)
def answerStoreTransaction(msg_id, tid):
return Packet(msg_id, ANSWER_STORE_TRANSACTION, tid)
def askStoreObject(msg_id, oid, serial, compression, checksum, data, tid):
body = pack('!8s8s8sBLL', oid, serial, tid, compression,
checksum, len(data)) + data
return Packet(msg_id, ASK_STORE_OBJECT, body)
def answerStoreObject(msg_id, conflicting, oid, serial):
body = pack('!B8s8s', conflicting, oid, serial)
return Packet(msg_id, ANSWER_STORE_OBJECT, body)
def askObject(msg_id, oid, serial, tid):
return Packet(msg_id, ASK_OBJECT, pack('!8s8s8s', oid, serial, tid))
def answerObject(msg_id, oid, serial_start, serial_end, compression,
checksum, data):
body = pack('!8s8s8sBLL', oid, serial_start, serial_end,
compression, checksum, len(data)) + data
return Packet(msg_id, ANSWER_OBJECT, body)
def askTIDs(msg_id, first, last, partition):
return Packet(msg_id, ASK_TIDS, pack('!QQL', first, last, partition))
def answerTIDs(msg_id, tid_list):
body = [pack('!L', len(tid_list))]
body.extend(tid_list)
body = ''.join(body)
return Packet(msg_id, ANSWER_TIDS, body)
def askTransactionInformation(msg_id, tid):
return Packet(msg_id, ASK_TRANSACTION_INFORMATION, pack('!8s', tid))
def answerTransactionInformation(msg_id, tid, user, desc, ext, oid_list):
body = [pack('!8sHHHL', tid, len(user), len(desc), len(ext), len(oid_list))]
body.append(user)
body.append(desc)
body.append(ext)
body.extend(oid_list)
body = ''.join(body)
return Packet(msg_id, ANSWER_TRANSACTION_INFORMATION, body)
def askObjectHistory(msg_id, oid, first, last):
return Packet(msg_id, ASK_OBJECT_HISTORY, pack('!8sQQ', oid, first, last))
def answerObjectHistory(msg_id, oid, history_list):
body = [pack('!8sL', oid, len(history_list))]
# history_list is a list of tuple (serial, size)
for history_tuple in history_list:
body.append(pack('!8sL', history_tuple[0], history_tuple[1]))
body = ''.join(body)
return Packet(msg_id, ANSWER_OBJECT_HISTORY, body)
def askOIDs(msg_id, first, last, partition):
return Packet(msg_id, ASK_OIDS, pack('!QQL', first, last, partition))
def answerOIDs(msg_id, oid_list):
body = [pack('!L', len(oid_list))]
body.extend(oid_list)
body = ''.join(body)
return Packet(msg_id, ANSWER_OIDS, body)
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
import logging import logging
from neo import protocol
from neo.storage.handler import StorageEventHandler from neo.storage.handler import StorageEventHandler
from neo.protocol import INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \ from neo.protocol import INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE
...@@ -36,9 +37,8 @@ class BootstrapEventHandler(StorageEventHandler): ...@@ -36,9 +37,8 @@ class BootstrapEventHandler(StorageEventHandler):
# Should not happen. # Should not happen.
raise RuntimeError('connection completed while not trying to connect') raise RuntimeError('connection completed while not trying to connect')
p = Packet()
msg_id = conn.getNextId() msg_id = conn.getNextId()
p.requestNodeIdentification(msg_id, STORAGE_NODE_TYPE, app.uuid, p = protocol.requestNodeIdentification(msg_id, STORAGE_NODE_TYPE, app.uuid,
app.server[0], app.server[1], app.name) app.server[0], app.server[1], app.name)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
...@@ -115,12 +115,12 @@ class BootstrapEventHandler(StorageEventHandler): ...@@ -115,12 +115,12 @@ class BootstrapEventHandler(StorageEventHandler):
app = self.app app = self.app
if node_type != MASTER_NODE_TYPE: if node_type != MASTER_NODE_TYPE:
logging.info('reject a connection from a non-master') logging.info('reject a connection from a non-master')
conn.addPacket(Packet().notReady(packet.getId(), 'retry later')) conn.addPacket(protocol.notReady(packet.getId(), 'retry later'))
conn.abort() conn.abort()
return return
if name != app.name: if name != app.name:
logging.error('reject an alien cluster') logging.error('reject an alien cluster')
conn.addPacket(Packet().protocolError(packet.getId(), conn.addPacket(protocol.protocolError(packet.getId(),
'invalid cluster name')) 'invalid cluster name'))
conn.abort() conn.abort()
return return
...@@ -134,8 +134,7 @@ class BootstrapEventHandler(StorageEventHandler): ...@@ -134,8 +134,7 @@ class BootstrapEventHandler(StorageEventHandler):
# If this node is broken, reject it. # If this node is broken, reject it.
if node.getUUID() == uuid: if node.getUUID() == uuid:
if node.getState() == BROKEN_STATE: if node.getState() == BROKEN_STATE:
p = Packet() p = protocol.brokenNodeDisallowedError(packet.getId(), 'go away')
p.brokenNodeDisallowedError(packet.getId(), 'go away')
conn.addPacket(p) conn.addPacket(p)
conn.abort() conn.abort()
return return
...@@ -144,8 +143,7 @@ class BootstrapEventHandler(StorageEventHandler): ...@@ -144,8 +143,7 @@ class BootstrapEventHandler(StorageEventHandler):
node.setUUID(uuid) node.setUUID(uuid)
conn.setUUID(uuid) conn.setUUID(uuid)
p = Packet() p = protocol.acceptNodeIdentification(packet.getId(), STORAGE_NODE_TYPE,
p.acceptNodeIdentification(packet.getId(), STORAGE_NODE_TYPE,
app.uuid, app.server[0], app.server[1], app.uuid, app.server[0], app.server[1],
0, 0, uuid) 0, 0, uuid)
conn.addPacket(p) conn.addPacket(p)
...@@ -202,7 +200,7 @@ class BootstrapEventHandler(StorageEventHandler): ...@@ -202,7 +200,7 @@ class BootstrapEventHandler(StorageEventHandler):
# Ask a primary master. # Ask a primary master.
msg_id = conn.getNextId() msg_id = conn.getNextId()
conn.addPacket(Packet().askPrimaryMaster(msg_id)) conn.addPacket(protocol.askPrimaryMaster(msg_id))
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
......
...@@ -62,8 +62,7 @@ class StorageEventHandler(EventHandler): ...@@ -62,8 +62,7 @@ class StorageEventHandler(EventHandler):
info = n.getServer() + (n.getUUID() or INVALID_UUID,) info = n.getServer() + (n.getUUID() or INVALID_UUID,)
known_master_list.append(info) known_master_list.append(info)
p = Packet() p = protocol.answerPrimaryMaster(packet.getId(), primary_uuid, known_master_list)
p.answerPrimaryMaster(packet.getId(), primary_uuid, known_master_list)
conn.addPacket(p) conn.addPacket(p)
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
......
...@@ -77,14 +77,14 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -77,14 +77,14 @@ class MySQLDatabaseManager(DatabaseManager):
"""Query data from a database.""" """Query data from a database."""
conn = self.conn conn = self.conn
try: try:
printable_char_list = [] # printable_char_list = []
for c in query.split('\n', 1)[0][:70]: # for c in query.split('\n', 1)[0][:70]:
if c not in string.printable or c in '\t\x0b\x0c\r': # if c not in string.printable or c in '\t\x0b\x0c\r':
c = '\\x%02x' % ord(c) # c = '\\x%02x' % ord(c)
printable_char_list.append(c) # printable_char_list.append(c)
query_part = ''.join(printable_char_list) # query_part = ''.join(printable_char_list)
#
logging.debug('querying %s...', query_part) # logging.debug('querying %s...', query_part)
conn.query(query) conn.query(query)
r = conn.store_result() r = conn.store_result()
if r is not None: if r is not None:
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
import logging import logging
from neo import protocol
from neo.storage.handler import StorageEventHandler from neo.storage.handler import StorageEventHandler
from neo.protocol import INVALID_UUID, INVALID_SERIAL, INVALID_TID, \ from neo.protocol import INVALID_UUID, INVALID_SERIAL, INVALID_TID, \
INVALID_PARTITION, \ INVALID_PARTITION, \
...@@ -138,7 +139,7 @@ class OperationEventHandler(StorageEventHandler): ...@@ -138,7 +139,7 @@ class OperationEventHandler(StorageEventHandler):
app = self.app app = self.app
if name != app.name: if name != app.name:
logging.error('reject an alien cluster') logging.error('reject an alien cluster')
conn.addPacket(Packet().protocolError(packet.getId(), conn.addPacket(protocol.protocolError(packet.getId(),
'invalid cluster name')) 'invalid cluster name'))
conn.abort() conn.abort()
return return
...@@ -155,7 +156,7 @@ class OperationEventHandler(StorageEventHandler): ...@@ -155,7 +156,7 @@ class OperationEventHandler(StorageEventHandler):
# If I do not know such a node, and it is not even a master # If I do not know such a node, and it is not even a master
# node, simply reject it. # node, simply reject it.
logging.error('reject an unknown node %s', dump(uuid)) logging.error('reject an unknown node %s', dump(uuid))
conn.addPacket(Packet().notReady(packet.getId(), conn.addPacket(protocol.notReady(packet.getId(),
'unknown node')) 'unknown node'))
conn.abort() conn.abort()
return return
...@@ -163,8 +164,7 @@ class OperationEventHandler(StorageEventHandler): ...@@ -163,8 +164,7 @@ class OperationEventHandler(StorageEventHandler):
# If this node is broken, reject it. # If this node is broken, reject it.
if node.getUUID() == uuid: if node.getUUID() == uuid:
if node.getState() == BROKEN_STATE: if node.getState() == BROKEN_STATE:
p = Packet() p = protocol.brokenNodeDisallowedError(packet.getId(), 'go away')
p.brokenNodeDisallowedError(packet.getId(), 'go away')
conn.addPacket(p) conn.addPacket(p)
conn.abort() conn.abort()
return return
...@@ -173,8 +173,7 @@ class OperationEventHandler(StorageEventHandler): ...@@ -173,8 +173,7 @@ class OperationEventHandler(StorageEventHandler):
node.setUUID(uuid) node.setUUID(uuid)
conn.setUUID(uuid) conn.setUUID(uuid)
p = Packet() p = protocol.acceptNodeIdentification(packet.getId(), STORAGE_NODE_TYPE,
p.acceptNodeIdentification(packet.getId(), STORAGE_NODE_TYPE,
app.uuid, app.server[0], app.server[1], app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas, app.num_partitions, app.num_replicas,
uuid) uuid)
...@@ -256,11 +255,10 @@ class OperationEventHandler(StorageEventHandler): ...@@ -256,11 +255,10 @@ class OperationEventHandler(StorageEventHandler):
app = self.app app = self.app
t = app.dm.getTransaction(tid) t = app.dm.getTransaction(tid)
p = Packet()
if t is None: if t is None:
p.tidNotFound(packet.getId(), '%s does not exist' % dump(tid)) p = protocol.tidNotFound(packet.getId(), '%s does not exist' % dump(tid))
else: else:
p.answerTransactionInformation(packet.getId(), tid, p = protocol.answerTransactionInformation(packet.getId(), tid,
t[1], t[2], t[3], t[0]) t[1], t[2], t[3], t[0])
conn.addPacket(p) conn.addPacket(p)
...@@ -286,7 +284,7 @@ class OperationEventHandler(StorageEventHandler): ...@@ -286,7 +284,7 @@ class OperationEventHandler(StorageEventHandler):
except KeyError: except KeyError:
pass pass
conn.addPacket(Packet().notifyInformationLocked(packet.getId(), tid)) conn.addPacket(protocol.notifyInformationLocked(packet.getId(), tid))
else: else:
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
...@@ -324,25 +322,24 @@ class OperationEventHandler(StorageEventHandler): ...@@ -324,25 +322,24 @@ class OperationEventHandler(StorageEventHandler):
if tid == INVALID_TID: if tid == INVALID_TID:
tid = None tid = None
o = app.dm.getObject(oid, serial, tid) o = app.dm.getObject(oid, serial, tid)
p = Packet()
if o is not None: if o is not None:
serial, next_serial, compression, checksum, data = o serial, next_serial, compression, checksum, data = o
if next_serial is None: if next_serial is None:
next_serial = INVALID_SERIAL next_serial = INVALID_SERIAL
logging.debug('oid = %s, serial = %s, next_serial = %s', logging.debug('oid = %s, serial = %s, next_serial = %s',
dump(oid), dump(serial), dump(next_serial)) dump(oid), dump(serial), dump(next_serial))
p.answerObject(packet.getId(), oid, serial, next_serial, p = protocol.answerObject(packet.getId(), oid, serial, next_serial,
compression, checksum, data) compression, checksum, data)
else: else:
logging.debug('oid = %s not found', dump(oid)) logging.debug('oid = %s not found', dump(oid))
p.oidNotFound(packet.getId(), '%s does not exist' % dump(oid)) p = protocol.oidNotFound(packet.getId(), '%s does not exist' % dump(oid))
conn.addPacket(p) conn.addPacket(p)
def handleAskTIDs(self, conn, packet, first, last, partition): def handleAskTIDs(self, conn, packet, first, last, partition):
# This method is complicated, because I must return TIDs only # This method is complicated, because I must return TIDs only
# about usable partitions assigned to me. # about usable partitions assigned to me.
if first >= last: if first >= last:
conn.addPacket(Packet().protocolError(packet.getId(), conn.addPacket(protocol.protocolError(packet.getId(),
'invalid offsets')) 'invalid offsets'))
return return
...@@ -362,11 +359,11 @@ class OperationEventHandler(StorageEventHandler): ...@@ -362,11 +359,11 @@ class OperationEventHandler(StorageEventHandler):
tid_list = app.dm.getTIDList(first, last - first, tid_list = app.dm.getTIDList(first, last - first,
app.num_partitions, partition_list) app.num_partitions, partition_list)
conn.addPacket(Packet().answerTIDs(packet.getId(), tid_list)) conn.addPacket(protocol.answerTIDs(packet.getId(), tid_list))
def handleAskObjectHistory(self, conn, packet, oid, first, last): def handleAskObjectHistory(self, conn, packet, oid, first, last):
if first >= last: if first >= last:
conn.addPacket(Packet().protocolError(packet.getId(), conn.addPacket(protocol.protocolError(packet.getId(),
'invalid offsets')) 'invalid offsets'))
return return
...@@ -374,7 +371,7 @@ class OperationEventHandler(StorageEventHandler): ...@@ -374,7 +371,7 @@ class OperationEventHandler(StorageEventHandler):
history_list = app.dm.getObjectHistory(oid, first, last - first) history_list = app.dm.getObjectHistory(oid, first, last - first)
if history_list is None: if history_list is None:
history_list = [] history_list = []
conn.addPacket(Packet().answerObjectHistory(packet.getId(), oid, conn.addPacket(protocol.answerObjectHistory(packet.getId(), oid,
history_list)) history_list))
def handleAskStoreTransaction(self, conn, packet, tid, user, desc, def handleAskStoreTransaction(self, conn, packet, tid, user, desc,
...@@ -388,7 +385,7 @@ class OperationEventHandler(StorageEventHandler): ...@@ -388,7 +385,7 @@ class OperationEventHandler(StorageEventHandler):
t = app.transaction_dict.setdefault(tid, TransactionInformation(uuid)) t = app.transaction_dict.setdefault(tid, TransactionInformation(uuid))
t.addTransaction(oid_list, user, desc, ext) t.addTransaction(oid_list, user, desc, ext)
conn.addPacket(Packet().answerStoreTransaction(packet.getId(), tid)) conn.addPacket(protocol.answerStoreTransaction(packet.getId(), tid))
def handleAskStoreObject(self, conn, packet, oid, serial, def handleAskStoreObject(self, conn, packet, oid, serial,
compression, checksum, data, tid): compression, checksum, data, tid):
...@@ -409,7 +406,7 @@ class OperationEventHandler(StorageEventHandler): ...@@ -409,7 +406,7 @@ class OperationEventHandler(StorageEventHandler):
# If a newer transaction already locks this object, # If a newer transaction already locks this object,
# do not try to resolve a conflict, so return immediately. # do not try to resolve a conflict, so return immediately.
logging.info('unresolvable conflict in %s', dump(oid)) logging.info('unresolvable conflict in %s', dump(oid))
conn.addPacket(Packet().answerStoreObject(packet.getId(), 1, conn.addPacket(protocol.answerStoreObject(packet.getId(), 1,
oid, locking_tid)) oid, locking_tid))
return return
...@@ -419,13 +416,13 @@ class OperationEventHandler(StorageEventHandler): ...@@ -419,13 +416,13 @@ class OperationEventHandler(StorageEventHandler):
last_serial = history_list[0][0] last_serial = history_list[0][0]
if last_serial != serial: if last_serial != serial:
logging.info('resolvable conflict in %s', dump(oid)) logging.info('resolvable conflict in %s', dump(oid))
conn.addPacket(Packet().answerStoreObject(packet.getId(), 1, conn.addPacket(protocol.answerStoreObject(packet.getId(), 1,
oid, last_serial)) oid, last_serial))
return return
# Now store the object. # Now store the object.
t = app.transaction_dict.setdefault(tid, TransactionInformation(uuid)) t = app.transaction_dict.setdefault(tid, TransactionInformation(uuid))
t.addObject(oid, compression, checksum, data) t.addObject(oid, compression, checksum, data)
conn.addPacket(Packet().answerStoreObject(packet.getId(), 0, conn.addPacket(protocol.answerStoreObject(packet.getId(), 0,
oid, serial)) oid, serial))
app.store_lock_dict[oid] = tid app.store_lock_dict[oid] = tid
...@@ -470,7 +467,7 @@ class OperationEventHandler(StorageEventHandler): ...@@ -470,7 +467,7 @@ class OperationEventHandler(StorageEventHandler):
# This method is complicated, because I must return OIDs only # This method is complicated, because I must return OIDs only
# about usable partitions assigned to me. # about usable partitions assigned to me.
if first >= last: if first >= last:
conn.addPacket(Packet().protocolError(packet.getId(), conn.addPacket(protocol.protocolError(packet.getId(),
'invalid offsets')) 'invalid offsets'))
return return
...@@ -490,4 +487,4 @@ class OperationEventHandler(StorageEventHandler): ...@@ -490,4 +487,4 @@ class OperationEventHandler(StorageEventHandler):
oid_list = app.dm.getOIDList(first, last - first, oid_list = app.dm.getOIDList(first, last - first,
app.num_partitions, partition_list) app.num_partitions, partition_list)
conn.addPacket(Packet().answerOIDs(packet.getId(), oid_list)) conn.addPacket(protocol.answerOIDs(packet.getId(), oid_list))
...@@ -90,8 +90,7 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -90,8 +90,7 @@ class ReplicationEventHandler(StorageEventHandler):
tid_set = set(tid_list) - set(present_tid_list) tid_set = set(tid_list) - set(present_tid_list)
for tid in tid_set: for tid in tid_set:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = protocol.askTransactionInformation(msg_id, tid)
p.askTransactionInformation(msg_id, tid)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id, timeout = 300) conn.expectMessage(msg_id, timeout = 300)
...@@ -99,8 +98,7 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -99,8 +98,7 @@ class ReplicationEventHandler(StorageEventHandler):
app.replicator.tid_offset += 1000 app.replicator.tid_offset += 1000
offset = app.replicator.tid_offset offset = app.replicator.tid_offset
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = protocol.askTIDs(msg_id, offset, offset + 1000,
p.askTIDs(msg_id, offset, offset + 1000,
app.replicator.current_partition.getRID()) app.replicator.current_partition.getRID())
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id, timeout = 300) conn.expectMessage(msg_id, timeout = 300)
...@@ -108,8 +106,7 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -108,8 +106,7 @@ class ReplicationEventHandler(StorageEventHandler):
# If no more TID, a replication of transactions is finished. # If no more TID, a replication of transactions is finished.
# So start to replicate objects now. # So start to replicate objects now.
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = protocol.askOIDs(msg_id, 0, 1000,
p.askOIDs(msg_id, 0, 1000,
app.replicator.current_partition.getRID()) app.replicator.current_partition.getRID())
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id, timeout = 300) conn.expectMessage(msg_id, timeout = 300)
...@@ -133,8 +130,7 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -133,8 +130,7 @@ class ReplicationEventHandler(StorageEventHandler):
# Pick one up, and ask the history. # Pick one up, and ask the history.
oid = oid_list.pop() oid = oid_list.pop()
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = protocol.askObjectHistory(msg_id, oid, 0, 1000)
p.askObjectHistory(msg_id, oid, 0, 1000)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id, timeout = 300) conn.expectMessage(msg_id, timeout = 300)
app.replicator.serial_offset = 0 app.replicator.serial_offset = 0
...@@ -156,8 +152,7 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -156,8 +152,7 @@ class ReplicationEventHandler(StorageEventHandler):
serial_set = set(serial_list) - set(present_serial_list) serial_set = set(serial_list) - set(present_serial_list)
for serial in serial_set: for serial in serial_set:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = protocol.askObject(msg_id, oid, serial, INVALID_TID)
p.askObject(msg_id, oid, serial, INVALID_TID)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id, timeout = 300) conn.expectMessage(msg_id, timeout = 300)
...@@ -165,8 +160,7 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -165,8 +160,7 @@ class ReplicationEventHandler(StorageEventHandler):
app.replicator.serial_offset += 1000 app.replicator.serial_offset += 1000
offset = app.replicator.serial_offset offset = app.replicator.serial_offset
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = protocol.askObjectHistory(msg_id, oid, offset, offset + 1000)
p.askObjectHistory(msg_id, oid, offset, offset + 1000)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id, timeout = 300) conn.expectMessage(msg_id, timeout = 300)
else: else:
...@@ -176,8 +170,7 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -176,8 +170,7 @@ class ReplicationEventHandler(StorageEventHandler):
# If I have more pending OIDs, pick one up. # If I have more pending OIDs, pick one up.
oid = oid_list.pop() oid = oid_list.pop()
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = protocol.askObjectHistory(msg_id, oid, 0, 1000)
p.askObjectHistory(msg_id, oid, 0, 1000)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id, timeout = 300) conn.expectMessage(msg_id, timeout = 300)
app.replicator.serial_offset = 0 app.replicator.serial_offset = 0
...@@ -186,8 +179,7 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -186,8 +179,7 @@ class ReplicationEventHandler(StorageEventHandler):
app.replicator.oid_offset += 1000 app.replicator.oid_offset += 1000
offset = app.replicator.oid_offset offset = app.replicator.oid_offset
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = protocol.askOIDs(msg_id, offset, offset + 1000,
p.askOIDs(msg_id, offset, offset + 1000,
app.replicator.current_partition.getRID()) app.replicator.current_partition.getRID())
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id, timeout = 300) conn.expectMessage(msg_id, timeout = 300)
...@@ -293,7 +285,7 @@ class Replicator(object): ...@@ -293,7 +285,7 @@ class Replicator(object):
def _askCriticalTID(self): def _askCriticalTID(self):
conn = self.primary_master_connection conn = self.primary_master_connection
msg_id = conn.getNextId() msg_id = conn.getNextId()
conn.addPacket(Packet().askLastIDs(msg_id)) conn.addPacket(protocol.askLastIDs(msg_id))
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
self.critical_tid_dict[msg_id] = self.new_partition_dict.values() self.critical_tid_dict[msg_id] = self.new_partition_dict.values()
self.partition_dict.update(self.new_partition_dict) self.partition_dict.update(self.new_partition_dict)
...@@ -309,7 +301,7 @@ class Replicator(object): ...@@ -309,7 +301,7 @@ class Replicator(object):
def _askUnfinishedTIDs(self): def _askUnfinishedTIDs(self):
conn = self.primary_master_connection conn = self.primary_master_connection
msg_id = conn.getNextId() msg_id = conn.getNextId()
conn.addPacket(Packet().askUnfinishedTransactions(msg_id)) conn.addPacket(protocol.askUnfinishedTransactions(msg_id))
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
self.waiting_for_unfinished_tids = True self.waiting_for_unfinished_tids = True
...@@ -343,16 +335,14 @@ class Replicator(object): ...@@ -343,16 +335,14 @@ class Replicator(object):
addr = addr, addr = addr,
connector_handler = app.connector_handler) connector_handler = app.connector_handler)
msg_id = self.current_connection.getNextId() msg_id = self.current_connection.getNextId()
p = Packet() p = protocol.requestNodeIdentification(msg_id, STORAGE_NODE_TYPE, app.uuid,
p.requestNodeIdentification(msg_id, STORAGE_NODE_TYPE, app.uuid,
app.server[0], app.server[1], app.name) app.server[0], app.server[1], app.name)
self.current_connection.addPacket(p) self.current_connection.addPacket(p)
self.current_connection.expectMessage(msg_id) self.current_connection.expectMessage(msg_id)
self.tid_offset = 0 self.tid_offset = 0
msg_id = self.current_connection.getNextId() msg_id = self.current_connection.getNextId()
p = Packet() p = protocol.askTIDs(msg_id, 0, 1000, self.current_partition.getRID())
p.askTIDs(msg_id, 0, 1000, self.current_partition.getRID())
self.current_connection.addPacket(p) self.current_connection.addPacket(p)
self.current_connection.expectMessage(msg_id, timeout = 300) self.current_connection.expectMessage(msg_id, timeout = 300)
...@@ -364,8 +354,7 @@ class Replicator(object): ...@@ -364,8 +354,7 @@ class Replicator(object):
self.partition_dict.pop(self.current_partition.getRID()) self.partition_dict.pop(self.current_partition.getRID())
# Notify to a primary master node that my cell is now up-to-date. # Notify to a primary master node that my cell is now up-to-date.
conn = self.primary_master_connection conn = self.primary_master_connection
p = Packet() p = protocol.notifyPartitionChanges(conn.getNextId(),
p.notifyPartitionChanges(conn.getNextId(),
app.ptid, app.ptid,
[(self.current_partition.getRID(), [(self.current_partition.getRID(),
app.uuid, app.uuid,
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
import logging import logging
from neo import protocol
from neo.storage.handler import StorageEventHandler from neo.storage.handler import StorageEventHandler
from neo.protocol import INVALID_OID, INVALID_TID, \ from neo.protocol import INVALID_OID, INVALID_TID, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \ RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \
...@@ -68,12 +69,12 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -68,12 +69,12 @@ class VerificationEventHandler(StorageEventHandler):
app = self.app app = self.app
if node_type != MASTER_NODE_TYPE: if node_type != MASTER_NODE_TYPE:
logging.info('reject a connection from a non-master') logging.info('reject a connection from a non-master')
conn.addPacket(Packet().notReady(packet.getId(), 'retry later')) conn.addPacket(protocol.notReady(packet.getId(), 'retry later'))
conn.abort() conn.abort()
return return
if name != app.name: if name != app.name:
logging.error('reject an alien cluster') logging.error('reject an alien cluster')
conn.addPacket(Packet().protocolError(packet.getId(), conn.addPacket(protocol.protocolError(packet.getId(),
'invalid cluster name')) 'invalid cluster name'))
conn.abort() conn.abort()
return return
...@@ -87,8 +88,7 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -87,8 +88,7 @@ class VerificationEventHandler(StorageEventHandler):
# If this node is broken, reject it. # If this node is broken, reject it.
if node.getUUID() == uuid: if node.getUUID() == uuid:
if node.getState() == BROKEN_STATE: if node.getState() == BROKEN_STATE:
p = Packet() p = protocol.brokenNodeDisallowedError(packet.getId(), 'go away')
p.brokenNodeDisallowedError(packet.getId(), 'go away')
conn.addPacket(p) conn.addPacket(p)
conn.abort() conn.abort()
return return
...@@ -97,8 +97,7 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -97,8 +97,7 @@ class VerificationEventHandler(StorageEventHandler):
node.setUUID(uuid) node.setUUID(uuid)
conn.setUUID(uuid) conn.setUUID(uuid)
p = Packet() p = protocol.acceptNodeIdentification(packet.getId(), STORAGE_NODE_TYPE,
p.acceptNodeIdentification(packet.getId(), STORAGE_NODE_TYPE,
app.uuid, app.server[0], app.server[1], app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas, app.num_partitions, app.num_replicas,
uuid) uuid)
...@@ -127,10 +126,9 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -127,10 +126,9 @@ class VerificationEventHandler(StorageEventHandler):
def handleAskLastIDs(self, conn, packet): def handleAskLastIDs(self, conn, packet):
if not conn.isServerConnection(): if not conn.isServerConnection():
app = self.app app = self.app
p = Packet()
oid = app.dm.getLastOID() or INVALID_OID oid = app.dm.getLastOID() or INVALID_OID
tid = app.dm.getLastTID() or INVALID_TID tid = app.dm.getLastTID() or INVALID_TID
p.answerLastIDs(packet.getId(), oid, tid, app.ptid) p = protocol.answerLastIDs(packet.getId(), oid, tid, app.ptid)
conn.addPacket(p) conn.addPacket(p)
else: else:
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
...@@ -149,14 +147,12 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -149,14 +147,12 @@ class VerificationEventHandler(StorageEventHandler):
pass pass
row_list.append((offset, row)) row_list.append((offset, row))
except IndexError: except IndexError:
p = Packet() p = protocol.protocolError(packet.getId(),
p.protocolError(packet.getId(),
'invalid partition table offset') 'invalid partition table offset')
conn.addPacket(p) conn.addPacket(p)
return return
p = Packet() p = protocol.answerPartitionTable(packet.getId(), app.ptid, row_list)
p.answerPartitionTable(packet.getId(), app.ptid, row_list)
conn.addPacket(p) conn.addPacket(p)
else: else:
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
...@@ -240,8 +236,7 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -240,8 +236,7 @@ class VerificationEventHandler(StorageEventHandler):
if not conn.isServerConnection(): if not conn.isServerConnection():
app = self.app app = self.app
tid_list = app.dm.getUnfinishedTIDList() tid_list = app.dm.getUnfinishedTIDList()
p = Packet() p = protocol.answerUnfinishedTransactions(packet.getId(), tid_list)
p.answerUnfinishedTransactions(packet.getId(), tid_list)
conn.addPacket(p) conn.addPacket(p)
else: else:
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
...@@ -256,22 +251,20 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -256,22 +251,20 @@ class VerificationEventHandler(StorageEventHandler):
else: else:
t = app.dm.getTransaction(tid) t = app.dm.getTransaction(tid)
p = Packet()
if t is None: if t is None:
p.tidNotFound(packet.getId(), '%s does not exist' % dump(tid)) p = protocol.tidNotFound(packet.getId(), '%s does not exist' % dump(tid))
else: else:
p.answerTransactionInformation(packet.getId(), tid, p = protocol.answerTransactionInformation(packet.getId(), tid,
t[1], t[2], t[3], t[0]) t[1], t[2], t[3], t[0])
conn.addPacket(p) conn.addPacket(p)
def handleAskObjectPresent(self, conn, packet, oid, tid): def handleAskObjectPresent(self, conn, packet, oid, tid):
if not conn.isServerConnection(): if not conn.isServerConnection():
app = self.app app = self.app
p = Packet()
if app.dm.objectPresent(oid, tid): if app.dm.objectPresent(oid, tid):
p.answerObjectPresent(packet.getId(), oid, tid) p = protocol.answerObjectPresent(packet.getId(), oid, tid)
else: else:
p.oidNotFound(packet.getId(), p = protocol.oidNotFound(packet.getId(),
'%s:%s do not exist' % (dump(oid), dump(tid))) '%s:%s do not exist' % (dump(oid), dump(tid)))
conn.addPacket(p) conn.addPacket(p)
else: else:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment