Commit 27b900a5 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Use anytime is possible the is<State>() and set<State> available in Node class.

Remove all (now useless) imports from protocol module.


git-svn-id: https://svn.erp5.org/repos/neo/trunk@1327 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 71e1beef
......@@ -147,7 +147,7 @@ class ConnectionPool(object):
def getConnForNode(self, node):
"""Return a locked connection object to a given node
If no connection exists, create a new one"""
if node.getState() != protocol.RUNNING_STATE:
if not node.isRunning():
return None
uuid = node.getUUID()
self.connection_lock_acquire()
......
......@@ -21,8 +21,7 @@ from time import time
from struct import pack, unpack
from neo import protocol
from neo.protocol import RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
UUID_NAMESPACES, BOOTING_CLUSTER_STATE
from neo.protocol import UUID_NAMESPACES, BOOTING_CLUSTER_STATE
from neo.node import NodeManager
from neo.event import EventManager
from neo.connection import ListeningConnection, ClientConnection
......@@ -151,7 +150,7 @@ class Application(object):
for node in nm.getMasterList():
# For now, believe that every node should be available,
# since down or broken nodes may be already repaired.
node.setState(RUNNING_STATE)
node.setRunning()
while 1:
t = 0
......@@ -159,7 +158,7 @@ class Application(object):
self.primary_master_node = None
for node in nm.getMasterList():
if node.getState() == RUNNING_STATE:
if node.isRunning():
self.unconnected_master_node_set.add(node.getAddress())
# Wait at most 20 seconds at bootstrap. Otherwise, wait at most
......@@ -178,10 +177,10 @@ class Application(object):
if current_time >= t + 1:
t = current_time
for node in nm.getMasterList():
if node.getState() == TEMPORARILY_DOWN_STATE \
if node.isTemporarilyDown() \
and node.getLastStateChange() + expiration < current_time:
logging.info('%s is down' % (node, ))
node.setState(DOWN_STATE)
node.setDown()
self.unconnected_master_node_set.discard(node.getAddress())
# Try to connect to master nodes.
......@@ -382,7 +381,7 @@ class Application(object):
# take the first node available
node_list = nm.getStorageList()[:REQUIRED_NODE_NUMBER]
for node in node_list:
node.setState(protocol.RUNNING_STATE)
node.setRunning()
self.broadcastNodeInformation(node)
# resert IDs generators
self.loid = '\0' * 8
......@@ -420,7 +419,7 @@ class Application(object):
allowed_node_set = set(self.pt.getNodeList())
refused_node_set = set(self.nm.getStorageList()) - allowed_node_set
for node in refused_node_set:
node.setState(protocol.PENDING_STATE)
node.setPending()
self.broadcastNodeInformation(node)
logging.debug('cluster starts with loid=%s and this partition table :',
......@@ -625,8 +624,8 @@ class Application(object):
# If I know any storage node, make sure that they are not in the running state,
# because they are not connected at this stage.
for node in nm.getStorageList():
if node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE)
if node.isRunning():
node.setTemporarilyDown()
# recover the cluster status at startup
self.recoverStatus()
......@@ -752,7 +751,7 @@ class Application(object):
node = self.nm.getByUUID(c.getUUID())
if node.isClient():
node_list = [(node.getType(), node.getAddress(),
node.getUUID(), DOWN_STATE)]
node.getUUID(), protocol.DOWN_STATE)]
c.notify(protocol.notifyNodeInformation(node_list))
# then ask storages and master nodes to shutdown
logging.info("asking all remaining nodes to shutdown")
......@@ -760,7 +759,7 @@ class Application(object):
node = self.nm.getByUUID(c.getUUID())
if node.isStorage() or node.isMaster():
node_list = [(node.getType(), node.getAddress(),
node.getUUID(), DOWN_STATE)]
node.getUUID(), protocol.DOWN_STATE)]
c.notify(protocol.notifyNodeInformation(node_list))
# then shutdown
sys.exit("Cluster has been asked to shut down")
......
......@@ -44,12 +44,14 @@ class MasterHandler(EventHandler):
known_master_list = [(app.server, app.uuid, )]
for n in app.nm.getMasterList():
if n.getState() == protocol.BROKEN_STATE:
if n.isBroken():
continue
known_master_list.append((n.getAddress(), n.getUUID(), ))
conn.answer(protocol.answerPrimaryMaster(primary_uuid,
known_master_list),
packet.getId())
conn.answer(protocol.answerPrimaryMaster(
primary_uuid,
known_master_list),
packet.getId(),
)
def handleAskClusterState(self, conn, packet):
assert conn.getUUID() is not None
......@@ -87,7 +89,7 @@ class BaseServiceHandler(MasterHandler):
new_state = DISCONNECTED_STATE_DICT.get(node.getType(), protocol.DOWN_STATE)
if node.getState() == new_state:
return
if new_state != protocol.BROKEN_STATE and node.getState() == protocol.PENDING_STATE:
if new_state != protocol.BROKEN_STATE and node.isPending():
# was in pending state, so drop it from the node manager to forget
# it and do not set in running state when it comes back
logging.info('drop a pending node from the node manager')
......
......@@ -19,7 +19,7 @@ from neo import logging
from neo import protocol
from neo.master.handlers import MasterHandler
from neo.protocol import RUNNING_STATE, PENDING_STATE
from neo.protocol import RUNNING_STATE
from neo.util import dump
class AdministrationHandler(MasterHandler):
......@@ -100,7 +100,7 @@ class AdministrationHandler(MasterHandler):
uuid_set = set()
# take all pending nodes
for node in nm.getStorageList():
if node.getState() == PENDING_STATE:
if node.isPending():
uuid_set.add(node.getUUID())
# keep only selected nodes
if uuid_list:
......@@ -118,7 +118,7 @@ class AdministrationHandler(MasterHandler):
node = nm.getByUUID(uuid)
new_cells = pt.addNode(node)
cell_list.extend(new_cells)
node.setState(RUNNING_STATE)
node.setRunning()
app.broadcastNodeInformation(node)
# start nodes
for s_conn in em.getConnectionList():
......
......@@ -18,9 +18,6 @@
from neo import logging
from neo import protocol
from neo.protocol import MASTER_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \
DOWN_STATE
from neo.master.handlers import MasterHandler
from neo.exception import ElectionFailure
......@@ -33,7 +30,7 @@ class ElectionHandler(MasterHandler):
raise protocol.UnexpectedPacketError
app = self.app
for node_type, addr, uuid, state in node_list:
if node_type != MASTER_NODE_TYPE:
if node_type != protocol.MASTER_NODE_TYPE:
# No interest.
continue
......@@ -52,7 +49,7 @@ class ElectionHandler(MasterHandler):
if node.getUUID() is None:
node.setUUID(uuid)
if state in (node.getState(), RUNNING_STATE):
if state in (node.getState(), protocol.RUNNING_STATE):
# No change. Don't care.
continue
......@@ -68,8 +65,8 @@ class ClientElectionHandler(ElectionHandler):
def packetReceived(self, conn, packet):
node = self.app.nm.getByAddress(conn.getAddress())
if node.getState() != BROKEN_STATE:
node.setState(RUNNING_STATE)
if not node.isBroken():
node.setRunning()
MasterHandler.packetReceived(self, conn, packet)
def connectionStarted(self, conn):
......@@ -96,10 +93,10 @@ class ClientElectionHandler(ElectionHandler):
addr = conn.getAddress()
app.negotiating_master_node_set.discard(addr)
node = app.nm.getByAddress(addr)
if node.getState() == RUNNING_STATE:
if node.isRunning():
app.unconnected_master_node_set.add(addr)
node.setState(TEMPORARILY_DOWN_STATE)
elif node.getState() == TEMPORARILY_DOWN_STATE:
node.setTemporarilyDown()
elif node.isTemporarilyDown():
app.unconnected_master_node_set.add(addr)
MasterHandler.connectionFailed(self, conn)
......@@ -108,7 +105,7 @@ class ClientElectionHandler(ElectionHandler):
addr = conn.getAddress()
node = app.nm.getByAddress(addr)
if node is not None:
node.setState(DOWN_STATE)
node.setDown()
app.negotiating_master_node_set.discard(addr)
MasterHandler.peerBroken(self, conn)
......@@ -117,7 +114,7 @@ class ClientElectionHandler(ElectionHandler):
num_replicas, your_uuid):
app = self.app
node = app.nm.getByAddress(conn.getAddress())
if node_type != MASTER_NODE_TYPE:
if node_type != protocol.MASTER_NODE_TYPE:
# The peer is not a master node!
logging.error('%s:%d is not a master node', *address)
app.nm.remove(node)
......@@ -200,8 +197,12 @@ class ClientElectionHandler(ElectionHandler):
[primary_server])
# Request a node idenfitication.
conn.ask(protocol.requestNodeIdentification(MASTER_NODE_TYPE,
app.uuid, app.server, app.name))
conn.ask(protocol.requestNodeIdentification(
protocol.MASTER_NODE_TYPE,
app.uuid,
app.server,
app.name
))
class ServerElectionHandler(ElectionHandler):
......@@ -214,7 +215,7 @@ class ServerElectionHandler(ElectionHandler):
addr = conn.getAddress()
node = app.nm.getByAddress(addr)
if node is not None and node.getUUID() is not None:
node.setState(BROKEN_STATE)
node.setBroken()
MasterHandler.peerBroken(self, conn)
def handleRequestNodeIdentification(self, conn, packet, node_type,
......@@ -229,7 +230,7 @@ class ServerElectionHandler(ElectionHandler):
return
self.checkClusterName(name)
app = self.app
if node_type != MASTER_NODE_TYPE:
if node_type != protocol.MASTER_NODE_TYPE:
logging.info('reject a connection from a non-master')
raise protocol.NotReadyError
node = app.nm.getByAddress(address)
......@@ -238,7 +239,7 @@ class ServerElectionHandler(ElectionHandler):
raise protocol.ProtocolError('unknown master node')
# If this node is broken, reject it.
if node.getUUID() == uuid:
if node.getState() == BROKEN_STATE:
if node.isBroken():
raise protocol.BrokenNodeDisallowedError
# supplied another uuid in case of conflict
......@@ -248,8 +249,14 @@ class ServerElectionHandler(ElectionHandler):
node.setUUID(uuid)
conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE, app.uuid,
app.server, app.pt.getPartitions(), app.pt.getReplicas(), uuid)
p = protocol.acceptNodeIdentification(
protocol.MASTER_NODE_TYPE,
app.uuid,
app.server,
app.pt.getPartitions(),
app.pt.getReplicas(),
uuid
)
conn.answer(p, packet.getId())
def handleAnnouncePrimaryMaster(self, conn, packet):
......
......@@ -38,23 +38,23 @@ class IdentificationHandler(MasterHandler):
node = node_by_uuid or node_by_addr
if node_by_uuid is not None:
if node.getAddress() == address:
if node.getState() == protocol.BROKEN_STATE:
if node.isBroken():
raise protocol.BrokenNodeDisallowedError
# the node is still alive
node.setState(protocol.RUNNING_STATE)
node.setRunning()
if node.getAddress() != address:
if node.getState() == protocol.RUNNING_STATE:
if node.isRunning():
# still running, reject this new node
raise protocol.ProtocolError('invalid server address')
# this node has changed its address
node.setAddress(address)
node.setState(protocol.RUNNING_STATE)
node.setRunning()
if node_by_uuid is None and node_by_addr is not None:
if node.getState() == protocol.RUNNING_STATE:
if node.isRunning():
# still running, reject this new node
raise protocol.ProtocolError('invalid server address')
node.setAddress(address)
node.setState(protocol.RUNNING_STATE)
node.setRunning()
# ask the app the node identification, if refused, an exception is raised
result = self.app.identifyNode(node_type, uuid, node)
......
......@@ -17,11 +17,9 @@
from neo import logging
from neo.protocol import MASTER_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, DOWN_STATE
from neo.master.handlers import MasterHandler
from neo.exception import ElectionFailure, PrimaryFailure
from neo.protocol import UnexpectedPacketError
from neo import protocol
class SecondaryMasterHandler(MasterHandler):
""" Handler used by primary to handle secondary masters"""
......@@ -29,7 +27,7 @@ class SecondaryMasterHandler(MasterHandler):
def connectionLost(self, conn, new_state):
node = self.app.nm.getByUUID(conn.getUUID())
assert node is not None
node.setState(DOWN_STATE)
node.setDown()
self.app.broadcastNodeInformation(node)
def connectionCompleted(self, conn):
......@@ -51,16 +49,16 @@ class PrimaryMasterHandler(MasterHandler):
def packetReceived(self, conn, packet):
if not conn.isServer():
node = self.app.nm.getByAddress(conn.getAddress())
if node.getState() != BROKEN_STATE:
node.setState(RUNNING_STATE)
if not node.isBroken():
node.setRunning()
MasterHandler.packetReceived(self, conn, packet)
def connectionLost(self, conn, new_state):
self.app.primary_master_node.setState(DOWN_STATE)
self.app.primary_master_node.setDown()
raise PrimaryFailure, 'primary master is dead'
def handleAnnouncePrimaryMaster(self, conn, packet):
raise UnexpectedPacketError
raise protocol.UnexpectedPacketError
def handleReelectPrimaryMaster(self, conn, packet):
raise ElectionFailure, 'reelection requested'
......@@ -68,7 +66,7 @@ class PrimaryMasterHandler(MasterHandler):
def handleNotifyNodeInformation(self, conn, packet, node_list):
app = self.app
for node_type, addr, uuid, state in node_list:
if node_type != MASTER_NODE_TYPE:
if node_type != protocol.MASTER_NODE_TYPE:
# No interest.
continue
......@@ -92,7 +90,7 @@ class PrimaryMasterHandler(MasterHandler):
num_replicas, your_uuid):
app = self.app
node = app.nm.getByAddress(conn.getAddress())
assert node_type == MASTER_NODE_TYPE
assert node_type == protocol.MASTER_NODE_TYPE
assert conn.getAddress() == address
if your_uuid != app.uuid:
......
......@@ -31,7 +31,7 @@ class StorageServiceHandler(BaseServiceHandler):
def connectionCompleted(self, conn):
node = self.app.nm.getByUUID(conn.getUUID())
if node.getState() == protocol.RUNNING_STATE:
if node.isRunning():
conn.notify(protocol.notifyLastOID(self.app.loid))
conn.notify(protocol.startOperation())
......
......@@ -41,8 +41,8 @@ class PartitionTable(neo.pt.PartitionTable):
# start with the first PTID
self.id = pack('!Q', 1)
# First, filter the list of nodes.
node_list = [n for n in node_list \
if n.getState() == RUNNING_STATE and n.getUUID() is not None]
node_list = [n for n in node_list if n.isRunning() \
and n.getUUID() is not None]
if len(node_list) == 0:
# Impossible.
raise RuntimeError, \
......@@ -71,7 +71,7 @@ class PartitionTable(neo.pt.PartitionTable):
for node, count in self.count_dict.iteritems():
if min_count > count \
and node not in excluded_node_list \
and node.getState() == RUNNING_STATE:
and node.isRunning():
min_node = node
min_count = count
return min_node
......
......@@ -126,7 +126,7 @@ class PartitionTable(object):
def setCell(self, offset, node, state):
if state == protocol.DISCARDED_STATE:
return self.removeCell(offset, node)
if node.getState() in (protocol.BROKEN_STATE, protocol.DOWN_STATE):
if node.isBroken() or node.isDown():
return
self.count_dict.setdefault(node, 0)
......
......@@ -20,7 +20,6 @@ import sys
from collections import deque
from neo import protocol
from neo.protocol import HIDDEN_STATE
from neo.node import NodeManager
from neo.event import EventManager
from neo.storage.mysqldb import MySQLDatabaseManager
......@@ -166,7 +165,7 @@ class Application(object):
try:
# check my state
node = self.nm.getByUUID(self.uuid)
if node is not None and node.getState() == HIDDEN_STATE:
if node is not None and node.isHidden():
self.wait()
self.verifyData()
self.initialize()
......@@ -294,7 +293,7 @@ class Application(object):
node = self.nm.getByUUID(self.uuid)
while 1:
self.em.poll(1)
if node.getState() != HIDDEN_STATE:
if not node.isHidden():
break
def queueEvent(self, some_callable, *args, **kwargs):
......
......@@ -19,7 +19,7 @@ from neo import logging
from neo.handler import EventHandler
from neo import protocol
from neo.protocol import RUNNING_STATE, BROKEN_STATE, CLIENT_NODE_TYPE, \
from neo.protocol import BROKEN_STATE, CLIENT_NODE_TYPE, \
DOWN_STATE, TEMPORARILY_DOWN_STATE, HIDDEN_STATE
from neo.util import dump
from neo.exception import PrimaryFailure, OperationFailure
......
......@@ -50,7 +50,7 @@ class IdentificationHandler(BaseStorageHandler):
logging.error('reject an unknown node %s', dump(uuid))
raise protocol.NotReadyError
# If this node is broken, reject it.
if node.getState() == protocol.BROKEN_STATE:
if node.isBroken():
raise protocol.BrokenNodeDisallowedError
# apply the handler and set up the connection
handler = handler(self.app)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment