Commit 36879f32 authored by Aurel's avatar Aurel

handle HIDDEN STATE and ask for shutdown


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@533 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 0a9f2548
...@@ -16,14 +16,14 @@ ...@@ -16,14 +16,14 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import logging import logging
import os import os, sys
from time import time from time import time
from struct import unpack, pack from struct import unpack, pack
from collections import deque from collections import deque
from neo.config import ConfigurationManager from neo.config import ConfigurationManager
from neo.protocol import TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, \ from neo.protocol import TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, \
INVALID_UUID, INVALID_PTID, partition_cell_states INVALID_UUID, INVALID_PTID, partition_cell_states, HIDDEN_STATE
from neo.node import NodeManager, MasterNode, StorageNode, ClientNode from neo.node import NodeManager, MasterNode, StorageNode, ClientNode
from neo.event import EventManager from neo.event import EventManager
from neo.storage.mysqldb import MySQLDatabaseManager from neo.storage.mysqldb import MySQLDatabaseManager
...@@ -32,6 +32,7 @@ from neo.exception import OperationFailure, PrimaryFailure ...@@ -32,6 +32,7 @@ from neo.exception import OperationFailure, PrimaryFailure
from neo.storage.bootstrap import BootstrapEventHandler from neo.storage.bootstrap import BootstrapEventHandler
from neo.storage.verification import VerificationEventHandler from neo.storage.verification import VerificationEventHandler
from neo.storage.operation import OperationEventHandler from neo.storage.operation import OperationEventHandler
from neo.storage.hidden import HiddenEventHandler
from neo.storage.replicator import Replicator from neo.storage.replicator import Replicator
from neo.connector import getConnectorHandler from neo.connector import getConnectorHandler
from neo.pt import PartitionTable from neo.pt import PartitionTable
...@@ -141,11 +142,16 @@ class Application(object): ...@@ -141,11 +142,16 @@ class Application(object):
try: try:
while 1: while 1:
try: try:
# check my state
node = self.nm.getNodeByUUID(self.uuid)
if node is not None and node.getState() == HIDDEN_STATE:
self.wait()
self.verifyData() self.verifyData()
self.doOperation() self.doOperation()
except OperationFailure: except OperationFailure:
logging.error('operation stopped') logging.error('operation stopped')
self.operational = False self.operational = False
except PrimaryFailure: except PrimaryFailure:
logging.error('primary master is down') logging.error('primary master is down')
...@@ -270,6 +276,19 @@ class Application(object): ...@@ -270,6 +276,19 @@ class Application(object):
if self.replicator.pending(): if self.replicator.pending():
self.replicator.act() self.replicator.act()
def wait(self):
# change handler
handler = HiddenEventHandler(self)
for conn in self.em.getConnectionList():
conn.setHandler(handler)
node = self.nm.getNodeByUUID(self.uuid)
while 1:
self.em.poll(1)
if node.getState() != HIDDEN_STATE:
return
def queueEvent(self, callable, *args, **kwargs): def queueEvent(self, callable, *args, **kwargs):
self.event_queue.append((callable, args, kwargs)) self.event_queue.append((callable, args, kwargs))
...@@ -282,3 +301,11 @@ class Application(object): ...@@ -282,3 +301,11 @@ class Application(object):
def getPartition(self, oid_or_tid): def getPartition(self, oid_or_tid):
return unpack('!Q', oid_or_tid)[0] % self.num_partitions return unpack('!Q', oid_or_tid)[0] % self.num_partitions
def shutdown(self):
"""Close all connections and exit"""
for c in self.em.getConnectionList():
if not c.isListeningConnection():
c.close()
sys.exit("Application has been asked to shut down")
...@@ -20,11 +20,13 @@ import logging ...@@ -20,11 +20,13 @@ import logging
from neo.handler import EventHandler from neo.handler import EventHandler
from neo.protocol import Packet, UnexpectedPacketError, \ from neo.protocol import Packet, UnexpectedPacketError, \
INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \ INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
DOWN_STATE, TEMPORARILY_DOWN_STATE, HIDDEN_STATE
from neo.util import dump from neo.util import dump
from neo.node import MasterNode, StorageNode, ClientNode from neo.node import MasterNode, StorageNode, ClientNode
from neo.connection import ClientConnection from neo.connection import ClientConnection
from neo.exception import PrimaryFailure from neo.exception import PrimaryFailure, OperationFailure
from neo.handler import identification_required, restrict_node_types from neo.handler import identification_required, restrict_node_types
class StorageEventHandler(EventHandler): class StorageEventHandler(EventHandler):
...@@ -132,13 +134,26 @@ class StorageEventHandler(EventHandler): ...@@ -132,13 +134,26 @@ class StorageEventHandler(EventHandler):
# No interest. # No interest.
continue continue
if uuid == self.app.uuid:
# This is me, do what the master tell me
logging.info("I was told I'm %s" %(state))
if state in (DOWN_STATE, TEMPORARILY_DOWN_STATE, BROKEN_STATE):
conn.close()
self.app.shutdown()
elif state == HIDDEN_STATE:
n = app.nm.getNodeByUUID(uuid)
n.setState(state)
raise OperationFailure
else:
# I know I'm running
continue
n = app.nm.getNodeByUUID(uuid) n = app.nm.getNodeByUUID(uuid)
if n is None: if n is None:
n = StorageNode(server = addr, uuid = uuid) n = StorageNode(server = addr, uuid = uuid)
app.nm.add(n) app.nm.add(n)
else: else:
n.setServer(addr) n.setServer(addr)
n.setState(state) n.setState(state)
elif node_type == CLIENT_NODE_TYPE: elif node_type == CLIENT_NODE_TYPE:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment