Commit bcb00e9c authored by Denis Bilenko's avatar Denis Bilenko

move alarm() to util.py

parent 252c256f
# test for issue #210 # test for issue #210
from gevent import core from gevent import core
import time from util import alarm
import sys
import os
import threading
class alarm(threading.Thread):
# can't use signal.alarm because of Windows
def __init__(self, timeout):
threading.Thread.__init__(self)
self.setDaemon(True)
self.timeout = timeout
self.start()
def run(self):
time.sleep(self.timeout)
sys.stderr.write('Timeout.\n')
os._exit(5)
alarm(1) alarm(1)
......
from __future__ import with_statement from __future__ import with_statement
import gevent import gevent
import time from util import alarm
import sys
import os
import threading
class alarm(threading.Thread):
# can't use signal.alarm because of Windows
def __init__(self, timeout):
threading.Thread.__init__(self)
self.setDaemon(True)
self.timeout = timeout
self.start()
def run(self):
time.sleep(self.timeout)
sys.stderr.write('Timeout.\n')
os._exit(5)
alarm(1) alarm(1)
......
...@@ -4,8 +4,9 @@ import os ...@@ -4,8 +4,9 @@ import os
import re import re
import traceback import traceback
import unittest import unittest
import threading
import time
from datetime import timedelta from datetime import timedelta
from time import time
from gevent import subprocess, sleep, spawn_later from gevent import subprocess, sleep, spawn_later
...@@ -168,9 +169,9 @@ def run(command, **kwargs): ...@@ -168,9 +169,9 @@ def run(command, **kwargs):
popen = start(command, **kwargs) popen = start(command, **kwargs)
name = popen.name name = popen.name
try: try:
time_start = time() time_start = time.time()
out, err = popen.communicate() out, err = popen.communicate()
took = time() - time_start took = time.time() - time_start
if popen.poll() is None: if popen.poll() is None:
result = 'TIMEOUT' result = 'TIMEOUT'
else: else:
...@@ -388,3 +389,18 @@ class TestServer(unittest.TestCase): ...@@ -388,3 +389,18 @@ class TestServer(unittest.TestCase):
function() function()
ran = True ran = True
assert ran assert ran
class alarm(threading.Thread):
# can't use signal.alarm because of Windows
def __init__(self, timeout):
threading.Thread.__init__(self)
self.setDaemon(True)
self.timeout = timeout
self.start()
def run(self):
time.sleep(self.timeout)
sys.stderr.write('Timeout.\n')
os._exit(5)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment