Commit d44ce5f5 authored by Marco Mariani's avatar Marco Mariani

avoid busy loops, use threadsafe Queue

parent 224c7d98
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import logging import logging
import Queue
import socket import socket
import thread import thread
import time import time
...@@ -87,12 +88,11 @@ class ResilientInstance(object): ...@@ -87,12 +88,11 @@ class ResilientInstance(object):
self.halter_id = 0 self.halter_id = 0
self.inElection = False self.inElection = False
self.alive = True self.alive = True
self.lastPing = time.clock()
self.mainCanal = self.comm.canal([MSG_PING, MSG_HALT, MSG_VICTORY]) self.mainCanal = self.comm.create_canal([MSG_PING, MSG_HALT, MSG_VICTORY])
self.renamer = renamer self.renamer = renamer
self.okCanal = self.comm.canal([MSG_OK]) self.okCanal = self.comm.create_canal([MSG_OK])
self.confpath = confpath self.confpath = confpath
self.loadConnectionInfo() self.loadConnectionInfo()
...@@ -185,29 +185,24 @@ class ResilientInstance(object): ...@@ -185,29 +185,24 @@ class ResilientInstance(object):
return True return True
class FilteredCanal(object): class FilteredCanal(object):
def __init__(self, accept, timeout): def __init__(self, accept, timeout):
self.accept = accept self.accept = accept
self.list = [] self.queue = Queue.Queue()
self.lock = thread.allocate_lock()
self.timeout = timeout self.timeout = timeout
def append(self, message, sender): def append(self, message, sender):
if message in self.accept: if message in self.accept:
self.lock.acquire() self.queue.put([message, sender])
self.list.append([message, sender])
self.lock.release()
def get(self): def get(self):
start = time.clock() try:
while (time.clock() - start < self.timeout): return self.queue.get(timeout=self.timeout)
self.lock.acquire() except Queue.Empty:
if self.list: return [None, None]
self.lock.release()
return self.list.pop(0)
self.lock.release()
return [None, None]
class Wrapper(object): class Wrapper(object):
...@@ -243,7 +238,7 @@ class Wrapper(object): ...@@ -243,7 +238,7 @@ class Wrapper(object):
finally: finally:
s.close() s.close()
def canal(self, accept): def create_canal(self, accept):
created = FilteredCanal(accept, self.timeout) created = FilteredCanal(accept, self.timeout)
self.canals.append(created) self.canals.append(created)
return created return created
...@@ -286,11 +281,7 @@ def run(args): ...@@ -286,11 +281,7 @@ def run(args):
computer.comm.start() computer.comm.start()
thread.start_new_thread(computer.listen, ()) thread.start_new_thread(computer.listen, ())
thread.start_new_thread(computer.main, ())
thread.start_new_thread(computer.aliveManagement, ()) thread.start_new_thread(computer.aliveManagement, ())
while True: computer.main()
# XXX tight loop
continue
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment