Commit 727899e2 authored by Julien Muchembled's avatar Julien Muchembled

Pass app as first parameter of (*Client|Listening)Connection

Application will hold SSL parameters.
parent 7d5b1559
...@@ -80,7 +80,7 @@ class Application(BaseApplication): ...@@ -80,7 +80,7 @@ class Application(BaseApplication):
# Make a listening port. # Make a listening port.
handler = AdminEventHandler(self) handler = AdminEventHandler(self)
self.listening_conn = ListeningConnection(self.em, handler, self.server) self.listening_conn = ListeningConnection(self, handler, self.server)
while self.cluster_state != ClusterStates.STOPPING: while self.cluster_state != ClusterStates.STOPPING:
self.connectToPrimary() self.connectToPrimary()
......
...@@ -226,7 +226,7 @@ class Application(ThreadedApplication): ...@@ -226,7 +226,7 @@ class Application(ThreadedApplication):
index = (index + 1) % len(master_list) index = (index + 1) % len(master_list)
self.trying_master_node = master_list[index] self.trying_master_node = master_list[index]
# Connect to master # Connect to master
conn = MTClientConnection(self.em, conn = MTClientConnection(self,
self.notifications_handler, self.notifications_handler,
node=self.trying_master_node, node=self.trying_master_node,
dispatcher=self.dispatcher) dispatcher=self.dispatcher)
......
...@@ -53,7 +53,7 @@ class ConnectionPool(object): ...@@ -53,7 +53,7 @@ class ConnectionPool(object):
"""Init a connection to a given storage node.""" """Init a connection to a given storage node."""
app = self.app app = self.app
logging.debug('trying to connect to %s - %s', node, node.getState()) logging.debug('trying to connect to %s - %s', node, node.getState())
conn = MTClientConnection(app.em, app.storage_event_handler, node, conn = MTClientConnection(app, app.storage_event_handler, node,
dispatcher=app.dispatcher) dispatcher=app.dispatcher)
p = Packets.RequestIdentification(NodeTypes.CLIENT, p = Packets.RequestIdentification(NodeTypes.CLIENT,
app.uuid, None, app.name) app.uuid, None, app.name)
......
...@@ -119,7 +119,8 @@ class BootstrapManager(EventHandler): ...@@ -119,7 +119,8 @@ class BootstrapManager(EventHandler):
Returns when the connection is made. Returns when the connection is made.
""" """
logging.info('connecting to a primary master node') logging.info('connecting to a primary master node')
em, nm = self.app.em, self.app.nm app = self.app
poll = app.em.poll
index = 0 index = 0
self.current = None self.current = None
conn = None conn = None
...@@ -129,12 +130,12 @@ class BootstrapManager(EventHandler): ...@@ -129,12 +130,12 @@ class BootstrapManager(EventHandler):
# conn closed # conn closed
conn = None conn = None
# select a master # select a master
master_list = nm.getMasterList() master_list = app.nm.getMasterList()
index = (index + 1) % len(master_list) index = (index + 1) % len(master_list)
self.current = master_list[index] self.current = master_list[index]
if conn is None: if conn is None:
# open the connection # open the connection
conn = ClientConnection(em, self, self.current) conn = ClientConnection(app, self, self.current)
# Yes, the connection may be already closed. This happens when # Yes, the connection may be already closed. This happens when
# the kernel reacts so quickly to a closed port that 'connect' # the kernel reacts so quickly to a closed port that 'connect'
# fails on the first call. In such case, poll(1) would deadlock # fails on the first call. In such case, poll(1) would deadlock
...@@ -142,7 +143,7 @@ class BootstrapManager(EventHandler): ...@@ -142,7 +143,7 @@ class BootstrapManager(EventHandler):
if conn.isClosed(): if conn.isClosed():
continue continue
# still processing # still processing
em.poll(1) poll(1)
return (self.current, conn, self.uuid, self.num_partitions, return (self.current, conn, self.uuid, self.num_partitions,
self.num_replicas) self.num_replicas)
......
...@@ -308,12 +308,12 @@ attributeTracker.track(BaseConnection) ...@@ -308,12 +308,12 @@ attributeTracker.track(BaseConnection)
class ListeningConnection(BaseConnection): class ListeningConnection(BaseConnection):
"""A listen connection.""" """A listen connection."""
def __init__(self, event_manager, handler, addr): def __init__(self, app, handler, addr):
logging.debug('listening to %s:%d', *addr) logging.debug('listening to %s:%d', *addr)
connector = self.ConnectorClass(addr) connector = self.ConnectorClass(addr)
BaseConnection.__init__(self, event_manager, handler, connector, addr) BaseConnection.__init__(self, app.em, handler, connector, addr)
connector.makeListeningConnection() connector.makeListeningConnection()
event_manager.register(self) self.em.register(self)
def readable(self): def readable(self):
try: try:
...@@ -646,10 +646,10 @@ class ClientConnection(Connection): ...@@ -646,10 +646,10 @@ class ClientConnection(Connection):
connecting = True connecting = True
client = True client = True
def __init__(self, event_manager, handler, node): def __init__(self, app, handler, node):
addr = node.getAddress() addr = node.getAddress()
connector = self.ConnectorClass(addr) connector = self.ConnectorClass(addr)
Connection.__init__(self, event_manager, handler, connector, addr) Connection.__init__(self, app.em, handler, connector, addr)
node.setConnection(self) node.setConnection(self)
handler.connectionStarted(self) handler.connectionStarted(self)
self._connect() self._connect()
......
...@@ -133,7 +133,7 @@ class Application(BaseApplication): ...@@ -133,7 +133,7 @@ class Application(BaseApplication):
def _run(self): def _run(self):
"""Make sure that the status is sane and start a loop.""" """Make sure that the status is sane and start a loop."""
# Make a listening port. # Make a listening port.
self.listening_conn = ListeningConnection(self.em, None, self.server) self.listening_conn = ListeningConnection(self, None, self.server)
# Start a normal operation. # Start a normal operation.
while self.cluster_state != ClusterStates.STOPPING: while self.cluster_state != ClusterStates.STOPPING:
...@@ -184,7 +184,7 @@ class Application(BaseApplication): ...@@ -184,7 +184,7 @@ class Application(BaseApplication):
self.negotiating_master_node_set): self.negotiating_master_node_set):
for addr in self.unconnected_master_node_set: for addr in self.unconnected_master_node_set:
self.negotiating_master_node_set.add(addr) self.negotiating_master_node_set.add(addr)
ClientConnection(self.em, client_handler, ClientConnection(self, client_handler,
# XXX: Ugly, but the whole election code will be # XXX: Ugly, but the whole election code will be
# replaced soon # replaced soon
getByAddress(addr)) getByAddress(addr))
...@@ -371,7 +371,7 @@ class Application(BaseApplication): ...@@ -371,7 +371,7 @@ class Application(BaseApplication):
# Reconnect to primary master node. # Reconnect to primary master node.
primary_handler = secondary.PrimaryHandler(self) primary_handler = secondary.PrimaryHandler(self)
ClientConnection(self.em, primary_handler, self.primary_master_node) ClientConnection(self, primary_handler, self.primary_master_node)
# and another for the future incoming connections # and another for the future incoming connections
self.listening_conn.setHandler( self.listening_conn.setHandler(
......
...@@ -35,8 +35,7 @@ class NeoCTL(BaseApplication): ...@@ -35,8 +35,7 @@ class NeoCTL(BaseApplication):
def __getConnection(self): def __getConnection(self):
if not self.connected: if not self.connected:
self.connection = ClientConnection(self.em, self.handler, self.connection = ClientConnection(self, self.handler, self.server)
self.server)
# Never delay reconnection to master. This speeds up unit tests # Never delay reconnection to master. This speeds up unit tests
# and it should not change anything for normal use. # and it should not change anything for normal use.
self.connection.setReconnectionNoDelay() self.connection.setReconnectionNoDelay()
......
...@@ -167,7 +167,7 @@ class Application(BaseApplication): ...@@ -167,7 +167,7 @@ class Application(BaseApplication):
# Make a listening port # Make a listening port
handler = identification.IdentificationHandler(self) handler = identification.IdentificationHandler(self)
self.listening_conn = ListeningConnection(self.em, handler, self.server) self.listening_conn = ListeningConnection(self, handler, self.server)
self.server = self.listening_conn.getAddress() self.server = self.listening_conn.getAddress()
# Connect to a primary master node, verify data, and # Connect to a primary master node, verify data, and
......
...@@ -45,8 +45,7 @@ class Checker(object): ...@@ -45,8 +45,7 @@ class Checker(object):
conn = node.getConnection() conn = node.getConnection()
conn.asClient() conn.asClient()
else: else:
conn = ClientConnection(app.em, StorageOperationHandler(app), conn = ClientConnection(app, StorageOperationHandler(app), node)
node)
conn.ask(Packets.RequestIdentification( conn.ask(Packets.RequestIdentification(
NodeTypes.STORAGE, uuid, app.server, name)) NodeTypes.STORAGE, uuid, app.server, name))
self.conn_dict[conn] = node.isIdentified() self.conn_dict[conn] = node.isIdentified()
......
...@@ -254,7 +254,7 @@ class Replicator(object): ...@@ -254,7 +254,7 @@ class Replicator(object):
self.fetchTransactions() self.fetchTransactions()
else: else:
assert name or node.getUUID() != app.uuid, "loopback connection" assert name or node.getUUID() != app.uuid, "loopback connection"
conn = ClientConnection(app.em, StorageOperationHandler(app), node) conn = ClientConnection(app, StorageOperationHandler(app), node)
conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE, conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE,
None if name else app.uuid, app.server, name or app.name)) None if name else app.uuid, app.server, name or app.name))
if previous_node is not None and previous_node.isConnected(): if previous_node is not None and previous_node.isConnected():
......
...@@ -60,7 +60,7 @@ class ConnectionTests(NeoUnitTestBase): ...@@ -60,7 +60,7 @@ class ConnectionTests(NeoUnitTestBase):
def setUp(self): def setUp(self):
NeoUnitTestBase.setUp(self) NeoUnitTestBase.setUp(self)
self.app = Mock({'__repr__': 'Fake App'}) self.app = Mock({'__repr__': 'Fake App'})
self.em = Mock({'__repr__': 'Fake Em'}) self.em = self.app.em = Mock({'__repr__': 'Fake Em'})
self.handler = Mock({'__repr__': 'Fake Handler'}) self.handler = Mock({'__repr__': 'Fake Handler'})
self.address = ("127.0.0.7", 93413) self.address = ("127.0.0.7", 93413)
self.node = Mock({'getAddress': self.address}) self.node = Mock({'getAddress': self.address})
...@@ -68,7 +68,7 @@ class ConnectionTests(NeoUnitTestBase): ...@@ -68,7 +68,7 @@ class ConnectionTests(NeoUnitTestBase):
def _makeListeningConnection(self, addr): def _makeListeningConnection(self, addr):
with dummy_connector: with dummy_connector:
conn = ListeningConnection(self.em, self.handler, addr) conn = ListeningConnection(self.app, self.handler, addr)
self.connector = conn.connector self.connector = conn.connector
return conn return conn
...@@ -79,7 +79,7 @@ class ConnectionTests(NeoUnitTestBase): ...@@ -79,7 +79,7 @@ class ConnectionTests(NeoUnitTestBase):
def _makeClientConnection(self): def _makeClientConnection(self):
with dummy_connector: with dummy_connector:
conn = ClientConnection(self.em, self.handler, self.node) conn = ClientConnection(self.app, self.handler, self.node)
self.connector = conn.connector self.connector = conn.connector
return conn return conn
...@@ -750,7 +750,7 @@ class MTConnectionTests(ConnectionTests): ...@@ -750,7 +750,7 @@ class MTConnectionTests(ConnectionTests):
def _makeClientConnection(self): def _makeClientConnection(self):
with dummy_connector: with dummy_connector:
conn = MTClientConnection(self.em, self.handler, self.node, conn = MTClientConnection(self.app, self.handler, self.node,
dispatcher=self.dispatcher) dispatcher=self.dispatcher)
self.connector = conn.connector self.connector = conn.connector
return conn return conn
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment