Commit ff4242d4 authored by Julien Muchembled's avatar Julien Muchembled

Rename {Node,Connection}.notify to send

parent 4bde7d76
...@@ -139,6 +139,6 @@ class Application(BaseApplication): ...@@ -139,6 +139,6 @@ class Application(BaseApplication):
pass pass
row_list.append((offset, row)) row_list.append((offset, row))
except IndexError: except IndexError:
conn.notify(Errors.ProtocolError('invalid partition table offset')) conn.send(Errors.ProtocolError('invalid partition table offset'))
else: else:
conn.answer(Packets.AnswerPartitionList(self.pt.getID(), row_list)) conn.answer(Packets.AnswerPartitionList(self.pt.getID(), row_list))
...@@ -569,7 +569,7 @@ class Application(ThreadedApplication): ...@@ -569,7 +569,7 @@ class Application(ThreadedApplication):
p = Packets.AbortTransaction(txn_context.ttid, ()) p = Packets.AbortTransaction(txn_context.ttid, ())
for uuid in txn_context.involved_nodes: for uuid in txn_context.involved_nodes:
try: try:
self.cp.connection_dict[uuid].notify(p) self.cp.connection_dict[uuid].send(p)
except (KeyError, ConnectionClosed): except (KeyError, ConnectionClosed):
pass pass
# Because we want to be sure that the involved nodes are notified, # Because we want to be sure that the involved nodes are notified,
...@@ -579,7 +579,7 @@ class Application(ThreadedApplication): ...@@ -579,7 +579,7 @@ class Application(ThreadedApplication):
# storage nodes keep a list of aborted transactions, but the # storage nodes keep a list of aborted transactions, but the
# difficult part would be to avoid a memory leak. # difficult part would be to avoid a memory leak.
try: try:
notify = self.master_conn.notify notify = self.master_conn.send
except AttributeError: except AttributeError:
pass pass
else: else:
......
...@@ -136,7 +136,7 @@ class HandlerSwitcher(object): ...@@ -136,7 +136,7 @@ class HandlerSwitcher(object):
logging.error('Unexpected answer %r in %r', packet, connection) logging.error('Unexpected answer %r in %r', packet, connection)
if not connection.isClosed(): if not connection.isClosed():
notification = Packets.Notify('Unexpected answer: %r' % packet) notification = Packets.Notify('Unexpected answer: %r' % packet)
connection.notify(notification) connection.send(notification)
connection.abort() connection.abort()
# handler.peerBroken(connection) # handler.peerBroken(connection)
# apply a pending handler if no more answers are pending # apply a pending handler if no more answers are pending
...@@ -379,7 +379,7 @@ class Connection(BaseConnection): ...@@ -379,7 +379,7 @@ class Connection(BaseConnection):
if self.server: if self.server:
del self.idle del self.idle
self.client = False self.client = False
self.notify(Packets.CloseClient()) self.send(Packets.CloseClient())
else: else:
self.close() self.close()
...@@ -548,7 +548,7 @@ class Connection(BaseConnection): ...@@ -548,7 +548,7 @@ class Connection(BaseConnection):
self.em.addWriter(self) self.em.addWriter(self)
logging.packet(self, packet, True) logging.packet(self, packet, True)
def notify(self, packet): def send(self, packet):
""" Then a packet with a new ID """ """ Then a packet with a new ID """
if self.isClosed(): if self.isClosed():
raise ConnectionClosed raise ConnectionClosed
...@@ -672,7 +672,7 @@ class MTConnectionType(type): ...@@ -672,7 +672,7 @@ class MTConnectionType(type):
if __debug__: if __debug__:
for name in 'answer',: for name in 'answer',:
setattr(cls, name, cls.lockCheckWrapper(name)) setattr(cls, name, cls.lockCheckWrapper(name))
for name in 'close', 'notify': for name in 'close', 'send':
setattr(cls, name, cls.__class__.lockWrapper(cls, name)) setattr(cls, name, cls.__class__.lockWrapper(cls, name))
for name in ('_delayedConnect', 'onTimeout', for name in ('_delayedConnect', 'onTimeout',
'process', 'readable', 'writable'): 'process', 'readable', 'writable'):
......
...@@ -40,17 +40,20 @@ class Node(object): ...@@ -40,17 +40,20 @@ class Node(object):
self._last_state_change = time() self._last_state_change = time()
manager.add(self) manager.add(self)
def notify(self, packet): @property
def send(self):
assert self.isConnected(), 'Not connected' assert self.isConnected(), 'Not connected'
self._connection.notify(packet) return self._connection.send
def ask(self, packet, *args, **kw): @property
def ask(self):
assert self.isConnected(), 'Not connected' assert self.isConnected(), 'Not connected'
self._connection.ask(packet, *args, **kw) return self._connection.ask
def answer(self, packet, msg_id=None): @property
def answer(self):
assert self.isConnected(), 'Not connected' assert self.isConnected(), 'Not connected'
self._connection.answer(packet, msg_id) return self._connection.answer
def getLastStateChange(self): def getLastStateChange(self):
return self._last_state_change return self._last_state_change
......
...@@ -208,7 +208,7 @@ class Application(BaseApplication): ...@@ -208,7 +208,7 @@ class Application(BaseApplication):
# Ask all connected nodes to reelect a single primary master. # Ask all connected nodes to reelect a single primary master.
for conn in self.em.getClientList(): for conn in self.em.getClientList():
conn.notify(Packets.ReelectPrimary()) conn.send(Packets.ReelectPrimary())
conn.abort() conn.abort()
# Wait until the connections are closed. # Wait until the connections are closed.
...@@ -257,7 +257,7 @@ class Application(BaseApplication): ...@@ -257,7 +257,7 @@ class Application(BaseApplication):
for node in self.nm.getIdentifiedList(): for node in self.nm.getIdentifiedList():
node_list = node_dict.get(node.getType()) node_list = node_dict.get(node.getType())
if node_list and node.isRunning() and node is not exclude: if node_list and node.isRunning() and node is not exclude:
node.notify(Packets.NotifyNodeInformation(now, node_list)) node.send(Packets.NotifyNodeInformation(now, node_list))
def broadcastPartitionChanges(self, cell_list): def broadcastPartitionChanges(self, cell_list):
"""Broadcast a Notify Partition Changes packet.""" """Broadcast a Notify Partition Changes packet."""
...@@ -267,7 +267,7 @@ class Application(BaseApplication): ...@@ -267,7 +267,7 @@ class Application(BaseApplication):
packet = Packets.NotifyPartitionChanges(ptid, cell_list) packet = Packets.NotifyPartitionChanges(ptid, cell_list)
for node in self.nm.getIdentifiedList(): for node in self.nm.getIdentifiedList():
if node.isRunning() and not node.isMaster(): if node.isRunning() and not node.isMaster():
node.notify(packet) node.send(packet)
def provideService(self): def provideService(self):
""" """
...@@ -292,7 +292,7 @@ class Application(BaseApplication): ...@@ -292,7 +292,7 @@ class Application(BaseApplication):
for node in self.nm.getStorageList(only_identified=True): for node in self.nm.getStorageList(only_identified=True):
tid_dict[node.getUUID()] = tid tid_dict[node.getUUID()] = tid
if node.isRunning(): if node.isRunning():
node.notify(packet) node.send(packet)
self.pt.setBackupTidDict(tid_dict) self.pt.setBackupTidDict(tid_dict)
def playPrimaryRole(self): def playPrimaryRole(self):
...@@ -304,7 +304,7 @@ class Application(BaseApplication): ...@@ -304,7 +304,7 @@ class Application(BaseApplication):
if conn.isListening(): if conn.isListening():
conn.setHandler(identification.IdentificationHandler(self)) conn.setHandler(identification.IdentificationHandler(self))
else: else:
conn.notify(packet) conn.send(packet)
# Primary master should rather establish connections to all # Primary master should rather establish connections to all
# secondaries, rather than the other way around. This requires # secondaries, rather than the other way around. This requires
# a bit more work when a new master joins a cluster but makes # a bit more work when a new master joins a cluster but makes
...@@ -375,12 +375,12 @@ class Application(BaseApplication): ...@@ -375,12 +375,12 @@ class Application(BaseApplication):
for node in self.nm.getIdentifiedList(): for node in self.nm.getIdentifiedList():
if node.isStorage() or node.isClient(): if node.isStorage() or node.isClient():
conn = node.getConnection() conn = node.getConnection()
conn.notify(Packets.StopOperation()) conn.send(Packets.StopOperation())
if node.isClient(): if node.isClient():
conn.abort() conn.abort()
continue continue
if truncate: if truncate:
conn.notify(truncate) conn.send(truncate)
if node.isRunning(): if node.isRunning():
node.setPending() node.setPending()
node_list.append(node) node_list.append(node)
...@@ -453,7 +453,7 @@ class Application(BaseApplication): ...@@ -453,7 +453,7 @@ class Application(BaseApplication):
notification_packet = Packets.NotifyClusterInformation(state) notification_packet = Packets.NotifyClusterInformation(state)
for node in self.nm.getIdentifiedList(): for node in self.nm.getIdentifiedList():
conn = node.getConnection() conn = node.getConnection()
conn.notify(notification_packet) conn.send(notification_packet)
if node.isClient(): if node.isClient():
if state == ClusterStates.RUNNING: if state == ClusterStates.RUNNING:
handler = self.client_service_handler handler = self.client_service_handler
...@@ -512,7 +512,7 @@ class Application(BaseApplication): ...@@ -512,7 +512,7 @@ class Application(BaseApplication):
conn = node.getConnection() conn = node.getConnection()
if node.isStorage(): if node.isStorage():
conn.setHandler(handler) conn.setHandler(handler)
conn.notify(Packets.NotifyNodeInformation(now, (( conn.send(Packets.NotifyNodeInformation(now, ((
node.getType(), node.getAddress(), node.getUUID(), node.getType(), node.getAddress(), node.getUUID(),
NodeStates.TEMPORARILY_DOWN, None),))) NodeStates.TEMPORARILY_DOWN, None),)))
conn.abort() conn.abort()
...@@ -549,13 +549,13 @@ class Application(BaseApplication): ...@@ -549,13 +549,13 @@ class Application(BaseApplication):
c.answer(Packets.AnswerTransactionFinished(ttid, tid), c.answer(Packets.AnswerTransactionFinished(ttid, tid),
msg_id=txn.getMessageId()) msg_id=txn.getMessageId())
else: else:
c.notify(invalidate_objects) c.send(invalidate_objects)
# Unlock Information to relevant storage nodes. # Unlock Information to relevant storage nodes.
notify_unlock = Packets.NotifyUnlockInformation(ttid) notify_unlock = Packets.NotifyUnlockInformation(ttid)
getByUUID = self.nm.getByUUID getByUUID = self.nm.getByUUID
for storage_uuid in txn.getUUIDList(): for storage_uuid in txn.getUUIDList():
getByUUID(storage_uuid).getConnection().notify(notify_unlock) getByUUID(storage_uuid).send(notify_unlock)
# Notify storage that have replications blocked by this transaction, # Notify storage that have replications blocked by this transaction,
# and clients that try to recover from a failure during tpc_finish. # and clients that try to recover from a failure during tpc_finish.
...@@ -566,7 +566,7 @@ class Application(BaseApplication): ...@@ -566,7 +566,7 @@ class Application(BaseApplication):
# There should be only 1 client interested. # There should be only 1 client interested.
node.answer(Packets.AnswerFinalTID(tid)) node.answer(Packets.AnswerFinalTID(tid))
else: else:
node.notify(notify_finished) node.send(notify_finished)
assert self.last_transaction < tid, (self.last_transaction, tid) assert self.last_transaction < tid, (self.last_transaction, tid)
self.setLastTransaction(tid) self.setLastTransaction(tid)
...@@ -583,7 +583,7 @@ class Application(BaseApplication): ...@@ -583,7 +583,7 @@ class Application(BaseApplication):
self.tm.executeQueuedEvents() self.tm.executeQueuedEvents()
def startStorage(self, node): def startStorage(self, node):
node.notify(Packets.StartOperation(self.backup_tid)) node.send(Packets.StartOperation(self.backup_tid))
uuid = node.getUUID() uuid = node.getUUID()
assert uuid not in self.storage_starting_set assert uuid not in self.storage_starting_set
if uuid not in self.storage_ready_dict: if uuid not in self.storage_ready_dict:
......
...@@ -189,7 +189,7 @@ class BackupApplication(object): ...@@ -189,7 +189,7 @@ class BackupApplication(object):
"ask %s to replicate partition %u up to %s from %s", "ask %s to replicate partition %u up to %s from %s",
uuid_str(cell.getUUID()), offset, dump(tid), uuid_str(cell.getUUID()), offset, dump(tid),
uuid_str(primary_node.getUUID())) uuid_str(primary_node.getUUID()))
cell.getNode().getConnection().notify(p) cell.getNode().send(p)
trigger_set.add(primary_node) trigger_set.add(primary_node)
for node in trigger_set: for node in trigger_set:
self.triggerBackup(node) self.triggerBackup(node)
...@@ -252,7 +252,7 @@ class BackupApplication(object): ...@@ -252,7 +252,7 @@ class BackupApplication(object):
cell.replicating = tid cell.replicating = tid
for node, untouched_dict in untouched_dict.iteritems(): for node, untouched_dict in untouched_dict.iteritems():
if app.isStorageReady(node.getUUID()): if app.isStorageReady(node.getUUID()):
node.notify(Packets.Replicate(tid, '', untouched_dict)) node.send(Packets.Replicate(tid, '', untouched_dict))
for node in trigger_set: for node in trigger_set:
self.triggerBackup(node) self.triggerBackup(node)
count = sum(map(len, self.tid_list)) count = sum(map(len, self.tid_list))
...@@ -288,8 +288,7 @@ class BackupApplication(object): ...@@ -288,8 +288,7 @@ class BackupApplication(object):
source_dict[offset] = addr source_dict[offset] = addr
logging.debug("ask %s to replicate partition %u up to %s from %r", logging.debug("ask %s to replicate partition %u up to %s from %r",
uuid_str(node.getUUID()), offset, dump(tid), addr) uuid_str(node.getUUID()), offset, dump(tid), addr)
node.getConnection().notify(Packets.Replicate( node.send(Packets.Replicate(tid, self.name, source_dict))
tid, self.name, source_dict))
def notifyReplicationDone(self, node, offset, tid): def notifyReplicationDone(self, node, offset, tid):
app = self.app app = self.app
...@@ -306,7 +305,7 @@ class BackupApplication(object): ...@@ -306,7 +305,7 @@ class BackupApplication(object):
last_tid = app.getLastTransaction() last_tid = app.getLastTransaction()
if tid < last_tid: if tid < last_tid:
tid = last_tid tid = last_tid
node.notify(Packets.Replicate(tid, '', {offset: None})) node.send(Packets.Replicate(tid, '', {offset: None}))
logging.debug("partition %u: updating backup_tid of %r to %s", logging.debug("partition %u: updating backup_tid of %r to %s",
offset, cell, dump(tid)) offset, cell, dump(tid))
cell.backup_tid = tid cell.backup_tid = tid
...@@ -330,7 +329,7 @@ class BackupApplication(object): ...@@ -330,7 +329,7 @@ class BackupApplication(object):
"ask %s to replicate partition %u up to %s from %s", "ask %s to replicate partition %u up to %s from %s",
uuid_str(node.getUUID()), offset, dump(max_tid), uuid_str(node.getUUID()), offset, dump(max_tid),
uuid_str(primary_node.getUUID())) uuid_str(primary_node.getUUID()))
node.notify(Packets.Replicate(max_tid, '', node.send(Packets.Replicate(max_tid, '',
{offset: primary_node.getAddress()})) {offset: primary_node.getAddress()}))
else: else:
if app.getClusterState() == ClusterStates.BACKINGUP: if app.getClusterState() == ClusterStates.BACKINGUP:
...@@ -346,5 +345,5 @@ class BackupApplication(object): ...@@ -346,5 +345,5 @@ class BackupApplication(object):
"ask %s to replicate partition %u up to %s from %s", "ask %s to replicate partition %u up to %s from %s",
uuid_str(cell.getUUID()), offset, uuid_str(cell.getUUID()), offset,
dump(tid), uuid_str(node.getUUID())) dump(tid), uuid_str(node.getUUID()))
cell.getNode().notify(p) cell.getNode().send(p)
return result return result
...@@ -89,7 +89,7 @@ class MasterHandler(EventHandler): ...@@ -89,7 +89,7 @@ class MasterHandler(EventHandler):
node_list.extend(n.asTuple() for n in nm.getMasterList()) node_list.extend(n.asTuple() for n in nm.getMasterList())
node_list.extend(n.asTuple() for n in nm.getClientList()) node_list.extend(n.asTuple() for n in nm.getClientList())
node_list.extend(n.asTuple() for n in nm.getStorageList()) node_list.extend(n.asTuple() for n in nm.getStorageList())
conn.notify(Packets.NotifyNodeInformation(monotonic_time(), node_list)) conn.send(Packets.NotifyNodeInformation(monotonic_time(), node_list))
def askPartitionTable(self, conn): def askPartitionTable(self, conn):
pt = self.app.pt pt = self.app.pt
...@@ -106,7 +106,7 @@ class BaseServiceHandler(MasterHandler): ...@@ -106,7 +106,7 @@ class BaseServiceHandler(MasterHandler):
def connectionCompleted(self, conn, new): def connectionCompleted(self, conn, new):
self._notifyNodeInformation(conn) self._notifyNodeInformation(conn)
pt = self.app.pt pt = self.app.pt
conn.notify(Packets.SendPartitionTable(pt.getID(), pt.getRowList())) conn.send(Packets.SendPartitionTable(pt.getID(), pt.getRowList()))
def connectionLost(self, conn, new_state): def connectionLost(self, conn, new_state):
app = self.app app = self.app
......
...@@ -103,7 +103,7 @@ class AdministrationHandler(MasterHandler): ...@@ -103,7 +103,7 @@ class AdministrationHandler(MasterHandler):
node.setState(state) node.setState(state)
if node.isConnected(): if node.isConnected():
# notify itself so it can shutdown # notify itself so it can shutdown
node.notify(Packets.NotifyNodeInformation( node.send(Packets.NotifyNodeInformation(
monotonic_time(), [node.asTuple()])) monotonic_time(), [node.asTuple()]))
# close to avoid handle the closure as a connection lost # close to avoid handle the closure as a connection lost
node.getConnection().abort() node.getConnection().abort()
...@@ -122,7 +122,7 @@ class AdministrationHandler(MasterHandler): ...@@ -122,7 +122,7 @@ class AdministrationHandler(MasterHandler):
# ignores non-running nodes # ignores non-running nodes
assert not node.isRunning() assert not node.isRunning()
if node.isConnected(): if node.isConnected():
node.notify(Packets.NotifyNodeInformation( node.send(Packets.NotifyNodeInformation(
monotonic_time(), [node.asTuple()])) monotonic_time(), [node.asTuple()]))
app.broadcastNodesInformation([node]) app.broadcastNodesInformation([node])
...@@ -161,7 +161,7 @@ class AdministrationHandler(MasterHandler): ...@@ -161,7 +161,7 @@ class AdministrationHandler(MasterHandler):
node_list.append(node) node_list.append(node)
repair = Packets.NotifyRepair(*args) repair = Packets.NotifyRepair(*args)
for node in node_list: for node in node_list:
node.notify(repair) node.send(repair)
conn.answer(Errors.Ack('')) conn.answer(Errors.Ack(''))
def tweakPartitionTable(self, conn, uuid_list): def tweakPartitionTable(self, conn, uuid_list):
...@@ -225,6 +225,6 @@ class AdministrationHandler(MasterHandler): ...@@ -225,6 +225,6 @@ class AdministrationHandler(MasterHandler):
).getAddress() ).getAddress()
else: else:
source = '', None source = '', None
node.getConnection().notify(Packets.CheckPartition( node.send(Packets.CheckPartition(
offset, source, min_tid, max_tid)) offset, source, min_tid, max_tid))
conn.answer(Errors.Ack('')) conn.answer(Errors.Ack(''))
...@@ -38,7 +38,7 @@ class ClientServiceHandler(MasterHandler): ...@@ -38,7 +38,7 @@ class ClientServiceHandler(MasterHandler):
node_list = [nm.getByUUID(conn.getUUID()).asTuple()] # for id_timestamp node_list = [nm.getByUUID(conn.getUUID()).asTuple()] # for id_timestamp
node_list.extend(n.asTuple() for n in nm.getMasterList()) node_list.extend(n.asTuple() for n in nm.getMasterList())
node_list.extend(n.asTuple() for n in nm.getStorageList()) node_list.extend(n.asTuple() for n in nm.getStorageList())
conn.notify(Packets.NotifyNodeInformation(monotonic_time(), node_list)) conn.send(Packets.NotifyNodeInformation(monotonic_time(), node_list))
def askBeginTransaction(self, conn, tid): def askBeginTransaction(self, conn, tid):
""" """
...@@ -132,7 +132,7 @@ class ClientServiceHandler(MasterHandler): ...@@ -132,7 +132,7 @@ class ClientServiceHandler(MasterHandler):
p = Packets.AbortTransaction(tid, ()) p = Packets.AbortTransaction(tid, ())
getByUUID = app.nm.getByUUID getByUUID = app.nm.getByUUID
for involved in involved: for involved in involved:
getByUUID(involved).notify(p) getByUUID(involved).send(p)
# like ClientServiceHandler but read-only & only for tid <= backup_tid # like ClientServiceHandler but read-only & only for tid <= backup_tid
......
...@@ -39,7 +39,7 @@ class SecondaryMasterHandler(MasterHandler): ...@@ -39,7 +39,7 @@ class SecondaryMasterHandler(MasterHandler):
def _notifyNodeInformation(self, conn): def _notifyNodeInformation(self, conn):
node_list = [n.asTuple() for n in self.app.nm.getMasterList()] node_list = [n.asTuple() for n in self.app.nm.getMasterList()]
conn.notify(Packets.NotifyNodeInformation(monotonic_time(), node_list)) conn.send(Packets.NotifyNodeInformation(monotonic_time(), node_list))
class PrimaryHandler(EventHandler): class PrimaryHandler(EventHandler):
""" Handler used by secondaries to handle primary master""" """ Handler used by secondaries to handle primary master"""
......
...@@ -89,7 +89,7 @@ class RecoveryManager(MasterHandler): ...@@ -89,7 +89,7 @@ class RecoveryManager(MasterHandler):
truncate = Packets.Truncate(app.truncate_tid) truncate = Packets.Truncate(app.truncate_tid)
for node in node_list: for node in node_list:
conn = node.getConnection() conn = node.getConnection()
conn.notify(truncate) conn.send(truncate)
self.connectionCompleted(conn, False) self.connectionCompleted(conn, False)
continue continue
node_list = pt.getConnectedNodeList() node_list = pt.getConnectedNodeList()
...@@ -182,4 +182,4 @@ class RecoveryManager(MasterHandler): ...@@ -182,4 +182,4 @@ class RecoveryManager(MasterHandler):
def _notifyAdmins(self, *packets): def _notifyAdmins(self, *packets):
for node in self.app.nm.getAdminList(only_identified=True): for node in self.app.nm.getAdminList(only_identified=True):
for packet in packets: for packet in packets:
node.notify(packet) node.send(packet)
...@@ -316,7 +316,7 @@ class TransactionManager(EventQueue): ...@@ -316,7 +316,7 @@ class TransactionManager(EventQueue):
logging.info('Deadlock avoidance triggered by %s for %s:' logging.info('Deadlock avoidance triggered by %s for %s:'
' new locking tid for TXN %s is %s', uuid_str(storage_id), ' new locking tid for TXN %s is %s', uuid_str(storage_id),
uuid_str(client.getUUID()), dump(ttid), dump(locking_tid)) uuid_str(client.getUUID()), dump(ttid), dump(locking_tid))
client.notify(Packets.NotifyDeadlock(ttid, locking_tid)) client.send(Packets.NotifyDeadlock(ttid, locking_tid))
def vote(self, app, ttid, uuid_list): def vote(self, app, ttid, uuid_list):
""" """
......
...@@ -131,7 +131,7 @@ class VerificationManager(BaseServiceHandler): ...@@ -131,7 +131,7 @@ class VerificationManager(BaseServiceHandler):
if uuid_set: if uuid_set:
packet = Packets.ValidateTransaction(ttid, tid) packet = Packets.ValidateTransaction(ttid, tid)
for node in getIdentifiedList(pool_set=uuid_set): for node in getIdentifiedList(pool_set=uuid_set):
node.notify(packet) node.send(packet)
def answerLastIDs(self, conn, loid, ltid): def answerLastIDs(self, conn, loid, ltid):
self._uuid_set.remove(conn.getUUID()) self._uuid_set.remove(conn.getUUID())
......
...@@ -235,7 +235,7 @@ class Application(BaseApplication): ...@@ -235,7 +235,7 @@ class Application(BaseApplication):
self.master_conn.setHandler(initialization.InitializationHandler(self)) self.master_conn.setHandler(initialization.InitializationHandler(self))
while not self.operational: while not self.operational:
_poll() _poll()
self.master_conn.notify(Packets.NotifyReady()) self.master_conn.send(Packets.NotifyReady())
def doOperation(self): def doOperation(self):
"""Handle everything, including replications and transactions.""" """Handle everything, including replications and transactions."""
......
...@@ -181,7 +181,7 @@ class Checker(object): ...@@ -181,7 +181,7 @@ class Checker(object):
uuid_list.append(conn.getUUID()) uuid_list.append(conn.getUUID())
self.app.closeClient(conn) self.app.closeClient(conn)
p = Packets.NotifyPartitionCorrupted(self.partition, uuid_list) p = Packets.NotifyPartitionCorrupted(self.partition, uuid_list)
self.app.master_conn.notify(p) self.app.master_conn.send(p)
if len(self.conn_dict) <= 1: if len(self.conn_dict) <= 1:
logging.warning("check of partition %u aborted", self.partition) logging.warning("check of partition %u aborted", self.partition)
self.queue.clear() self.queue.clear()
......
...@@ -210,7 +210,7 @@ class StorageOperationHandler(EventHandler): ...@@ -210,7 +210,7 @@ class StorageOperationHandler(EventHandler):
"partition %u dropped" % partition)) "partition %u dropped" % partition))
return return
oid_list, user, desc, ext, packed, ttid = t oid_list, user, desc, ext, packed, ttid = t
conn.notify(Packets.AddTransaction( conn.send(Packets.AddTransaction(
tid, user, desc, ext, packed, ttid, oid_list)) tid, user, desc, ext, packed, ttid, oid_list))
yield yield
conn.answer(Packets.AnswerFetchTransactions( conn.answer(Packets.AnswerFetchTransactions(
...@@ -253,7 +253,7 @@ class StorageOperationHandler(EventHandler): ...@@ -253,7 +253,7 @@ class StorageOperationHandler(EventHandler):
conn.answer(Errors.ReplicationError( conn.answer(Errors.ReplicationError(
"partition %u dropped or truncated" % partition)) "partition %u dropped or truncated" % partition))
return return
conn.notify(Packets.AddObject(oid, serial, *object[2:])) conn.send(Packets.AddObject(oid, serial, *object[2:]))
yield yield
conn.answer(Packets.AnswerFetchObjects( conn.answer(Packets.AnswerFetchObjects(
pack_tid, next_tid, next_oid, object_dict), msg_id) pack_tid, next_tid, next_oid, object_dict), msg_id)
......
...@@ -147,7 +147,7 @@ class TransactionManager(EventQueue): ...@@ -147,7 +147,7 @@ class TransactionManager(EventQueue):
# are now locked normally and we don't rely anymore on other # are now locked normally and we don't rely anymore on other
# readable cells to check locks: we're really up-to-date. # readable cells to check locks: we're really up-to-date.
for partition in notify: for partition in notify:
self._app.master_conn.notify(Packets.NotifyReplicationDone( self._app.master_conn.send(Packets.NotifyReplicationDone(
partition, replicated.pop(partition))) partition, replicated.pop(partition)))
for oid, ttid in store_lock_dict.iteritems(): for oid, ttid in store_lock_dict.iteritems():
if getPartition(oid) in notify: if getPartition(oid) in notify:
...@@ -355,7 +355,7 @@ class TransactionManager(EventQueue): ...@@ -355,7 +355,7 @@ class TransactionManager(EventQueue):
# Ask master to give the client a new locking tid, which will # Ask master to give the client a new locking tid, which will
# be used to ask all involved storage nodes to rebase the # be used to ask all involved storage nodes to rebase the
# already locked oids for this transaction. # already locked oids for this transaction.
self._app.master_conn.notify(Packets.NotifyDeadlock( self._app.master_conn.send(Packets.NotifyDeadlock(
ttid, transaction.locking_tid)) ttid, transaction.locking_tid))
self._rebase(transaction, ttid) self._rebase(transaction, ttid)
raise DelayEvent raise DelayEvent
......
...@@ -326,7 +326,7 @@ class NeoUnitTestBase(NeoTestBase): ...@@ -326,7 +326,7 @@ class NeoUnitTestBase(NeoTestBase):
def checkNoPacketSent(self, conn): def checkNoPacketSent(self, conn):
""" check if no packet were sent """ """ check if no packet were sent """
self._checkNoPacketSend(conn, 'notify') self._checkNoPacketSend(conn, 'send')
self._checkNoPacketSend(conn, 'answer') self._checkNoPacketSend(conn, 'answer')
self._checkNoPacketSend(conn, 'ask') self._checkNoPacketSend(conn, 'ask')
...@@ -372,7 +372,7 @@ class NeoUnitTestBase(NeoTestBase): ...@@ -372,7 +372,7 @@ class NeoUnitTestBase(NeoTestBase):
def checkNotifyPacket(self, conn, packet_type, packet_number=0): def checkNotifyPacket(self, conn, packet_type, packet_number=0):
""" Check if a notify-packet with the right type is sent """ """ Check if a notify-packet with the right type is sent """
calls = conn.mockGetNamedCalls('notify') calls = conn.mockGetNamedCalls('send')
packet = calls.pop(packet_number).getParam(0) packet = calls.pop(packet_number).getParam(0)
self.assertTrue(isinstance(packet, protocol.Packet)) self.assertTrue(isinstance(packet, protocol.Packet))
self.assertEqual(type(packet), packet_type) self.assertEqual(type(packet), packet_type)
......
...@@ -92,7 +92,7 @@ class MasterAppTests(NeoUnitTestBase): ...@@ -92,7 +92,7 @@ class MasterAppTests(NeoUnitTestBase):
self.app.broadcastNodesInformation([s_node]) self.app.broadcastNodesInformation([s_node])
# check conn # check conn
self.assertFalse(client_conn.mockGetNamedCalls('notify')) self.assertFalse(client_conn.mockGetNamedCalls('send'))
self.checkNoPacketSent(master_conn) self.checkNoPacketSent(master_conn)
self.checkNotifyNodeInformation(storage_conn) self.checkNotifyNodeInformation(storage_conn)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment