Commit c17f5f91 authored by Julien Muchembled's avatar Julien Muchembled

storage: only accept clients that are known by the master

Therefore, a client node in the node manager is always RUNNING.
parent d752aadb
...@@ -58,13 +58,6 @@ class ClientOperationHandler(EventHandler): ...@@ -58,13 +58,6 @@ class ClientOperationHandler(EventHandler):
compression, checksum, data, data_serial) compression, checksum, data, data_serial)
conn.answer(p) conn.answer(p)
def connectionLost(self, conn, new_state):
uuid = conn.getUUID()
node = self.app.nm.getByUUID(uuid)
if self.app.listening_conn: # if running
assert node is not None, conn
self.app.nm.remove(node)
def abortTransaction(self, conn, ttid): def abortTransaction(self, conn, ttid):
self.app.tm.abort(ttid) self.app.tm.abort(ttid)
......
...@@ -27,13 +27,12 @@ class IdentificationHandler(EventHandler): ...@@ -27,13 +27,12 @@ class IdentificationHandler(EventHandler):
def connectionLost(self, conn, new_state): def connectionLost(self, conn, new_state):
logging.warning('A connection was lost during identification') logging.warning('A connection was lost during identification')
def requestIdentification(self, conn, node_type, def requestIdentification(self, conn, node_type, uuid, address, name):
uuid, address, name):
self.checkClusterName(name) self.checkClusterName(name)
app = self.app
# reject any incoming connections if not ready # reject any incoming connections if not ready
if not self.app.ready: if not app.ready:
raise NotReadyError raise NotReadyError
app = self.app
if uuid is None: if uuid is None:
if node_type != NodeTypes.STORAGE: if node_type != NodeTypes.STORAGE:
raise ProtocolError('reject anonymous non-storage node') raise ProtocolError('reject anonymous non-storage node')
...@@ -43,8 +42,13 @@ class IdentificationHandler(EventHandler): ...@@ -43,8 +42,13 @@ class IdentificationHandler(EventHandler):
if uuid == app.uuid: if uuid == app.uuid:
raise ProtocolError("uuid conflict or loopback connection") raise ProtocolError("uuid conflict or loopback connection")
node = app.nm.getByUUID(uuid) node = app.nm.getByUUID(uuid)
# If this node is broken, reject it. if node is None:
if node is not None and node.isBroken(): # Do never create node automatically, or we could get id
# conflicts. We must only rely on the notifications from the
# master to recognize nodes. So this is not always an error:
# maybe there are incoming notifications.
raise NotReadyError('unknown node: retry later')
if node.isBroken():
raise BrokenNodeDisallowedError raise BrokenNodeDisallowedError
# choose the handler according to the node type # choose the handler according to the node type
if node_type == NodeTypes.CLIENT: if node_type == NodeTypes.CLIENT:
...@@ -52,20 +56,14 @@ class IdentificationHandler(EventHandler): ...@@ -52,20 +56,14 @@ class IdentificationHandler(EventHandler):
handler = ClientReadOnlyOperationHandler handler = ClientReadOnlyOperationHandler
else: else:
handler = ClientOperationHandler handler = ClientOperationHandler
if node is None: if node.isConnected(): # XXX
node = app.nm.createClient(uuid=uuid)
elif node.isConnected():
# This can happen if we haven't processed yet a notification # This can happen if we haven't processed yet a notification
# from the master, telling us the existing node is not # from the master, telling us the existing node is not
# running anymore. If we accept the new client, we won't # running anymore. If we accept the new client, we won't
# know what to do with this late notification. # know what to do with this late notification.
raise NotReadyError('uuid conflict: retry later') raise NotReadyError('uuid conflict: retry later')
node.setRunning() assert node.isRunning(), node
elif node_type == NodeTypes.STORAGE: elif node_type == NodeTypes.STORAGE:
if node is None:
logging.error('reject an unknown storage node %s',
uuid_str(uuid))
raise NotReadyError
handler = StorageOperationHandler handler = StorageOperationHandler
else: else:
raise ProtocolError('reject non-client-or-storage node') raise ProtocolError('reject non-client-or-storage node')
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
import unittest import unittest
from mock import Mock from mock import Mock
from .. import NeoUnitTestBase from .. import NeoUnitTestBase
from neo.lib.protocol import NodeTypes, NotReadyError, \ from neo.lib.protocol import NodeStates, NodeTypes, NotReadyError, \
BrokenNodeDisallowedError BrokenNodeDisallowedError
from neo.lib.pt import PartitionTable from neo.lib.pt import PartitionTable
from neo.storage.app import Application from neo.storage.app import Application
...@@ -81,7 +81,7 @@ class StorageIdentificationHandlerTests(NeoUnitTestBase): ...@@ -81,7 +81,7 @@ class StorageIdentificationHandlerTests(NeoUnitTestBase):
""" accepted client must be connected and running """ """ accepted client must be connected and running """
uuid = self.getClientUUID() uuid = self.getClientUUID()
conn = self.getFakeConnection(uuid=uuid) conn = self.getFakeConnection(uuid=uuid)
node = self.app.nm.createClient(uuid=uuid) node = self.app.nm.createClient(uuid=uuid, state=NodeStates.RUNNING)
master = (self.local_ip, 3000) master = (self.local_ip, 3000)
self.app.master_node = Mock({ self.app.master_node = Mock({
'getAddress': master, 'getAddress': master,
......
...@@ -1061,17 +1061,40 @@ class Test(NEOThreadedTest): ...@@ -1061,17 +1061,40 @@ class Test(NEOThreadedTest):
cluster.stop() cluster.stop()
def testClientFailureDuringTpcFinish(self): def testClientFailureDuringTpcFinish(self):
def delayAskLockInformation(conn, packet): """
if isinstance(packet, Packets.AskLockInformation): Third scenario:
C M S | TID known by
---- Finish -----> |
---- Disconnect -- ----- Lock ------> |
----- C down ----> |
---- Connect ----> | M
----- C up ------> |
<---- Locked ----- |
------------------------------------------------+--------------
-- unlock ... |
---- FinalTID ---> | S (TM)
---- Connect + FinalTID --------------> |
... unlock ---> |
------------------------------------------------+--------------
| S (DM)
"""
def delayAnswerLockInformation(conn, packet):
if isinstance(packet, Packets.AnswerInformationLocked):
cluster.client.master_conn.close() cluster.client.master_conn.close()
return True return True
def askFinalTID(orig, *args): def askFinalTID(orig, *args):
m2s.remove(delayAskLockInformation) s2m.remove(delayAnswerLockInformation)
orig(*args) orig(*args)
def _getFinalTID(orig, ttid): def _getFinalTID(orig, ttid):
m2s.remove(delayAskLockInformation) s2m.remove(delayAnswerLockInformation)
self.tic() self.tic()
return orig(ttid) return orig(ttid)
def _connectToPrimaryNode(orig):
conn = orig()
self.tic()
s2m.remove(delayAnswerLockInformation)
return conn
cluster = NEOCluster() cluster = NEOCluster()
try: try:
cluster.start() cluster.start()
...@@ -1079,25 +1102,30 @@ class Test(NEOThreadedTest): ...@@ -1079,25 +1102,30 @@ class Test(NEOThreadedTest):
r = c.root() r = c.root()
r['x'] = PCounter() r['x'] = PCounter()
tid0 = r._p_serial tid0 = r._p_serial
with cluster.master.filterConnection(cluster.storage) as m2s: with cluster.storage.filterConnection(cluster.master) as s2m:
m2s.add(delayAskLockInformation, s2m.add(delayAnswerLockInformation,
Patch(ClientServiceHandler, askFinalTID=askFinalTID)) Patch(ClientServiceHandler, askFinalTID=askFinalTID))
t.commit() # the final TID is returned by the master t.commit() # the final TID is returned by the master
t.begin() t.begin()
r['x'].value += 1 r['x'].value += 1
tid1 = r._p_serial tid1 = r._p_serial
self.assertTrue(tid0 < tid1) self.assertTrue(tid0 < tid1)
with cluster.master.filterConnection(cluster.storage) as m2s: with cluster.storage.filterConnection(cluster.master) as s2m:
m2s.add(delayAskLockInformation, s2m.add(delayAnswerLockInformation,
Patch(cluster.client, _getFinalTID=_getFinalTID)) Patch(cluster.client, _getFinalTID=_getFinalTID))
t.commit() # the final TID is returned by the storage backend t.commit() # the final TID is returned by the storage backend
t.begin() t.begin()
r['x'].value += 1 r['x'].value += 1
tid2 = r['x']._p_serial tid2 = r['x']._p_serial
self.assertTrue(tid1 < tid2) self.assertTrue(tid1 < tid2)
with cluster.master.filterConnection(cluster.storage) as m2s: # The whole test would be simpler if we always delayed the
m2s.add(delayAskLockInformation, # AskLockInformation packet. However, it would also delay
Patch(cluster.client, _getFinalTID=_getFinalTID)) # NotifyNodeInformation and the client would fail to connect
# to the storage node.
with cluster.storage.filterConnection(cluster.master) as s2m, \
cluster.master.filterConnection(cluster.storage) as m2s:
s2m.add(delayAnswerLockInformation, Patch(cluster.client,
_connectToPrimaryNode=_connectToPrimaryNode))
m2s.add(lambda conn, packet: m2s.add(lambda conn, packet:
isinstance(packet, Packets.NotifyUnlockInformation)) isinstance(packet, Packets.NotifyUnlockInformation))
t.commit() # the final TID is returned by the storage (tm) t.commit() # the final TID is returned by the storage (tm)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment