Commit ba56e027 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Define get(Client|Server)List on event manager and use them to simplify the

logic in the master application, mainly during election stage.


git-svn-id: https://svn.erp5.org/repos/neo/trunk@1331 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 5b2d787a
...@@ -221,6 +221,12 @@ class EpollEventManager(object): ...@@ -221,6 +221,12 @@ class EpollEventManager(object):
def getConnectionList(self): def getConnectionList(self):
return self.connection_dict.values() return self.connection_dict.values()
def getClientList(self):
return [c for c in self.connection_dict.values() if c.isClient()]
def getServerList(self):
return [c for c in self.connection_dict.values() if c.isServer()]
def getConnectionByUUID(self, uuid): def getConnectionByUUID(self, uuid):
""" Return the connection associated to the UUID, None if the UUID is """ Return the connection associated to the UUID, None if the UUID is
None, invalid or not found""" None, invalid or not found"""
......
...@@ -125,9 +125,8 @@ class Application(object): ...@@ -125,9 +125,8 @@ class Application(object):
raise RuntimeError, 'should not reach here' raise RuntimeError, 'should not reach here'
except (ElectionFailure, PrimaryFailure): except (ElectionFailure, PrimaryFailure):
# Forget all connections. # Forget all connections.
for conn in self.em.getConnectionList(): for conn in self.em.getClientList():
if not conn.isListening(): conn.close()
conn.close()
# Reelect a new primary master. # Reelect a new primary master.
self.electPrimary(bootstrap = False) self.electPrimary(bootstrap = False)
...@@ -201,24 +200,16 @@ class Application(object): ...@@ -201,24 +200,16 @@ class Application(object):
# I am the primary. # I am the primary.
self.primary = True self.primary = True
logging.debug('I am the primary, so sending an announcement') logging.debug('I am the primary, so sending an announcement')
for conn in em.getConnectionList(): for conn in em.getClientList():
if conn.isClient(): conn.notify(protocol.announcePrimaryMaster())
conn.notify(protocol.announcePrimaryMaster()) conn.abort()
conn.abort()
closed = False
t = time() t = time()
while not closed: while em.getClientList():
em.poll(1) em.poll(1)
closed = True
for conn in em.getConnectionList():
if conn.isClient():
closed = False
break
if t + 10 < time(): if t + 10 < time():
for conn in em.getConnectionList(): for conn in em.getClientList():
if conn.isClient(): conn.close()
conn.close() break
closed = True
else: else:
# Wait for an announcement. If this is too long, probably # Wait for an announcement. If this is too long, probably
# the primary master is down. # the primary master is down.
...@@ -231,14 +222,15 @@ class Application(object): ...@@ -231,14 +222,15 @@ class Application(object):
# Now I need only a connection to the primary master node. # Now I need only a connection to the primary master node.
primary = self.primary_master_node primary = self.primary_master_node
addr = primary.getAddress() addr = primary.getAddress()
for conn in em.getConnectionList(): for conn in em.getServerList():
if conn.isServer() or conn.isClient() \ conn.close()
and addr != conn.getAddress(): for conn in em.getClientList():
if conn.getAddress() != addr:
conn.close() conn.close()
# But if there is no such connection, something wrong happened. # But if there is no such connection, something wrong happened.
for conn in em.getConnectionList(): for conn in em.getClientList():
if conn.isClient() and addr == conn.getAddress(): if conn.getAddress() == addr:
break break
else: else:
raise ElectionFailure, 'no connection remains to the primary' raise ElectionFailure, 'no connection remains to the primary'
...@@ -248,37 +240,28 @@ class Application(object): ...@@ -248,37 +240,28 @@ class Application(object):
logging.error('election failed; %s' % m) logging.error('election failed; %s' % m)
# Ask all connected nodes to reelect a single primary master. # Ask all connected nodes to reelect a single primary master.
for conn in em.getConnectionList(): for conn in em.getClientList():
if conn.isClient(): conn.notify(protocol.reelectPrimaryMaster())
conn.notify(protocol.reelectPrimaryMaster()) conn.abort()
conn.abort()
# Wait until the connections are closed. # Wait until the connections are closed.
self.primary = None self.primary = None
self.primary_master_node = None self.primary_master_node = None
closed = False
t = time() t = time()
while not closed: while em.getClientList():
try: try:
em.poll(1) em.poll(1)
except ElectionFailure: except ElectionFailure:
pass pass
closed = True
for conn in em.getConnectionList():
if conn.isClient():
# Still not closed.
closed = False
break
if time() > t + 10: if time() > t + 10:
# If too long, do not wait. # If too long, do not wait.
break break
# Close all connections. # Close all connections.
for conn in em.getConnectionList(): for conn in em.getClientList():
if not conn.isListening(): conn.close()
conn.close() for conn in em.getServerList():
conn.close()
bootstrap = False bootstrap = False
# XXX: should accept a node list and send at most one packet per peer # XXX: should accept a node list and send at most one packet per peer
...@@ -639,23 +622,17 @@ class Application(object): ...@@ -639,23 +622,17 @@ class Application(object):
dump(self.uuid), *(self.server)) dump(self.uuid), *(self.server))
primary_master_handler = secondary.PrimaryMasterHandler(self) # apply the new handler to the primary connection
handler = identification.IdentificationHandler(self) client_list = self.em.getClientList()
em = self.em assert len(client_list) == 1
client_list[0].setHandler(secondary.PrimaryMasterHandler(self))
# Make sure that every connection has the secondary event handler. # and another for the future incoming connections
connection_list = em.getConnectionList() handler = identification.IdentificationHandler(self)
primary_master_found = False self.listening_conn.setHandler(handler)
for conn in em.getConnectionList():
if (not conn.isListening()) and conn.isClient():
assert not primary_master_found
primary_master_found = True
conn.setHandler(primary_master_handler)
else:
conn.setHandler(handler)
while 1: while 1:
em.poll(1) self.em.poll(1)
def changeClusterState(self, state): def changeClusterState(self, state):
""" Change the cluster state and apply right handler on each connections """ """ Change the cluster state and apply right handler on each connections """
...@@ -820,4 +797,3 @@ class Application(object): ...@@ -820,4 +797,3 @@ class Application(object):
logging.info('Accept a storage (%s)' % state) logging.info('Accept a storage (%s)' % state)
return (uuid, node, state, handler, node_ctor) return (uuid, node, state, handler, node_ctor)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment