Commit 4ebf90ec authored by Julien Muchembled's avatar Julien Muchembled

master: fix processing of AnnouncePrimary packets

During election, nodes are not identified anymore.
testElectionWithManyMasters only worked thanks to the election timeout.
parent 6a695c33
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import os, sys import os, sys, weakref
from time import time from time import time
from neo.lib import logging from neo.lib import logging
...@@ -90,6 +90,7 @@ class Application(object): ...@@ -90,6 +90,7 @@ class Application(object):
# election related data # election related data
self.unconnected_master_node_set = set() self.unconnected_master_node_set = set()
self.negotiating_master_node_set = set() self.negotiating_master_node_set = set()
self.master_address_dict = weakref.WeakKeyDictionary()
self._current_manager = None self._current_manager = None
...@@ -171,6 +172,7 @@ class Application(object): ...@@ -171,6 +172,7 @@ class Application(object):
client_handler = election.ClientElectionHandler(self) client_handler = election.ClientElectionHandler(self)
self.unconnected_master_node_set.clear() self.unconnected_master_node_set.clear()
self.negotiating_master_node_set.clear() self.negotiating_master_node_set.clear()
self.master_address_dict.clear()
self.listening_conn.setHandler(election.ServerElectionHandler(self)) self.listening_conn.setHandler(election.ServerElectionHandler(self))
getByAddress = self.nm.getByAddress getByAddress = self.nm.getByAddress
...@@ -299,6 +301,7 @@ class Application(object): ...@@ -299,6 +301,7 @@ class Application(object):
def playPrimaryRole(self): def playPrimaryRole(self):
logging.info('play the primary role with %r', self.listening_conn) logging.info('play the primary role with %r', self.listening_conn)
self.master_address_dict.clear()
em = self.em em = self.em
packet = Packets.AnnouncePrimary() packet = Packets.AnnouncePrimary()
for conn in em.getConnectionList(): for conn in em.getConnectionList():
...@@ -369,6 +372,7 @@ class Application(object): ...@@ -369,6 +372,7 @@ class Application(object):
if t < time(): if t < time():
# election timeout # election timeout
raise ElectionFailure("Election timeout") raise ElectionFailure("Election timeout")
self.master_address_dict.clear()
# Restart completely. Non-optimized # Restart completely. Non-optimized
# but lower level code needs to be stabilized first. # but lower level code needs to be stabilized first.
......
...@@ -23,30 +23,35 @@ from neo.lib.handler import EventHandler ...@@ -23,30 +23,35 @@ from neo.lib.handler import EventHandler
from neo.lib.util import dump from neo.lib.util import dump
from . import MasterHandler from . import MasterHandler
def elect(app, peer_address):
if app.server < peer_address:
app.primary = False
app.negotiating_master_node_set.discard(peer_address)
class BaseElectionHandler(EventHandler): class BaseElectionHandler(EventHandler):
def reelectPrimary(self, conn): def reelectPrimary(self, conn):
raise ElectionFailure, 'reelection requested' raise ElectionFailure, 'reelection requested'
def announcePrimary(self, conn): def announcePrimary(self, conn):
uuid = conn.getUUID()
if uuid is None:
raise ProtocolError('Not identified')
app = self.app app = self.app
if app.primary: if app.primary:
# I am also the primary... So restart the election. # I am also the primary... So restart the election.
raise ElectionFailure, 'another primary arises' raise ElectionFailure, 'another primary arises'
node = app.nm.getByUUID(uuid) try:
address = app.master_address_dict[conn]
assert conn.isServer()
except KeyError:
address = conn.getAddress()
assert conn.isClient()
app.primary = False app.primary = False
app.primary_master_node = node app.primary_master_node = node = app.nm.getByAddress(address)
app.negotiating_master_node_set.clear() app.negotiating_master_node_set.clear()
logging.info('%s is the primary', node) logging.info('%s is the primary', node)
def elect(self, conn, peer_address):
app = self.app
if app.server < peer_address:
app.primary = False
if conn is not None:
app.master_address_dict[conn] = peer_address
app.negotiating_master_node_set.discard(peer_address)
class ClientElectionHandler(BaseElectionHandler): class ClientElectionHandler(BaseElectionHandler):
...@@ -114,7 +119,7 @@ class ClientElectionHandler(BaseElectionHandler): ...@@ -114,7 +119,7 @@ class ClientElectionHandler(BaseElectionHandler):
app.negotiating_master_node_set.clear() app.negotiating_master_node_set.clear()
return return
elect(app, node.getAddress()) self.elect(None, node.getAddress())
class ServerElectionHandler(BaseElectionHandler, MasterHandler): class ServerElectionHandler(BaseElectionHandler, MasterHandler):
...@@ -128,6 +133,6 @@ class ServerElectionHandler(BaseElectionHandler, MasterHandler): ...@@ -128,6 +133,6 @@ class ServerElectionHandler(BaseElectionHandler, MasterHandler):
if node is None: if node is None:
node = app.nm.createMaster(address=address) node = app.nm.createMaster(address=address)
elect(app, address) self.elect(conn, address)
return uuid return uuid
...@@ -262,6 +262,7 @@ class NeoUnitTestBase(NeoTestBase): ...@@ -262,6 +262,7 @@ class NeoUnitTestBase(NeoTestBase):
'getConnector': connector, 'getConnector': connector,
'getPeerId': peer_id, 'getPeerId': peer_id,
}) })
conn.mockAddReturnValues(__hash__ = id(conn))
conn.connecting = False conn.connecting = False
return conn return conn
......
...@@ -331,31 +331,6 @@ class MasterServerElectionTests(MasterClientElectionTestBase): ...@@ -331,31 +331,6 @@ class MasterServerElectionTests(MasterClientElectionTestBase):
}) })
self.assertEqual(self._requestIdentification(), primary) self.assertEqual(self._requestIdentification(), primary)
def testAnnouncePrimary1(self):
""" check the wrong cases """
announce = self.election.announcePrimary
# No uuid
node, conn = self.identifyToMasterNode(uuid=None)
self.checkProtocolErrorRaised(announce, conn)
# Announce to a primary, raise
self.app.primary = True
node, conn = self.identifyToMasterNode()
self.assertTrue(self.app.primary)
self.assertEqual(self.app.primary_master_node, None)
self.assertRaises(ElectionFailure, announce, conn)
def testAnnouncePrimary2(self):
""" Check the good case """
announce = self.election.announcePrimary
# Announce, must set the primary
self.app.primary = False
node, conn = self.identifyToMasterNode()
self.assertFalse(self.app.primary)
self.assertFalse(self.app.primary_master_node)
announce(conn)
self.assertFalse(self.app.primary)
self.assertEqual(self.app.primary_master_node, node)
def test_reelectPrimary(self): def test_reelectPrimary(self):
node, conn = self.identifyToMasterNode() node, conn = self.identifyToMasterNode()
self.assertRaises(ElectionFailure, self.election.reelectPrimary, conn) self.assertRaises(ElectionFailure, self.election.reelectPrimary, conn)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment