Commit 0d82139d authored by Grégory Wisniewski's avatar Grégory Wisniewski

When building a protocol packet, the ID is now added by the connection

through notify(), ask() and answer() methods instead of addPacket directly.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@483 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 7544eb06
...@@ -57,10 +57,9 @@ class MonitoringEventHandler(BaseEventHandler): ...@@ -57,10 +57,9 @@ class MonitoringEventHandler(BaseEventHandler):
# Should not happen. # Should not happen.
raise RuntimeError('connection completed while not trying to connect') raise RuntimeError('connection completed while not trying to connect')
p = protocol.requestNodeIdentification(conn.getNextId(), ADMIN_NODE_TYPE, p = protocol.requestNodeIdentification(ADMIN_NODE_TYPE,
app.uuid, app.server[0], app.server[1], app.name) app.uuid, app.server[0], app.server[1], app.name)
conn.addPacket(p) conn.ask(p)
conn.expectMessage(msg_id)
EventHandler.connectionCompleted(self, conn) EventHandler.connectionCompleted(self, conn)
def connectionFailed(self, conn): def connectionFailed(self, conn):
...@@ -172,9 +171,7 @@ class MonitoringEventHandler(BaseEventHandler): ...@@ -172,9 +171,7 @@ class MonitoringEventHandler(BaseEventHandler):
app.uuid = your_uuid app.uuid = your_uuid
# Ask a primary master. # Ask a primary master.
p = protocol.askPrimaryMaster(conn.getNextId()) conn.ask(protocol.askPrimaryMaster())
conn.addPacket(p)
conn.expectMessage(msg_id)
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
known_master_list): known_master_list):
......
...@@ -85,11 +85,9 @@ class ConnectionPool(object): ...@@ -85,11 +85,9 @@ class ConnectionPool(object):
logging.error('Connection to storage node %s failed', node) logging.error('Connection to storage node %s failed', node)
return None return None
msg_id = conn.getNextId() p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE,
p = protocol.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE,
app.uuid, addr[0], addr[1], app.name) app.uuid, addr[0], addr[1], app.name)
conn.addPacket(p) msg_id = conn.ask(p)
conn.expectMessage(msg_id)
app.dispatcher.register(conn, msg_id, app.getQueue()) app.dispatcher.register(conn, msg_id, app.getQueue())
finally: finally:
conn.unlock() conn.unlock()
...@@ -303,10 +301,7 @@ class Application(object): ...@@ -303,10 +301,7 @@ class Application(object):
conn = self.master_conn conn = self.master_conn
conn.lock() conn.lock()
try: try:
msg_id = conn.getNextId() msg_id = conn.ask(protocol.askNewOIDs(25))
p = protocol.askNewOIDs(msg_id, 25)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
finally: finally:
conn.unlock() conn.unlock()
...@@ -362,10 +357,7 @@ class Application(object): ...@@ -362,10 +357,7 @@ class Application(object):
continue continue
try: try:
msg_id = conn.getNextId() msg_id = conn.ask(protocol.askObject(oid, serial, tid))
p = protocol.askObject(msg_id, oid, serial, tid)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
self.local_var.asked_object = 0 self.local_var.asked_object = 0
finally: finally:
...@@ -467,10 +459,7 @@ class Application(object): ...@@ -467,10 +459,7 @@ class Application(object):
raise NEOStorageError("Connection to master node failed") raise NEOStorageError("Connection to master node failed")
conn.lock() conn.lock()
try: try:
msg_id = conn.getNextId() msg_id = conn.ask(protocol.askNewTID())
p = protocol.askNewTID(msg_id)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
finally: finally:
conn.unlock() conn.unlock()
...@@ -511,11 +500,9 @@ class Application(object): ...@@ -511,11 +500,9 @@ class Application(object):
continue continue
try: try:
msg_id = conn.getNextId() p = protocol.askStoreObject(oid, serial, 1,
p = protocol.askStoreObject(msg_id, oid, serial, 1,
checksum, compressed_data, self.local_var.tid) checksum, compressed_data, self.local_var.tid)
conn.addPacket(p) msg_id = conn.ask(p)
conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
self.local_var.object_stored = 0 self.local_var.object_stored = 0
finally: finally:
...@@ -562,11 +549,9 @@ class Application(object): ...@@ -562,11 +549,9 @@ class Application(object):
continue continue
try: try:
msg_id = conn.getNextId() p = protocol.askStoreTransaction(self.local_var.tid,
p = protocol.askStoreTransaction(msg_id, self.local_var.tid,
user, desc, ext, oid_list) user, desc, ext, oid_list)
conn.addPacket(p) msg_id = msg = conn.ask(p)
conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
self.local_var.txn_voted = False self.local_var.txn_voted = False
finally: finally:
...@@ -610,7 +595,7 @@ class Application(object): ...@@ -610,7 +595,7 @@ class Application(object):
if conn is None: if conn is None:
continue continue
try: try:
conn.addPacket(protocol.abortTransaction(conn.getNextId(), self.local_var.tid)) conn.notify(protocol.abortTransaction(self.local_var.tid))
finally: finally:
conn.unlock() conn.unlock()
...@@ -618,7 +603,7 @@ class Application(object): ...@@ -618,7 +603,7 @@ class Application(object):
conn = self.master_conn conn = self.master_conn
conn.lock() conn.lock()
try: try:
conn.addPacket(protocol.abortTransaction(conn.getNextId(), self.local_var.tid)) conn.notify(protocol.abortTransaction(self.local_var.tid))
finally: finally:
conn.unlock() conn.unlock()
...@@ -639,10 +624,8 @@ class Application(object): ...@@ -639,10 +624,8 @@ class Application(object):
conn = self.master_conn conn = self.master_conn
conn.lock() conn.lock()
try: try:
msg_id = conn.getNextId() p = protocol.finishTransaction(oid_list, self.local_var.tid)
p = protocol.finishTransaction(msg_id, oid_list, self.local_var.tid) msg_id = conn.ask(p)
conn.addPacket(p)
conn.expectMessage(msg_id, additional_timeout = 300)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
finally: finally:
conn.unlock() conn.unlock()
...@@ -685,10 +668,8 @@ class Application(object): ...@@ -685,10 +668,8 @@ class Application(object):
continue continue
try: try:
msg_id = conn.getNextId() p = protocol.askTransactionInformation(transaction_id)
p = protocol.askTransactionInformation(msg_id, transaction_id) msg_id = conn.ask(p)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
self.local_var.txn_info = 0 self.local_var.txn_info = 0
finally: finally:
...@@ -761,10 +742,8 @@ class Application(object): ...@@ -761,10 +742,8 @@ class Application(object):
continue continue
try: try:
msg_id = conn.getNextId() p = protocol.askTIDs(first, last, INVALID_PARTITION)
p = protocol.askTIDs(msg_id, first, last, INVALID_PARTITION) msg_id = conn.ask(p)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
finally: finally:
conn.unlock() conn.unlock()
...@@ -799,10 +778,8 @@ class Application(object): ...@@ -799,10 +778,8 @@ class Application(object):
continue continue
try: try:
msg_id = conn.getNextId() p = protocol.askTransactionInformation(tid)
p = protocol.askTransactionInformation(msg_id, tid) msg_id = conn.ask(p)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
self.local_var.txn_info = 0 self.local_var.txn_info = 0
finally: finally:
...@@ -854,10 +831,8 @@ class Application(object): ...@@ -854,10 +831,8 @@ class Application(object):
continue continue
try: try:
msg_id = conn.getNextId() p = protocol.askObjectHistory(oid, 0, length)
p = protocol.askObjectHistory(msg_id, oid, 0, length) msg_id = conn.ask(p)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
self.local_var.history = None self.local_var.history = None
finally: finally:
...@@ -894,10 +869,8 @@ class Application(object): ...@@ -894,10 +869,8 @@ class Application(object):
continue continue
try: try:
msg_id = conn.getNextId() p = protocol.askTransactionInformation(serial)
p = protocol.askTransactionInformation(msg_id, serial) msg_id = conn.ask(p)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
self.local_var.txn_info = None self.local_var.txn_info = None
finally: finally:
...@@ -976,11 +949,9 @@ class Application(object): ...@@ -976,11 +949,9 @@ class Application(object):
conn.lock() conn.lock()
try: try:
msg_id = conn.getNextId() p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE,
p = protocol.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE,
self.uuid, '0.0.0.0', 0, self.name) self.uuid, '0.0.0.0', 0, self.name)
conn.addPacket(p) msg_id = conn.ask(p)
conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue()) self.dispatcher.register(conn, msg_id, self.getQueue())
finally: finally:
conn.unlock() conn.unlock()
......
...@@ -82,8 +82,7 @@ class BaseClientEventHandler(EventHandler): ...@@ -82,8 +82,7 @@ class BaseClientEventHandler(EventHandler):
ip_address, port = node.getServer() ip_address, port = node.getServer()
node_list = [(STORAGE_NODE_TYPE, ip_address, port, node_list = [(STORAGE_NODE_TYPE, ip_address, port,
node.getUUID(), state)] node.getUUID(), state)]
p = protocol.notifyNodeInformation(conn.getNextId(), node_list) conn.notify(protocol.notifyNodeInformation(node_list))
conn.addPacket(p)
finally: finally:
conn.unlock() conn.unlock()
...@@ -149,10 +148,7 @@ class PrimaryBoostrapEventHandler(BaseClientEventHandler): ...@@ -149,10 +148,7 @@ class PrimaryBoostrapEventHandler(BaseClientEventHandler):
# Ask a primary master. # Ask a primary master.
conn.lock() conn.lock()
try: try:
msg_id = conn.getNextId() msg_id = conn.ask(protocol.askPrimaryMaster())
p = protocol.askPrimaryMaster(msg_id)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, app.getQueue()) self.dispatcher.register(conn, msg_id, app.getQueue())
finally: finally:
conn.unlock() conn.unlock()
......
...@@ -287,7 +287,7 @@ class Connection(BaseConnection): ...@@ -287,7 +287,7 @@ class Connection(BaseConnection):
self.write_buf += packet.encode() self.write_buf += packet.encode()
except ProtocolError, m: except ProtocolError, m:
logging.critical('trying to send a too big message') logging.critical('trying to send a too big message')
return self.addPacket(protocol.internalError(packet.getId(), m[0])) return self.addPacket(protocol.internalError(m[0]))
# If this is the first time, enable polling for writing. # If this is the first time, enable polling for writing.
if self.write_buf: if self.write_buf:
...@@ -320,6 +320,24 @@ class Connection(BaseConnection): ...@@ -320,6 +320,24 @@ class Connection(BaseConnection):
self.event_dict[msg_id] = event self.event_dict[msg_id] = event
self.em.addIdleEvent(event) self.em.addIdleEvent(event)
def notify(self, packet):
msg_id = self.getNextId()
packet.setId(msg_id)
self.addPacket(packet)
return msg_id
def ask(self, packet, timeout=5, additional_timeout=30):
msg_id = self.getNextId()
packet.setId(msg_id)
self.expectMessage(msg_id)
self.addPacket(packet)
return msg_id
def answer(self, packet, answer_to):
msg_id = answer_to.getId()
packet.setId(msg_id)
self.addPacket(packet)
def isServerConnection(self): def isServerConnection(self):
raise NotImplementedError raise NotImplementedError
......
...@@ -63,11 +63,8 @@ class IdleEvent(object): ...@@ -63,11 +63,8 @@ class IdleEvent(object):
self._additional_timeout -= 5 self._additional_timeout -= 5
conn.expectMessage(self._id, 5, self._additional_timeout) conn.expectMessage(self._id, 5, self._additional_timeout)
# Start a keep-alive packet. # Start a keep-alive packet.
logging.info('sending a ping to %s:%d', logging.info('sending a ping to %s:%d', *(conn.getAddress()))
*(conn.getAddress())) conn.ask(protocol.ping(), 5, 0)
msg_id = conn.getNextId()
conn.addPacket(protocol.ping(msg_id))
conn.expectMessage(msg_id, 5, 0)
else: else:
conn.expectMessage(self._id, self._additional_timeout, 0) conn.expectMessage(self._id, self._additional_timeout, 0)
return True return True
......
...@@ -83,7 +83,7 @@ class EventHandler(object): ...@@ -83,7 +83,7 @@ class EventHandler(object):
logging.info('malformed packet %x from %s:%d: %s', logging.info('malformed packet %x from %s:%d: %s',
packet.getType(), conn.getAddress()[0], packet.getType(), conn.getAddress()[0],
conn.getAddress()[1], error_message) conn.getAddress()[1], error_message)
conn.addPacket(protocol.protocolError(packet.getId(), error_message)) conn.send(protocol.protocolError(error_message))
conn.abort() conn.abort()
self.peerBroken(conn) self.peerBroken(conn)
...@@ -110,7 +110,7 @@ class EventHandler(object): ...@@ -110,7 +110,7 @@ class EventHandler(object):
else: else:
message = 'unexpected packet: ' + message message = 'unexpected packet: ' + message
logging.info('%s', message) logging.info('%s', message)
conn.addPacket(protocol.protocolError(packet.getId(), message)) conn.send(protocol.protocolError(message))
conn.abort() conn.abort()
self.peerBroken(conn) self.peerBroken(conn)
...@@ -134,7 +134,7 @@ class EventHandler(object): ...@@ -134,7 +134,7 @@ class EventHandler(object):
def handlePing(self, conn, packet): def handlePing(self, conn, packet):
logging.info('got a ping packet; am I overloaded?') logging.info('got a ping packet; am I overloaded?')
conn.addPacket(protocol.pong(packet.getId())) conn.answer(protocol.pong(), packet)
def handlePong(self, conn, packet): def handlePong(self, conn, packet):
pass pass
......
...@@ -189,8 +189,7 @@ class Application(object): ...@@ -189,8 +189,7 @@ class Application(object):
logging.info('I am the primary, so sending an announcement') logging.info('I am the primary, so sending an announcement')
for conn in em.getConnectionList(): for conn in em.getConnectionList():
if isinstance(conn, ClientConnection): if isinstance(conn, ClientConnection):
p = protocol.announcePrimaryMaster(conn.getNextId()) conn.notify(protocol.announcePrimaryMaster())
conn.addPacket(p)
conn.abort() conn.abort()
closed = False closed = False
t = time() t = time()
...@@ -239,7 +238,7 @@ class Application(object): ...@@ -239,7 +238,7 @@ class Application(object):
# Ask all connected nodes to reelect a single primary master. # Ask all connected nodes to reelect a single primary master.
for conn in em.getConnectionList(): for conn in em.getConnectionList():
if isinstance(conn, ClientConnection): if isinstance(conn, ClientConnection):
conn.addPacket(protocol.reelectPrimaryMaster(conn.getNextId())) conn.notify(protocol.reelectPrimaryMaster())
conn.abort() conn.abort()
# Wait until the connections are closed. # Wait until the connections are closed.
...@@ -295,14 +294,12 @@ class Application(object): ...@@ -295,14 +294,12 @@ class Application(object):
n = self.nm.getNodeByUUID(c.getUUID()) n = self.nm.getNodeByUUID(c.getUUID())
if n.getNodeType() in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE): if n.getNodeType() in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
node_list = [(node_type, ip_address, port, uuid, state)] node_list = [(node_type, ip_address, port, uuid, state)]
p = protocol.notifyNodeInformation(c.getNextId(), node_list) c.notify(protocol.notifyNodeInformation(node_list))
c.addPacket(p)
elif node.getNodeType() in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE): elif node.getNodeType() in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE):
for c in self.em.getConnectionList(): for c in self.em.getConnectionList():
if c.getUUID() is not None: if c.getUUID() is not None:
node_list = [(node_type, ip_address, port, uuid, state)] node_list = [(node_type, ip_address, port, uuid, state)]
p = protocol.notifyNodeInformation(c.getNextId(), node_list) c.notify(protocol.notifyNodeInformation(node_list))
c.addPacket(p)
elif node.getNodeType() != ADMIN_NODE_TYPE: elif node.getNodeType() != ADMIN_NODE_TYPE:
raise RuntimeError('unknown node type') raise RuntimeError('unknown node type')
...@@ -318,9 +315,9 @@ class Application(object): ...@@ -318,9 +315,9 @@ class Application(object):
start = 0 start = 0
while size: while size:
amt = min(10000, size) amt = min(10000, size)
p = protocol.notifyPartitionChanges(c.getNextId(), ptid, p = protocol.notifyPartitionChanges(ptid,
cell_list[start:start+amt]) cell_list[start:start+amt])
c.addPacket(p) c.notify(p)
size -= amt size -= amt
start += amt start += amt
...@@ -354,10 +351,7 @@ class Application(object): ...@@ -354,10 +351,7 @@ class Application(object):
node = nm.getNodeByUUID(uuid) node = nm.getNodeByUUID(uuid)
if node.getNodeType() == STORAGE_NODE_TYPE \ if node.getNodeType() == STORAGE_NODE_TYPE \
and node.getState() == RUNNING_STATE: and node.getState() == RUNNING_STATE:
msg_id = conn.getNextId() conn.ask(protocol.askLastIDs())
p = protocol.askLastIDs(msg_id)
conn.addPacket(p)
conn.expectMessage(msg_id)
# Wait for at least one storage node to appear. # Wait for at least one storage node to appear.
while self.target_uuid is None: while self.target_uuid is None:
...@@ -397,10 +391,7 @@ class Application(object): ...@@ -397,10 +391,7 @@ class Application(object):
size = self.num_partitions size = self.num_partitions
while size: while size:
amt = min(1000, size) amt = min(1000, size)
msg_id = conn.getNextId() conn.ask(protocol.askPartitionTable(range(start, start + amt)))
p = protocol.askPartitionTable(msg_id, range(start, start + amt))
conn.addPacket(p)
conn.expectMessage(msg_id)
size -= amt size -= amt
start += amt start += amt
...@@ -455,10 +446,7 @@ class Application(object): ...@@ -455,10 +446,7 @@ class Application(object):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid in transaction_uuid_list: if uuid in transaction_uuid_list:
self.asking_uuid_dict[uuid] = False self.asking_uuid_dict[uuid] = False
msg_id = conn.getNextId() conn.ask(protocol.askTransactionInformation(tid))
p = protocol.askTransactionInformation(msg_id, tid)
conn.addPacket(p)
conn.expectMessage(msg_id)
if len(self.asking_uuid_dict) == 0: if len(self.asking_uuid_dict) == 0:
raise VerificationFailure raise VerificationFailure
...@@ -488,10 +476,7 @@ class Application(object): ...@@ -488,10 +476,7 @@ class Application(object):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid in object_uuid_list: if uuid in object_uuid_list:
self.asking_uuid_dict[uuid] = False self.asking_uuid_dict[uuid] = False
msg_id = conn.getNextId() conn.ask(protocol.askObjectPresent(oid, tid))
p = protocol.askObjectPresent(msg_id, oid, tid)
conn.addPacket(p)
conn.expectMessage(msg_id)
while 1: while 1:
em.poll(1) em.poll(1)
...@@ -534,14 +519,12 @@ class Application(object): ...@@ -534,14 +519,12 @@ class Application(object):
for offset in xrange(self.num_partitions): for offset in xrange(self.num_partitions):
row_list.append((offset, self.pt.getRow(offset))) row_list.append((offset, self.pt.getRow(offset)))
if len(row_list) == 1000: if len(row_list) == 1000:
p = protocol.sendPartitionTable(conn.getNextId(), p = protocol.sendPartitionTable( self.lptid, row_list)
self.lptid, row_list) conn.notify(p)
conn.addPacket(p)
del row_list[:] del row_list[:]
if len(row_list) != 0: if len(row_list) != 0:
p = protocol.sendPartitionTable(conn.getNextId(), p = protocol.sendPartitionTable(self.lptid, row_list)
self.lptid, row_list) conn.notify(p)
conn.addPacket(p)
# Gather all unfinished transactions. # Gather all unfinished transactions.
# #
...@@ -567,10 +550,7 @@ class Application(object): ...@@ -567,10 +550,7 @@ class Application(object):
node = nm.getNodeByUUID(uuid) node = nm.getNodeByUUID(uuid)
if node.getNodeType() == STORAGE_NODE_TYPE: if node.getNodeType() == STORAGE_NODE_TYPE:
self.asking_uuid_dict[uuid] = False self.asking_uuid_dict[uuid] = False
msg_id = conn.getNextId() conn.ask(protocol.askUnfinishedTransactions())
p = protocol.askUnfinishedTransactions(msg_id)
conn.addPacket(p)
conn.expectMessage(msg_id)
while 1: while 1:
em.poll(1) em.poll(1)
...@@ -591,14 +571,12 @@ class Application(object): ...@@ -591,14 +571,12 @@ class Application(object):
if uuid is not None: if uuid is not None:
node = nm.getNodeByUUID(uuid) node = nm.getNodeByUUID(uuid)
if node.getNodeType() == STORAGE_NODE_TYPE: if node.getNodeType() == STORAGE_NODE_TYPE:
p = protocol.deleteTransaction(conn.getNextId(), tid) conn.notify(protocol.deleteTransaction(tid))
conn.addPacket(p)
else: else:
for conn in em.getConnectionList(): for conn in em.getConnectionList():
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid in uuid_set: if uuid in uuid_set:
p = protocol.commitTransaction(conn.getNextId(), tid) conn.ask(protocol.commitTransaction(tid))
conn.addPacket(p)
# If possible, send the packets now. # If possible, send the packets now.
em.poll(0) em.poll(0)
...@@ -644,7 +622,7 @@ class Application(object): ...@@ -644,7 +622,7 @@ class Application(object):
if uuid is not None: if uuid is not None:
node = nm.getNodeByUUID(uuid) node = nm.getNodeByUUID(uuid)
if node.getNodeType() == STORAGE_NODE_TYPE: if node.getNodeType() == STORAGE_NODE_TYPE:
conn.addPacket(protocol.startOperation(conn.getNextId())) conn.notify(protocol.startOperation())
# Now everything is passive. # Now everything is passive.
expiration = 10 expiration = 10
...@@ -682,7 +660,7 @@ class Application(object): ...@@ -682,7 +660,7 @@ class Application(object):
if uuid is not None: if uuid is not None:
node = nm.getNodeByUUID(uuid) node = nm.getNodeByUUID(uuid)
if node.getNodeType() in (STORAGE_NODE_TYPE, CLIENT_NODE_TYPE): if node.getNodeType() in (STORAGE_NODE_TYPE, CLIENT_NODE_TYPE):
conn.addPacket(protocol.stopOperation(conn.getNextId())) conn.notify(protocol.stopOperation())
if node.getNodeType() == CLIENT_NODE_TYPE: if node.getNodeType() == CLIENT_NODE_TYPE:
conn.abort() conn.abort()
......
...@@ -40,11 +40,9 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -40,11 +40,9 @@ class ElectionEventHandler(MasterEventHandler):
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
app = self.app app = self.app
# Request a node idenfitication. # Request a node idenfitication.
msg_id = conn.getNextId() p = protocol.requestNodeIdentification(MASTER_NODE_TYPE, app.uuid,
p = protocol.requestNodeIdentification(msg_id, MASTER_NODE_TYPE, app.uuid,
app.server[0], app.server[1], app.name) app.server[0], app.server[1], app.name)
conn.addPacket(p) conn.ask(p)
conn.expectMessage(msg_id)
MasterEventHandler.connectionCompleted(self, conn) MasterEventHandler.connectionCompleted(self, conn)
def connectionFailed(self, conn): def connectionFailed(self, conn):
...@@ -121,9 +119,7 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -121,9 +119,7 @@ class ElectionEventHandler(MasterEventHandler):
node.setUUID(uuid) node.setUUID(uuid)
# Ask a primary master. # Ask a primary master.
msg_id = conn.getNextId() conn.ask(protocol.askPrimaryMaster())
conn.addPacket(protocol.askPrimaryMaster(msg_id))
conn.expectMessage(msg_id)
else: else:
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
...@@ -183,13 +179,12 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -183,13 +179,12 @@ class ElectionEventHandler(MasterEventHandler):
app = self.app app = self.app
if node_type != MASTER_NODE_TYPE: if node_type != MASTER_NODE_TYPE:
logging.info('reject a connection from a non-master') logging.info('reject a connection from a non-master')
conn.addPacket(protocol.notReady(packet.getId(), 'retry later')) conn.answer(protocol.notReady('retry later'), packet)
conn.abort() conn.abort()
return return
if name != app.name: if name != app.name:
logging.error('reject an alien cluster') logging.error('reject an alien cluster')
conn.addPacket(protocol.protocolError(packet.getId(), conn.answer(protocol.protocolError('invalid cluster name'), packet)
'invalid cluster name'))
conn.abort() conn.abort()
return return
...@@ -203,8 +198,8 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -203,8 +198,8 @@ class ElectionEventHandler(MasterEventHandler):
# If this node is broken, reject it. # If this node is broken, reject it.
if node.getUUID() == uuid: if node.getUUID() == uuid:
if node.getState() == BROKEN_STATE: if node.getState() == BROKEN_STATE:
p = protocol.brokenNodeDisallowedError(packet.getId(), 'go away') conn.answer(protocol.brokenNodeDisallowedError(
conn.addPacket(p) 'go away'), packet)
conn.abort() conn.abort()
return return
...@@ -215,11 +210,11 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -215,11 +210,11 @@ class ElectionEventHandler(MasterEventHandler):
node.setUUID(uuid) node.setUUID(uuid)
conn.setUUID(uuid) conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE, p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1], app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas, app.num_partitions, app.num_replicas,
uuid) uuid)
conn.addPacket(p) conn.answer(p, packet)
# Next, the peer should ask a primary master node. # Next, the peer should ask a primary master node.
conn.expectMessage() conn.expectMessage()
...@@ -246,8 +241,8 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -246,8 +241,8 @@ class ElectionEventHandler(MasterEventHandler):
continue continue
info = n.getServer() + (n.getUUID() or INVALID_UUID,) info = n.getServer() + (n.getUUID() or INVALID_UUID,)
known_master_list.append(info) known_master_list.append(info)
p = protocol.answerPrimaryMaster(packet.getId(), primary_uuid, known_master_list) p = protocol.answerPrimaryMaster(primary_uuid, known_master_list)
conn.addPacket(p) conn.answer(p, packet)
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
if not conn.isServerConnection(): if not conn.isServerConnection():
......
...@@ -68,13 +68,12 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -68,13 +68,12 @@ class RecoveryEventHandler(MasterEventHandler):
app = self.app app = self.app
if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE): if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
logging.info('reject a connection from a client') logging.info('reject a connection from a client')
conn.addPacket(protocol.notReady(packet.getId(), 'retry later')) conn.answer(protocol.notReady('retry later'), packet)
conn.abort() conn.abort()
return return
if name != app.name: if name != app.name:
logging.error('reject an alien cluster') logging.error('reject an alien cluster')
conn.addPacket(protocol.protocolError(packet.getId(), conn.answer(protocol.protocolError('invalid cluster name'), packet)
'invalid cluster name'))
conn.abort() conn.abort()
return return
...@@ -118,8 +117,8 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -118,8 +117,8 @@ class RecoveryEventHandler(MasterEventHandler):
if node.getNodeType() != MASTER_NODE_TYPE or node_type != MASTER_NODE_TYPE: if node.getNodeType() != MASTER_NODE_TYPE or node_type != MASTER_NODE_TYPE:
# Error. This node uses the same server address as a master # Error. This node uses the same server address as a master
# node. # node.
conn.addPacket(protocol.protocolError(packet.getId(), p = protocol.protocolError('invalid server address')
'invalid server address')) conn.answer(p, packet)
conn.abort() conn.abort()
return return
...@@ -131,8 +130,8 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -131,8 +130,8 @@ class RecoveryEventHandler(MasterEventHandler):
# This node has a different UUID. # This node has a different UUID.
if node.getState() == RUNNING_STATE: if node.getState() == RUNNING_STATE:
# If it is still running, reject this node. # If it is still running, reject this node.
conn.addPacket(protocol.protocolError(packet.getId(), p = protocol.protocolError('invalid server address')
'invalid server address')) conn.answer(p, packet)
conn.abort() conn.abort()
return return
else: else:
...@@ -150,8 +149,8 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -150,8 +149,8 @@ class RecoveryEventHandler(MasterEventHandler):
if node.getState() == RUNNING_STATE: if node.getState() == RUNNING_STATE:
# If it is still running, reject this node. # If it is still running, reject this node.
conn.addPacket(protocol.protocolError(packet.getId(), p = protocol.protocolError('invalid server address')
'invalid server address')) conn.answer(p, packet)
conn.abort() conn.abort()
return return
else: else:
...@@ -166,8 +165,8 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -166,8 +165,8 @@ class RecoveryEventHandler(MasterEventHandler):
# If this node is broken, reject it. Otherwise, assume that it is # If this node is broken, reject it. Otherwise, assume that it is
# working again. # working again.
if node.getState() == BROKEN_STATE: if node.getState() == BROKEN_STATE:
p = protocol.brokenNodeDisallowedError(packet.getId(), 'go away') p = protocol.brokenNodeDisallowedError('go away')
conn.addPacket(p) conn.answer(p, packet)
conn.abort() conn.abort()
return return
else: else:
...@@ -177,12 +176,11 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -177,12 +176,11 @@ class RecoveryEventHandler(MasterEventHandler):
conn.setUUID(uuid) conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE, p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1], app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas, uuid) app.num_partitions, app.num_replicas, uuid)
conn.addPacket(p)
# Next, the peer should ask a primary master node. # Next, the peer should ask a primary master node.
conn.expectMessage() conn.answer(p, packet)
def handleAskPrimaryMaster(self, conn, packet): def handleAskPrimaryMaster(self, conn, packet):
uuid = conn.getUUID() uuid = conn.getUUID()
...@@ -195,8 +193,8 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -195,8 +193,8 @@ class RecoveryEventHandler(MasterEventHandler):
# Merely tell the peer that I am the primary master node. # Merely tell the peer that I am the primary master node.
# It is not necessary to send known master nodes, because # It is not necessary to send known master nodes, because
# I must send all node information immediately. # I must send all node information immediately.
p = protocol.answerPrimaryMaster(packet.getId(), app.uuid, []) p = protocol.answerPrimaryMaster(app.uuid, [])
conn.addPacket(p) conn.answer(p, packet)
# Send the information. # Send the information.
node_list = [] node_list = []
...@@ -209,19 +207,14 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -209,19 +207,14 @@ class RecoveryEventHandler(MasterEventHandler):
n.getUUID() or INVALID_UUID, n.getState())) n.getUUID() or INVALID_UUID, n.getState()))
if len(node_list) == 10000: if len(node_list) == 10000:
# Ugly, but it is necessary to split a packet, if it is too big. # Ugly, but it is necessary to split a packet, if it is too big.
p = protocol.notifyNodeInformation(conn.getNextId(), node_list) conn.notify(protocol.notifyNodeInformation(node_list))
conn.addPacket(p)
del node_list[:] del node_list[:]
p = protocol.notifyNodeInformation(conn.getNextId(), node_list) conn.notify(protocol.notifyNodeInformation(node_list))
conn.addPacket(p)
# If this is a storage node, ask the last IDs. # If this is a storage node, ask the last IDs.
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() == STORAGE_NODE_TYPE: if node.getNodeType() == STORAGE_NODE_TYPE:
msg_id = conn.getNextId() conn.ask(protocol.askLastIDs())
p = protocol.askLastIDs(msg_id)
conn.addPacket(p)
conn.expectMessage(msg_id)
elif node.getNodeType() == ADMIN_NODE_TYPE and app.lptid != INVALID_PTID: elif node.getNodeType() == ADMIN_NODE_TYPE and app.lptid != INVALID_PTID:
# send partition table if exists # send partition table if exists
logging.info('sending partition table %s to %s' % (dump(app.lptid), logging.info('sending partition table %s to %s' % (dump(app.lptid),
...@@ -231,12 +224,10 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -231,12 +224,10 @@ class RecoveryEventHandler(MasterEventHandler):
for offset in xrange(app.num_partitions): for offset in xrange(app.num_partitions):
row_list.append((offset, app.pt.getRow(offset))) row_list.append((offset, app.pt.getRow(offset)))
if len(row_list) == 1000: if len(row_list) == 1000:
p = protocol.sendPartitionTable(conn.getNextId(), app.lptid, row_list) conn.notify(protocol.sendPartitionTable(app.lptid, row_list))
conn.addPacket(p)
del row_list[:] del row_list[:]
if len(row_list) != 0: if len(row_list) != 0:
p = protocol.sendPartitionTable(conn.getNextId(), app.lptid, row_list) conn.notify(protocol.sendPartitionTable(app.lptid, row_list))
conn.addPacket(p)
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
uuid = conn.getUUID() uuid = conn.getUUID()
......
...@@ -63,8 +63,7 @@ class SecondaryEventHandler(MasterEventHandler): ...@@ -63,8 +63,7 @@ class SecondaryEventHandler(MasterEventHandler):
app = self.app app = self.app
if name != app.name: if name != app.name:
logging.error('reject an alien cluster') logging.error('reject an alien cluster')
conn.addPacket(protocol.protocolError(packet.getId(), conn.answer(protocol.protocolError('invalid cluster name'), packet)
'invalid cluster name'))
conn.abort() conn.abort()
return return
...@@ -81,11 +80,11 @@ class SecondaryEventHandler(MasterEventHandler): ...@@ -81,11 +80,11 @@ class SecondaryEventHandler(MasterEventHandler):
conn.setUUID(uuid) conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE, p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1], app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas, app.num_partitions, app.num_replicas,
uuid) uuid)
conn.addPacket(p) conn.answer(p, packet)
# Next, the peer should ask a primary master node. # Next, the peer should ask a primary master node.
conn.expectMessage() conn.expectMessage()
...@@ -108,8 +107,8 @@ class SecondaryEventHandler(MasterEventHandler): ...@@ -108,8 +107,8 @@ class SecondaryEventHandler(MasterEventHandler):
info = n.getServer() + (n.getUUID() or INVALID_UUID,) info = n.getServer() + (n.getUUID() or INVALID_UUID,)
known_master_list.append(info) known_master_list.append(info)
p = protocol.answerPrimaryMaster(packet.getId(), primary_uuid, known_master_list) p = protocol.answerPrimaryMaster(primary_uuid, known_master_list)
conn.addPacket(p) conn.answer(p, packet)
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
......
...@@ -153,8 +153,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -153,8 +153,7 @@ class ServiceEventHandler(MasterEventHandler):
app = self.app app = self.app
if name != app.name: if name != app.name:
logging.error('reject an alien cluster') logging.error('reject an alien cluster')
conn.addPacket(protocol.protocolError(packet.getId(), conn.notify(protocol.protocolError('invalid cluster name'))
'invalid cluster name'))
conn.abort() conn.abort()
return return
...@@ -202,9 +201,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -202,9 +201,7 @@ class ServiceEventHandler(MasterEventHandler):
or node_type != MASTER_NODE_TYPE: or node_type != MASTER_NODE_TYPE:
# Error. This node uses the same server address as # Error. This node uses the same server address as
# a master node. # a master node.
p = protocol.protocolError(packet.getId(), conn.notify(protocol.protocolError( 'invalid server address'))
'invalid server address')
conn.addPacket(p)
conn.abort() conn.abort()
return return
...@@ -217,9 +214,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -217,9 +214,7 @@ class ServiceEventHandler(MasterEventHandler):
# This node has a different UUID. # This node has a different UUID.
if node.getState() == RUNNING_STATE: if node.getState() == RUNNING_STATE:
# If it is still running, reject this node. # If it is still running, reject this node.
p = protocol.protocolError(packet.getId(), conn.notify(protocol.protocolError('invalid server address'))
'invalid server address')
conn.addPacket(p)
conn.abort() conn.abort()
return return
else: else:
...@@ -246,9 +241,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -246,9 +241,7 @@ class ServiceEventHandler(MasterEventHandler):
# This node has a different server address. # This node has a different server address.
if node.getState() == RUNNING_STATE: if node.getState() == RUNNING_STATE:
# If it is still running, reject this node. # If it is still running, reject this node.
p = protocol.protocolError(packet.getId(), conn.notify(protocol.protocolError('invalid server address'))
'invalid server address')
conn.addPacket(p)
conn.abort() conn.abort()
return return
else: else:
...@@ -269,8 +262,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -269,8 +262,7 @@ class ServiceEventHandler(MasterEventHandler):
# If this node is broken, reject it. Otherwise, assume that # If this node is broken, reject it. Otherwise, assume that
# it is working again. # it is working again.
if node.getState() == BROKEN_STATE: if node.getState() == BROKEN_STATE:
p = protocol.brokenNodeDisallowedError(packet.getId(), 'go away') conn.notify(protocol.brokenNodeDisallowedError('go away'))
conn.addPacket(p)
conn.abort() conn.abort()
return return
else: else:
...@@ -297,12 +289,11 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -297,12 +289,11 @@ class ServiceEventHandler(MasterEventHandler):
ptid = app.getNextPartitionTableID() ptid = app.getNextPartitionTableID()
app.broadcastPartitionChanges(ptid, cell_list) app.broadcastPartitionChanges(ptid, cell_list)
p = protocol.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE, p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1], app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas, uuid) app.num_partitions, app.num_replicas, uuid)
conn.addPacket(p)
# Next, the peer should ask a primary master node. # Next, the peer should ask a primary master node.
conn.expectMessage() conn.answer(p, packet)
def handleAskPrimaryMaster(self, conn, packet): def handleAskPrimaryMaster(self, conn, packet):
uuid = conn.getUUID() uuid = conn.getUUID()
...@@ -315,8 +306,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -315,8 +306,7 @@ class ServiceEventHandler(MasterEventHandler):
# Merely tell the peer that I am the primary master node. # Merely tell the peer that I am the primary master node.
# It is not necessary to send known master nodes, because # It is not necessary to send known master nodes, because
# I must send all node information immediately. # I must send all node information immediately.
p = protocol.answerPrimaryMaster(packet.getId(), app.uuid, []) conn.answer(protocol.answerPrimaryMaster(app.uuid, []), packet)
conn.addPacket(p)
# Send the information. # Send the information.
logging.info('sending notify node information to %s:%d', logging.info('sending notify node information to %s:%d',
...@@ -331,11 +321,9 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -331,11 +321,9 @@ class ServiceEventHandler(MasterEventHandler):
n.getUUID() or INVALID_UUID, n.getState())) n.getUUID() or INVALID_UUID, n.getState()))
if len(node_list) == 10000: if len(node_list) == 10000:
# Ugly, but it is necessary to split a packet, if it is too big. # Ugly, but it is necessary to split a packet, if it is too big.
p = protocol.notifyNodeInformation(conn.getNextId(), node_list) conn.notify(protocol.notifyNodeInformation(node_list))
conn.addPacket(p)
del node_list[:] del node_list[:]
p = protocol.notifyNodeInformation(conn.getNextId(), node_list) conn.notify(protocol.notifyNodeInformation(node_list))
conn.addPacket(p)
# If this is a storage node or a client node or an admin node, send the partition table. # If this is a storage node or a client node or an admin node, send the partition table.
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
...@@ -347,16 +335,14 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -347,16 +335,14 @@ class ServiceEventHandler(MasterEventHandler):
for offset in xrange(app.num_partitions): for offset in xrange(app.num_partitions):
row_list.append((offset, app.pt.getRow(offset))) row_list.append((offset, app.pt.getRow(offset)))
if len(row_list) == 1000: if len(row_list) == 1000:
p = protocol.sendPartitionTable(conn.getNextId(), app.lptid, row_list) conn.notify(protocol.sendPartitionTable(app.lptid, row_list))
conn.addPacket(p)
del row_list[:] del row_list[:]
if len(row_list) != 0: if len(row_list) != 0:
p = protocol.sendPartitionTable(conn.getNextId(), app.lptid, row_list) conn.notify(protocol.sendPartitionTable(app.lptid, row_list))
conn.addPacket(p)
# If this is a storage node, ask it to start. # If this is a storage node, ask it to start.
if node.getNodeType() == STORAGE_NODE_TYPE: if node.getNodeType() == STORAGE_NODE_TYPE:
conn.addPacket(protocol.startOperation(conn.getNextId())) conn.notify(protocol.startOperation())
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
uuid = conn.getUUID() uuid = conn.getUUID()
...@@ -466,7 +452,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -466,7 +452,7 @@ class ServiceEventHandler(MasterEventHandler):
return return
tid = app.getNextTID() tid = app.getNextTID()
app.finishing_transaction_dict[tid] = FinishingTransaction(conn) app.finishing_transaction_dict[tid] = FinishingTransaction(conn)
conn.addPacket(protocol.answerNewTID(packet.getId(), tid)) conn.answer(protocol.answerNewTID(tid), packet)
def handleAskNewOIDs(self, conn, packet, num_oids): def handleAskNewOIDs(self, conn, packet, num_oids):
uuid = conn.getUUID() uuid = conn.getUUID()
...@@ -475,14 +461,12 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -475,14 +461,12 @@ class ServiceEventHandler(MasterEventHandler):
return return
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() != CLIENT_NODE_TYPE: if node.getNodeType() != CLIENT_NODE_TYPE:
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
return return
oid_list = app.getNewOIDList(num_oids) oid_list = app.getNewOIDList(num_oids)
conn.addPacket(protocol.answerNewOIDs(packet.getId(), oid_list)) conn.answer(protocol.answerNewOIDs(oid_list), packet)
def handleFinishTransaction(self, conn, packet, oid_list, tid): def handleFinishTransaction(self, conn, packet, oid_list, tid):
uuid = conn.getUUID() uuid = conn.getUUID()
...@@ -518,9 +502,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -518,9 +502,7 @@ class ServiceEventHandler(MasterEventHandler):
# Request locking data. # Request locking data.
for c in app.em.getConnectionList(): for c in app.em.getConnectionList():
if c.getUUID() in uuid_set: if c.getUUID() in uuid_set:
msg_id = c.getNextId() c.ask(protocol.lockInformation(tid), timeout=60)
c.addPacket(protocol.lockInformation(msg_id, tid))
c.expectMessage(msg_id, timeout = 60)
try: try:
t = app.finishing_transaction_dict[tid] t = app.finishing_transaction_dict[tid]
...@@ -564,17 +546,17 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -564,17 +546,17 @@ class ServiceEventHandler(MasterEventHandler):
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() == CLIENT_NODE_TYPE: if node.getNodeType() == CLIENT_NODE_TYPE:
if c is t.getConnection(): if c is t.getConnection():
p = protocol.notifyTransactionFinished( # TODO: use connection.notify if possible
t.getMessageId(), tid) p = protocol.notifyTransactionFinished(tid)
p.setId(t.getMessageId())
c.addPacket(p) c.addPacket(p)
else: else:
p = protocol.invalidateObjects(c.getNextId(), p = protocol.invalidateObjects(t.getOIDList(), tid)
t.getOIDList(), tid) c.notify(p)
c.addPacket(p)
elif node.getNodeType() == STORAGE_NODE_TYPE: elif node.getNodeType() == STORAGE_NODE_TYPE:
if uuid in t.getUUIDSet(): if uuid in t.getUUIDSet():
p = protocol.unlockInformation(c.getNextId(), tid) p = protocol.unlockInformation(tid)
c.addPacket(p) c.notify(p)
del app.finishing_transaction_dict[tid] del app.finishing_transaction_dict[tid]
except KeyError: except KeyError:
# What is this? # What is this?
...@@ -606,8 +588,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -606,8 +588,7 @@ class ServiceEventHandler(MasterEventHandler):
return return
app = self.app app = self.app
p = protocol.answerLastIDs(packet.getId(), app.loid, app.ltid, app.lptid) conn.answer(protocol.answerLastIDs(app.loid, app.ltid, app.lptid), packet)
conn.addPacket(p)
def handleAskUnfinishedTransactions(self, conn, packet): def handleAskUnfinishedTransactions(self, conn, packet):
uuid = conn.getUUID() uuid = conn.getUUID()
...@@ -616,9 +597,8 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -616,9 +597,8 @@ class ServiceEventHandler(MasterEventHandler):
return return
app = self.app app = self.app
p = protocol.answerUnfinishedTransactions(packet.getId(), p = protocol.answerUnfinishedTransactions(app.finishing_transaction_dict.keys())
app.finishing_transaction_dict.keys()) conn.answer(p, packet)
conn.addPacket(p)
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list): def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
# This should be sent when a cell becomes up-to-date because # This should be sent when a cell becomes up-to-date because
......
...@@ -92,13 +92,12 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -92,13 +92,12 @@ class VerificationEventHandler(MasterEventHandler):
app = self.app app = self.app
if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE): if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
logging.info('reject a connection from a client') logging.info('reject a connection from a client')
conn.addPacket(protocol.notReady(packet.getId(), 'retry later')) conn.answer(protocol.notReady('retry later'), packet)
conn.abort() conn.abort()
return return
if name != app.name: if name != app.name:
logging.error('reject an alien cluster') logging.error('reject an alien cluster')
conn.addPacket(protocol.protocolError(packet.getId(), conn.answer(protocol.protocolError('invalid cluster name'), packet)
'invalid cluster name'))
conn.abort() conn.abort()
return return
...@@ -142,8 +141,8 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -142,8 +141,8 @@ class VerificationEventHandler(MasterEventHandler):
if node.getNodeType() != MASTER_NODE_TYPE or node_type != MASTER_NODE_TYPE: if node.getNodeType() != MASTER_NODE_TYPE or node_type != MASTER_NODE_TYPE:
# Error. This node uses the same server address as a master # Error. This node uses the same server address as a master
# node. # node.
conn.addPacket(protocol.protocolError(packet.getId(), conn.answer(protocol.protocolError(
'invalid server address')) 'invalid server address'), packet)
conn.abort() conn.abort()
return return
...@@ -155,8 +154,8 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -155,8 +154,8 @@ class VerificationEventHandler(MasterEventHandler):
# This node has a different UUID. # This node has a different UUID.
if node.getState() == RUNNING_STATE: if node.getState() == RUNNING_STATE:
# If it is still running, reject this node. # If it is still running, reject this node.
conn.addPacket(protocol.protocolError(packet.getId(), conn.answer(protocol.protocolError(
'invalid server address')) 'invalid server address'), packet)
conn.abort() conn.abort()
return return
else: else:
...@@ -173,8 +172,8 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -173,8 +172,8 @@ class VerificationEventHandler(MasterEventHandler):
# This node has a different server address. # This node has a different server address.
if node.getState() == RUNNING_STATE: if node.getState() == RUNNING_STATE:
# If it is still running, reject this node. # If it is still running, reject this node.
conn.addPacket(protocol.protocolError(packet.getId(), conn.answer(protocol.protocolError(
'invalid server address')) 'invalid server address'), packet)
conn.abort() conn.abort()
return return
else: else:
...@@ -189,8 +188,8 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -189,8 +188,8 @@ class VerificationEventHandler(MasterEventHandler):
# If this node is broken, reject it. Otherwise, assume that it is # If this node is broken, reject it. Otherwise, assume that it is
# working again. # working again.
if node.getState() == BROKEN_STATE: if node.getState() == BROKEN_STATE:
p = protocol.brokenNodeDisallowedError(packet.getId(), 'go away') p = protocol.brokenNodeDisallowedError('go away')
conn.addPacket(p) conn.answer(p, packet)
conn.abort() conn.abort()
return return
else: else:
...@@ -200,10 +199,10 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -200,10 +199,10 @@ class VerificationEventHandler(MasterEventHandler):
conn.setUUID(uuid) conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE, p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1], app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas, uuid) app.num_partitions, app.num_replicas, uuid)
conn.addPacket(p) conn.answer(p, packet)
# Next, the peer should ask a primary master node. # Next, the peer should ask a primary master node.
conn.expectMessage() conn.expectMessage()
...@@ -218,8 +217,7 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -218,8 +217,7 @@ class VerificationEventHandler(MasterEventHandler):
# Merely tell the peer that I am the primary master node. # Merely tell the peer that I am the primary master node.
# It is not necessary to send known master nodes, because # It is not necessary to send known master nodes, because
# I must send all node information immediately. # I must send all node information immediately.
p = protocol.answerPrimaryMaster(packet.getId(), app.uuid, []) conn.answer(protocol.answerPrimaryMaster(app.uuid, []), packet)
conn.addPacket(p)
# Send the information. # Send the information.
node_list = [] node_list = []
...@@ -232,11 +230,9 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -232,11 +230,9 @@ class VerificationEventHandler(MasterEventHandler):
n.getUUID() or INVALID_UUID, n.getState())) n.getUUID() or INVALID_UUID, n.getState()))
if len(node_list) == 10000: if len(node_list) == 10000:
# Ugly, but it is necessary to split a packet, if it is too big. # Ugly, but it is necessary to split a packet, if it is too big.
p = protocol.notifyNodeInformation(conn.getNextId(), node_list) conn.notify(protocol.notifyNodeInformation(node_list))
conn.addPacket(p)
del node_list[:] del node_list[:]
p = protocol.notifyNodeInformation(conn.getNextId(), node_list) conn.notify(protocol.notifyNodeInformation(node_list))
conn.addPacket(p)
# If this is a storage node or an admin node, send the partition table. # If this is a storage node or an admin node, send the partition table.
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
...@@ -246,12 +242,10 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -246,12 +242,10 @@ class VerificationEventHandler(MasterEventHandler):
for offset in xrange(app.num_partitions): for offset in xrange(app.num_partitions):
row_list.append((offset, app.pt.getRow(offset))) row_list.append((offset, app.pt.getRow(offset)))
if len(row_list) == 1000: if len(row_list) == 1000:
p = protocol.sendPartitionTable(conn.getNextId(), app.lptid, row_list) conn.notify(protocol.sendPartitionTable(app.lptid, row_list))
conn.addPacket(p)
del row_list[:] del row_list[:]
if len(row_list) != 0: if len(row_list) != 0:
p = protocol.sendPartitionTable(conn.getNextId(), app.lptid, row_list) conn.notify(protocol.sendPartitionTable(app.lptid, row_list))
conn.addPacket(p)
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
uuid = conn.getUUID() uuid = conn.getUUID()
......
This diff is collapsed.
...@@ -37,11 +37,9 @@ class BootstrapEventHandler(StorageEventHandler): ...@@ -37,11 +37,9 @@ class BootstrapEventHandler(StorageEventHandler):
# Should not happen. # Should not happen.
raise RuntimeError('connection completed while not trying to connect') raise RuntimeError('connection completed while not trying to connect')
msg_id = conn.getNextId() p = protocol.requestNodeIdentification(STORAGE_NODE_TYPE, app.uuid,
p = protocol.requestNodeIdentification(msg_id, STORAGE_NODE_TYPE, app.uuid,
app.server[0], app.server[1], app.name) app.server[0], app.server[1], app.name)
conn.addPacket(p) conn.ask(p)
conn.expectMessage(msg_id)
StorageEventHandler.connectionCompleted(self, conn) StorageEventHandler.connectionCompleted(self, conn)
def connectionFailed(self, conn): def connectionFailed(self, conn):
...@@ -115,13 +113,12 @@ class BootstrapEventHandler(StorageEventHandler): ...@@ -115,13 +113,12 @@ class BootstrapEventHandler(StorageEventHandler):
app = self.app app = self.app
if node_type != MASTER_NODE_TYPE: if node_type != MASTER_NODE_TYPE:
logging.info('reject a connection from a non-master') logging.info('reject a connection from a non-master')
conn.addPacket(protocol.notReady(packet.getId(), 'retry later')) conn.answer(protocol.notReady('retry later'), packet)
conn.abort() conn.abort()
return return
if name != app.name: if name != app.name:
logging.error('reject an alien cluster') logging.error('reject an alien cluster')
conn.addPacket(protocol.protocolError(packet.getId(), conn.answer(protocol.protocolError('invalid cluster name'), packet)
'invalid cluster name'))
conn.abort() conn.abort()
return return
...@@ -134,8 +131,8 @@ class BootstrapEventHandler(StorageEventHandler): ...@@ -134,8 +131,8 @@ class BootstrapEventHandler(StorageEventHandler):
# If this node is broken, reject it. # If this node is broken, reject it.
if node.getUUID() == uuid: if node.getUUID() == uuid:
if node.getState() == BROKEN_STATE: if node.getState() == BROKEN_STATE:
p = protocol.brokenNodeDisallowedError(packet.getId(), 'go away') p = protocol.brokenNodeDisallowedError('go away')
conn.addPacket(p) conn.answer(p, packet)
conn.abort() conn.abort()
return return
...@@ -143,10 +140,9 @@ class BootstrapEventHandler(StorageEventHandler): ...@@ -143,10 +140,9 @@ class BootstrapEventHandler(StorageEventHandler):
node.setUUID(uuid) node.setUUID(uuid)
conn.setUUID(uuid) conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(packet.getId(), STORAGE_NODE_TYPE, p = protocol.acceptNodeIdentification(STORAGE_NODE_TYPE, app.uuid,
app.uuid, app.server[0], app.server[1], app.server[0], app.server[1], 0, 0, uuid)
0, 0, uuid) conn.answer(p, packet)
conn.addPacket(p)
# Now the master node should know that I am not the right one. # Now the master node should know that I am not the right one.
conn.abort() conn.abort()
...@@ -199,9 +195,7 @@ class BootstrapEventHandler(StorageEventHandler): ...@@ -199,9 +195,7 @@ class BootstrapEventHandler(StorageEventHandler):
node.setUUID(uuid) node.setUUID(uuid)
# Ask a primary master. # Ask a primary master.
msg_id = conn.getNextId() conn.ask(protocol.askPrimaryMaster())
conn.addPacket(protocol.askPrimaryMaster(msg_id))
conn.expectMessage(msg_id)
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
known_master_list): known_master_list):
......
...@@ -139,8 +139,8 @@ class OperationEventHandler(StorageEventHandler): ...@@ -139,8 +139,8 @@ class OperationEventHandler(StorageEventHandler):
app = self.app app = self.app
if name != app.name: if name != app.name:
logging.error('reject an alien cluster') logging.error('reject an alien cluster')
conn.addPacket(protocol.protocolError(packet.getId(), p = protocol.protocolError('invalid cluster name')
'invalid cluster name')) conn.answer(p, packet)
conn.abort() conn.abort()
return return
...@@ -156,16 +156,15 @@ class OperationEventHandler(StorageEventHandler): ...@@ -156,16 +156,15 @@ class OperationEventHandler(StorageEventHandler):
# If I do not know such a node, and it is not even a master # If I do not know such a node, and it is not even a master
# node, simply reject it. # node, simply reject it.
logging.error('reject an unknown node %s', dump(uuid)) logging.error('reject an unknown node %s', dump(uuid))
conn.addPacket(protocol.notReady(packet.getId(), conn.answer(protocol.notReady('unknown node'), packet)
'unknown node'))
conn.abort() conn.abort()
return return
else: else:
# If this node is broken, reject it. # If this node is broken, reject it.
if node.getUUID() == uuid: if node.getUUID() == uuid:
if node.getState() == BROKEN_STATE: if node.getState() == BROKEN_STATE:
p = protocol.brokenNodeDisallowedError(packet.getId(), 'go away') p = protocol.brokenNodeDisallowedError('go away')
conn.addPacket(p) conn.answer(p, packet)
conn.abort() conn.abort()
return return
...@@ -173,11 +172,10 @@ class OperationEventHandler(StorageEventHandler): ...@@ -173,11 +172,10 @@ class OperationEventHandler(StorageEventHandler):
node.setUUID(uuid) node.setUUID(uuid)
conn.setUUID(uuid) conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(packet.getId(), STORAGE_NODE_TYPE, p = protocol.acceptNodeIdentification(STORAGE_NODE_TYPE, app.uuid,
app.uuid, app.server[0], app.server[1], app.server[0], app.server[1], app.num_partitions,
app.num_partitions, app.num_replicas, app.num_replicas, uuid)
uuid) conn.answer(p, packet)
conn.addPacket(p)
if node_type == MASTER_NODE_TYPE: if node_type == MASTER_NODE_TYPE:
conn.abort() conn.abort()
...@@ -256,11 +254,10 @@ class OperationEventHandler(StorageEventHandler): ...@@ -256,11 +254,10 @@ class OperationEventHandler(StorageEventHandler):
t = app.dm.getTransaction(tid) t = app.dm.getTransaction(tid)
if t is None: if t is None:
p = protocol.tidNotFound(packet.getId(), '%s does not exist' % dump(tid)) p = protocol.tidNotFound('%s does not exist' % dump(tid))
else: else:
p = protocol.answerTransactionInformation(packet.getId(), tid, p = protocol.answerTransactionInformation(tid, t[1], t[2], t[3], t[0])
t[1], t[2], t[3], t[0]) conn.answer(p, packet)
conn.addPacket(p)
def handleAskObjectPresent(self, conn, packet, oid, tid): def handleAskObjectPresent(self, conn, packet, oid, tid):
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
...@@ -284,7 +281,7 @@ class OperationEventHandler(StorageEventHandler): ...@@ -284,7 +281,7 @@ class OperationEventHandler(StorageEventHandler):
except KeyError: except KeyError:
pass pass
conn.addPacket(protocol.notifyInformationLocked(packet.getId(), tid)) conn.answer(protocol.notifyInformationLocked(tid), packet)
else: else:
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
...@@ -328,19 +325,18 @@ class OperationEventHandler(StorageEventHandler): ...@@ -328,19 +325,18 @@ class OperationEventHandler(StorageEventHandler):
next_serial = INVALID_SERIAL next_serial = INVALID_SERIAL
logging.debug('oid = %s, serial = %s, next_serial = %s', logging.debug('oid = %s, serial = %s, next_serial = %s',
dump(oid), dump(serial), dump(next_serial)) dump(oid), dump(serial), dump(next_serial))
p = protocol.answerObject(packet.getId(), oid, serial, next_serial, p = protocol.answerObject(oid, serial, next_serial,
compression, checksum, data) compression, checksum, data)
else: else:
logging.debug('oid = %s not found', dump(oid)) logging.debug('oid = %s not found', dump(oid))
p = protocol.oidNotFound(packet.getId(), '%s does not exist' % dump(oid)) p = protocol.oidNotFound('%s does not exist' % dump(oid))
conn.addPacket(p) conn.answer(p, packet)
def handleAskTIDs(self, conn, packet, first, last, partition): def handleAskTIDs(self, conn, packet, first, last, partition):
# This method is complicated, because I must return TIDs only # This method is complicated, because I must return TIDs only
# about usable partitions assigned to me. # about usable partitions assigned to me.
if first >= last: if first >= last:
conn.addPacket(protocol.protocolError(packet.getId(), conn.answer(protocol.protocolError( 'invalid offsets'), packet)
'invalid offsets'))
return return
app = self.app app = self.app
...@@ -359,20 +355,19 @@ class OperationEventHandler(StorageEventHandler): ...@@ -359,20 +355,19 @@ class OperationEventHandler(StorageEventHandler):
tid_list = app.dm.getTIDList(first, last - first, tid_list = app.dm.getTIDList(first, last - first,
app.num_partitions, partition_list) app.num_partitions, partition_list)
conn.addPacket(protocol.answerTIDs(packet.getId(), tid_list)) conn.answer(protocol.answerTIDs(tid_list), packet)
def handleAskObjectHistory(self, conn, packet, oid, first, last): def handleAskObjectHistory(self, conn, packet, oid, first, last):
if first >= last: if first >= last:
conn.addPacket(protocol.protocolError(packet.getId(), conn.answer(protocol.protocolError( 'invalid offsets'), packet)
'invalid offsets'))
return return
app = self.app app = self.app
history_list = app.dm.getObjectHistory(oid, first, last - first) history_list = app.dm.getObjectHistory(oid, first, last - first)
if history_list is None: if history_list is None:
history_list = [] history_list = []
conn.addPacket(protocol.answerObjectHistory(packet.getId(), oid, p = protocol.answerObjectHistory(oid, history_list)
history_list)) conn.answer(p, packet)
def handleAskStoreTransaction(self, conn, packet, tid, user, desc, def handleAskStoreTransaction(self, conn, packet, tid, user, desc,
ext, oid_list): ext, oid_list):
...@@ -385,7 +380,7 @@ class OperationEventHandler(StorageEventHandler): ...@@ -385,7 +380,7 @@ class OperationEventHandler(StorageEventHandler):
t = app.transaction_dict.setdefault(tid, TransactionInformation(uuid)) t = app.transaction_dict.setdefault(tid, TransactionInformation(uuid))
t.addTransaction(oid_list, user, desc, ext) t.addTransaction(oid_list, user, desc, ext)
conn.addPacket(protocol.answerStoreTransaction(packet.getId(), tid)) conn.answer(protocol.answerStoreTransaction(tid), packet)
def handleAskStoreObject(self, conn, packet, oid, serial, def handleAskStoreObject(self, conn, packet, oid, serial,
compression, checksum, data, tid): compression, checksum, data, tid):
...@@ -406,8 +401,8 @@ class OperationEventHandler(StorageEventHandler): ...@@ -406,8 +401,8 @@ class OperationEventHandler(StorageEventHandler):
# If a newer transaction already locks this object, # If a newer transaction already locks this object,
# do not try to resolve a conflict, so return immediately. # do not try to resolve a conflict, so return immediately.
logging.info('unresolvable conflict in %s', dump(oid)) logging.info('unresolvable conflict in %s', dump(oid))
conn.addPacket(protocol.answerStoreObject(packet.getId(), 1, p = protocol.answerStoreObject(1, oid, locking_tid)
oid, locking_tid)) conn.answer(p, packet)
return return
# Next, check if this is generated from the latest revision. # Next, check if this is generated from the latest revision.
...@@ -416,14 +411,14 @@ class OperationEventHandler(StorageEventHandler): ...@@ -416,14 +411,14 @@ class OperationEventHandler(StorageEventHandler):
last_serial = history_list[0][0] last_serial = history_list[0][0]
if last_serial != serial: if last_serial != serial:
logging.info('resolvable conflict in %s', dump(oid)) logging.info('resolvable conflict in %s', dump(oid))
conn.addPacket(protocol.answerStoreObject(packet.getId(), 1, p = protocol.answerStoreObject(1, oid, last_serial)
oid, last_serial)) conn.answer(p, packet)
return return
# Now store the object. # Now store the object.
t = app.transaction_dict.setdefault(tid, TransactionInformation(uuid)) t = app.transaction_dict.setdefault(tid, TransactionInformation(uuid))
t.addObject(oid, compression, checksum, data) t.addObject(oid, compression, checksum, data)
conn.addPacket(protocol.answerStoreObject(packet.getId(), 0, p = protocol.answerStoreObject(0, oid, serial)
oid, serial)) conn.answer(p, packet)
app.store_lock_dict[oid] = tid app.store_lock_dict[oid] = tid
def handleAbortTransaction(self, conn, packet, tid): def handleAbortTransaction(self, conn, packet, tid):
...@@ -467,8 +462,7 @@ class OperationEventHandler(StorageEventHandler): ...@@ -467,8 +462,7 @@ class OperationEventHandler(StorageEventHandler):
# This method is complicated, because I must return OIDs only # This method is complicated, because I must return OIDs only
# about usable partitions assigned to me. # about usable partitions assigned to me.
if first >= last: if first >= last:
conn.addPacket(protocol.protocolError(packet.getId(), conn.answer(protocol.protocolError( 'invalid offsets'), packet)
'invalid offsets'))
return return
app = self.app app = self.app
...@@ -487,4 +481,4 @@ class OperationEventHandler(StorageEventHandler): ...@@ -487,4 +481,4 @@ class OperationEventHandler(StorageEventHandler):
oid_list = app.dm.getOIDList(first, last - first, oid_list = app.dm.getOIDList(first, last - first,
app.num_partitions, partition_list) app.num_partitions, partition_list)
conn.addPacket(protocol.answerOIDs(packet.getId(), oid_list)) conn.answer(protocol.answerOIDs(oid_list), packet)
...@@ -89,27 +89,20 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -89,27 +89,20 @@ class ReplicationEventHandler(StorageEventHandler):
present_tid_list = app.dm.getTIDListPresent(tid_list) present_tid_list = app.dm.getTIDListPresent(tid_list)
tid_set = set(tid_list) - set(present_tid_list) tid_set = set(tid_list) - set(present_tid_list)
for tid in tid_set: for tid in tid_set:
msg_id = conn.getNextId() conn.ask(protocol.askTransactionInformation(tid), timeout=300)
p = protocol.askTransactionInformation(msg_id, tid)
conn.addPacket(p)
conn.expectMessage(msg_id, timeout = 300)
# And, ask more TIDs. # And, ask more TIDs.
app.replicator.tid_offset += 1000 app.replicator.tid_offset += 1000
offset = app.replicator.tid_offset offset = app.replicator.tid_offset
msg_id = conn.getNextId() p = protocol.askTIDs(offset, offset + 1000,
p = protocol.askTIDs(msg_id, offset, offset + 1000,
app.replicator.current_partition.getRID()) app.replicator.current_partition.getRID())
conn.addPacket(p) conn.ask(p, timeout=300)
conn.expectMessage(msg_id, timeout = 300)
else: else:
# If no more TID, a replication of transactions is finished. # If no more TID, a replication of transactions is finished.
# So start to replicate objects now. # So start to replicate objects now.
msg_id = conn.getNextId() p = protocol.askOIDs(0, 1000,
p = protocol.askOIDs(msg_id, 0, 1000,
app.replicator.current_partition.getRID()) app.replicator.current_partition.getRID())
conn.addPacket(p) conn.ask(p, timeout=300)
conn.expectMessage(msg_id, timeout = 300)
app.replicator.oid_offset = 0 app.replicator.oid_offset = 0
def handleAnswerTransactionInformation(self, conn, packet, tid, def handleAnswerTransactionInformation(self, conn, packet, tid,
...@@ -129,10 +122,7 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -129,10 +122,7 @@ class ReplicationEventHandler(StorageEventHandler):
if oid_list: if oid_list:
# Pick one up, and ask the history. # Pick one up, and ask the history.
oid = oid_list.pop() oid = oid_list.pop()
msg_id = conn.getNextId() conn.ask(protocol.askObjectHistory(oid, 0, 1000), timeout=300)
p = protocol.askObjectHistory(msg_id, oid, 0, 1000)
conn.addPacket(p)
conn.expectMessage(msg_id, timeout = 300)
app.replicator.serial_offset = 0 app.replicator.serial_offset = 0
app.replicator.oid_list = oid_list app.replicator.oid_list = oid_list
else: else:
...@@ -151,38 +141,28 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -151,38 +141,28 @@ class ReplicationEventHandler(StorageEventHandler):
present_serial_list = app.dm.getSerialListPresent(oid, serial_list) present_serial_list = app.dm.getSerialListPresent(oid, serial_list)
serial_set = set(serial_list) - set(present_serial_list) serial_set = set(serial_list) - set(present_serial_list)
for serial in serial_set: for serial in serial_set:
msg_id = conn.getNextId() conn.ask(protocol.askObject(oid, serial, INVALID_TID), timeout=300)
p = protocol.askObject(msg_id, oid, serial, INVALID_TID)
conn.addPacket(p)
conn.expectMessage(msg_id, timeout = 300)
# And, ask more serials. # And, ask more serials.
app.replicator.serial_offset += 1000 app.replicator.serial_offset += 1000
offset = app.replicator.serial_offset offset = app.replicator.serial_offset
msg_id = conn.getNextId() p = protocol.askObjectHistory(oid, offset, offset + 1000)
p = protocol.askObjectHistory(msg_id, oid, offset, offset + 1000) conn.ask(p, timeout=300)
conn.addPacket(p)
conn.expectMessage(msg_id, timeout = 300)
else: else:
# This OID is finished. So advance to next. # This OID is finished. So advance to next.
oid_list = app.replicator.oid_list oid_list = app.replicator.oid_list
if oid_list: if oid_list:
# If I have more pending OIDs, pick one up. # If I have more pending OIDs, pick one up.
oid = oid_list.pop() oid = oid_list.pop()
msg_id = conn.getNextId() conn.ask(protocol.askObjectHistory(oid, 0, 1000), timeout=300)
p = protocol.askObjectHistory(msg_id, oid, 0, 1000)
conn.addPacket(p)
conn.expectMessage(msg_id, timeout = 300)
app.replicator.serial_offset = 0 app.replicator.serial_offset = 0
else: else:
# Otherwise, acquire more OIDs. # Otherwise, acquire more OIDs.
app.replicator.oid_offset += 1000 app.replicator.oid_offset += 1000
offset = app.replicator.oid_offset offset = app.replicator.oid_offset
msg_id = conn.getNextId()
p = protocol.askOIDs(msg_id, offset, offset + 1000, p = protocol.askOIDs(msg_id, offset, offset + 1000,
app.replicator.current_partition.getRID()) app.replicator.current_partition.getRID())
conn.addPacket(p) conn.ask(p, timeout=300)
conn.expectMessage(msg_id, timeout = 300)
def handleAnswerObject(self, conn, packet, oid, serial_start, def handleAnswerObject(self, conn, packet, oid, serial_start,
serial_end, compression, checksum, data): serial_end, compression, checksum, data):
...@@ -284,9 +264,7 @@ class Replicator(object): ...@@ -284,9 +264,7 @@ class Replicator(object):
def _askCriticalTID(self): def _askCriticalTID(self):
conn = self.primary_master_connection conn = self.primary_master_connection
msg_id = conn.getNextId() conn.ask(protocol.askLastIDs())
conn.addPacket(protocol.askLastIDs(msg_id))
conn.expectMessage(msg_id)
self.critical_tid_dict[msg_id] = self.new_partition_dict.values() self.critical_tid_dict[msg_id] = self.new_partition_dict.values()
self.partition_dict.update(self.new_partition_dict) self.partition_dict.update(self.new_partition_dict)
self.new_partition_dict = {} self.new_partition_dict = {}
...@@ -300,9 +278,7 @@ class Replicator(object): ...@@ -300,9 +278,7 @@ class Replicator(object):
def _askUnfinishedTIDs(self): def _askUnfinishedTIDs(self):
conn = self.primary_master_connection conn = self.primary_master_connection
msg_id = conn.getNextId() conn.ask(protocol.askUnfinishedTransactions())
conn.addPacket(protocol.askUnfinishedTransactions(msg_id))
conn.expectMessage(msg_id)
self.waiting_for_unfinished_tids = True self.waiting_for_unfinished_tids = True
def _startReplication(self): def _startReplication(self):
...@@ -334,17 +310,13 @@ class Replicator(object): ...@@ -334,17 +310,13 @@ class Replicator(object):
self.current_connection = ClientConnection(app.em, handler, self.current_connection = ClientConnection(app.em, handler,
addr = addr, addr = addr,
connector_handler = app.connector_handler) connector_handler = app.connector_handler)
msg_id = self.current_connection.getNextId() p = protocol.requestNodeIdentification(STORAGE_NODE_TYPE, app.uuid,
p = protocol.requestNodeIdentification(msg_id, STORAGE_NODE_TYPE, app.uuid,
app.server[0], app.server[1], app.name) app.server[0], app.server[1], app.name)
self.current_connection.addPacket(p) self.current_connection.ask(p)
self.current_connection.expectMessage(msg_id)
self.tid_offset = 0 self.tid_offset = 0
msg_id = self.current_connection.getNextId() p = protocol.askTIDs(0, 1000, self.current_partition.getRID())
p = protocol.askTIDs(msg_id, 0, 1000, self.current_partition.getRID()) self.current_connection.ask(p, timeout=300)
self.current_connection.addPacket(p)
self.current_connection.expectMessage(msg_id, timeout = 300)
self.replication_done = False self.replication_done = False
...@@ -354,12 +326,9 @@ class Replicator(object): ...@@ -354,12 +326,9 @@ class Replicator(object):
self.partition_dict.pop(self.current_partition.getRID()) self.partition_dict.pop(self.current_partition.getRID())
# Notify to a primary master node that my cell is now up-to-date. # Notify to a primary master node that my cell is now up-to-date.
conn = self.primary_master_connection conn = self.primary_master_connection
p = protocol.notifyPartitionChanges(conn.getNextId(), p = protocol.notifyPartitionChanges( app.ptid,
app.ptid, [(self.current_partition.getRID(), app.uuid, UP_TO_DATE_STATE)])
[(self.current_partition.getRID(), conn.send(p)
app.uuid,
UP_TO_DATE_STATE)])
conn.addPacket(p)
except ValueError: except ValueError:
pass pass
self.current_partition = None self.current_partition = None
......
...@@ -69,13 +69,13 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -69,13 +69,13 @@ class VerificationEventHandler(StorageEventHandler):
app = self.app app = self.app
if node_type != MASTER_NODE_TYPE: if node_type != MASTER_NODE_TYPE:
logging.info('reject a connection from a non-master') logging.info('reject a connection from a non-master')
conn.addPacket(protocol.notReady(packet.getId(), 'retry later')) conn.answer(protocol.notReady('retry later'), packet)
conn.abort() conn.abort()
return return
if name != app.name: if name != app.name:
logging.error('reject an alien cluster') logging.error('reject an alien cluster')
conn.addPacket(protocol.protocolError(packet.getId(), conn.answer(protocol.protocolError(
'invalid cluster name')) 'invalid cluster name'), packet)
conn.abort() conn.abort()
return return
...@@ -88,8 +88,8 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -88,8 +88,8 @@ class VerificationEventHandler(StorageEventHandler):
# If this node is broken, reject it. # If this node is broken, reject it.
if node.getUUID() == uuid: if node.getUUID() == uuid:
if node.getState() == BROKEN_STATE: if node.getState() == BROKEN_STATE:
p = protocol.brokenNodeDisallowedError(packet.getId(), 'go away') p = protocol.brokenNodeDisallowedError('go away')
conn.addPacket(p) conn.answer(p, packet)
conn.abort() conn.abort()
return return
...@@ -97,11 +97,10 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -97,11 +97,10 @@ class VerificationEventHandler(StorageEventHandler):
node.setUUID(uuid) node.setUUID(uuid)
conn.setUUID(uuid) conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(packet.getId(), STORAGE_NODE_TYPE, p = protocol.acceptNodeIdentification(STORAGE_NODE_TYPE, app.uuid,
app.uuid, app.server[0], app.server[1], app.server[0], app.server[1], app.num_partitions,
app.num_partitions, app.num_replicas, app.num_replicas, uuid)
uuid) conn.answer(p, packet)
conn.addPacket(p)
# Now the master node should know that I am not the right one. # Now the master node should know that I am not the right one.
conn.abort() conn.abort()
...@@ -128,8 +127,8 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -128,8 +127,8 @@ class VerificationEventHandler(StorageEventHandler):
app = self.app app = self.app
oid = app.dm.getLastOID() or INVALID_OID oid = app.dm.getLastOID() or INVALID_OID
tid = app.dm.getLastTID() or INVALID_TID tid = app.dm.getLastTID() or INVALID_TID
p = protocol.answerLastIDs(packet.getId(), oid, tid, app.ptid) p = protocol.answerLastIDs(oid, tid, app.ptid)
conn.addPacket(p) conn.answer(p, packet)
else: else:
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
...@@ -147,13 +146,12 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -147,13 +146,12 @@ class VerificationEventHandler(StorageEventHandler):
pass pass
row_list.append((offset, row)) row_list.append((offset, row))
except IndexError: except IndexError:
p = protocol.protocolError(packet.getId(), p = protocol.protocolError( 'invalid partition table offset')
'invalid partition table offset') conn.answer(p, packer)
conn.addPacket(p)
return return
p = protocol.answerPartitionTable(packet.getId(), app.ptid, row_list) p = protocol.answerPartitionTable(app.ptid, row_list)
conn.addPacket(p) conn.answer(p, packet)
else: else:
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
...@@ -236,8 +234,8 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -236,8 +234,8 @@ class VerificationEventHandler(StorageEventHandler):
if not conn.isServerConnection(): if not conn.isServerConnection():
app = self.app app = self.app
tid_list = app.dm.getUnfinishedTIDList() tid_list = app.dm.getUnfinishedTIDList()
p = protocol.answerUnfinishedTransactions(packet.getId(), tid_list) p = protocol.answerUnfinishedTransactions(tid_list)
conn.addPacket(p) conn.answer(p, packet)
else: else:
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
...@@ -252,21 +250,20 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -252,21 +250,20 @@ class VerificationEventHandler(StorageEventHandler):
t = app.dm.getTransaction(tid) t = app.dm.getTransaction(tid)
if t is None: if t is None:
p = protocol.tidNotFound(packet.getId(), '%s does not exist' % dump(tid)) p = protocol.tidNotFound('%s does not exist' % dump(tid))
else: else:
p = protocol.answerTransactionInformation(packet.getId(), tid, p = protocol.answerTransactionInformation(tid, t[1], t[2], t[3], t[0])
t[1], t[2], t[3], t[0]) conn.answer(p, packet)
conn.addPacket(p)
def handleAskObjectPresent(self, conn, packet, oid, tid): def handleAskObjectPresent(self, conn, packet, oid, tid):
if not conn.isServerConnection(): if not conn.isServerConnection():
app = self.app app = self.app
if app.dm.objectPresent(oid, tid): if app.dm.objectPresent(oid, tid):
p = protocol.answerObjectPresent(packet.getId(), oid, tid) p = protocol.answerObjectPresent(oid, tid)
else: else:
p = protocol.oidNotFound(packet.getId(), p = protocol.oidNotFound(
'%s:%s do not exist' % (dump(oid), dump(tid))) '%s:%s do not exist' % (dump(oid), dump(tid)))
conn.addPacket(p) conn.answer(p, packet)
else: else:
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment