Commit 1f6719db authored by Grégory Wisniewski

In case of storage failure without current connection to the primary master,

don't try to send the notification from the polling thread.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@602 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent abaec8a0
...@@ -228,14 +228,11 @@ class Application(object): ...@@ -228,14 +228,11 @@ class Application(object):
self.pt = None self.pt = None
self.primary_master_node = None self.primary_master_node = None
self.master_node_list = master_nodes.split(' ') self.master_node_list = master_nodes.split(' ')
self.master_conn = None
# no self-assigned UUID, primary master will supply us one # no self-assigned UUID, primary master will supply us one
self.uuid = INVALID_UUID self.uuid = INVALID_UUID
self.mq_cache = MQ() self.mq_cache = MQ()
self.new_oid_list = [] self.new_oid_list = []
self.ptid = INVALID_PTID self.ptid = INVALID_PTID
self.num_replicas = 0
self.num_partitions = 0
self.storage_handler = StorageAnswersHandler(self, self.dispatcher) self.storage_handler = StorageAnswersHandler(self, self.dispatcher)
self.primary_handler = PrimaryAnswersHandler(self, self.dispatcher) self.primary_handler = PrimaryAnswersHandler(self, self.dispatcher)
self.notifications_handler = PrimaryNotificationsHandler(self, self.dispatcher) self.notifications_handler = PrimaryNotificationsHandler(self, self.dispatcher)
...@@ -268,24 +265,45 @@ class Application(object): ...@@ -268,24 +265,45 @@ class Application(object):
lock = Lock() lock = Lock()
self._pt_acquire = lock.acquire self._pt_acquire = lock.acquire
self._pt_release = lock.release self._pt_release = lock.release
self.master_conn = None
self.num_replicas = None
self.num_partitions = None
self.master_conn = self._getMasterConnection() self.master_conn = self._getMasterConnection()
assert self.master_conn is not None
assert self.num_partitions is not None
assert self.num_replicas is not None
def _notifyDeadStorage(self, s_node):
""" Notify a storage failure to the primary master """
# Nothing to report when no node object was supplied.
if s_node is None:
return
s_uuid = s_node.getUUID()
ip_address, port = s_node.getServer()
# The master connection is obtained through _getMasterConnection() instead
# of reading self.master_conn directly.
# NOTE(review): presumably this (re)establishes the connection when none is
# currently open -- confirm against _getMasterConnection().
m_conn = self._getMasterConnection()
m_conn.lock()
try:
# NOTE(review): `state` is not assigned anywhere in this method and is not
# a parameter; unless it is a module-level name, this line raises
# NameError. Confirm which node state should be reported here.
node_list = [(STORAGE_NODE_TYPE, ip_address, port, s_uuid, state)]
m_conn.notify(protocol.notifyNodeInformation(node_list))
finally:
# Release the connection lock even if building or sending the packet fails.
m_conn.unlock()
def _waitMessage(self, target_conn = None, msg_id = None, handler=None): def _waitMessage(self, target_conn = None, msg_id = None, handler=None):
"""Wait for a message returned by the dispatcher in queues.""" """Wait for a message returned by the dispatcher in queues."""
local_queue = self.local_var.queue local_queue = self.local_var.queue
if handler is None: if handler is None:
handler = self.notifications_handler handler = self.notifications_handler
while 1: while 1:
try: if msg_id is None:
if msg_id is None: try:
conn, packet = local_queue.get_nowait() conn, packet = local_queue.get_nowait()
else: except Empty:
conn, packet = local_queue.get() break
except Empty: else:
break conn, packet = local_queue.get()
# check fake packet
if packet is None: if packet is None:
s_node = self.nm.getNodeByServer(conn.getAddress())
self._notifyDeadStorage(s_node)
if conn.getUUID() == target_conn.getUUID(): if conn.getUUID() == target_conn.getUUID():
raise NEOStorageConnectionFailure('connection closed') raise NEOStorageConnectionFailure('connection closed')
else: else:
......
...@@ -359,20 +359,11 @@ class StorageBaseHandler(BaseHandler): ...@@ -359,20 +359,11 @@ class StorageBaseHandler(BaseHandler):
if id(conn) == key[0]: if id(conn) == key[0]:
queue = self.dispatcher.message_table.pop(key) queue = self.dispatcher.message_table.pop(key)
queue_set.add(queue) queue_set.add(queue)
# Storage failure is notified to the primary master when the fake
# packet is popped by a non-polling thread.
for queue in queue_set: for queue in queue_set:
queue.put((conn, None)) queue.put((conn, None))
# Notify the primary master node of the failure.
conn = app.master_conn
if conn is not None:
conn.lock()
try:
ip_address, port = node.getServer()
node_list = [(STORAGE_NODE_TYPE, ip_address, port,
node.getUUID(), state)]
conn.notify(protocol.notifyNodeInformation(node_list))
finally:
conn.unlock()
def connectionClosed(self, conn): def connectionClosed(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress()) node = self.app.nm.getNodeByServer(conn.getAddress())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment