Commit f30a552f authored by Gabriel Monnerat's avatar Gabriel Monnerat

- refactor code to check via psutil library if process is using the port.

- clean up the tests


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk/utils@42036 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 4871d6df
......@@ -30,7 +30,7 @@ from zope.interface import implements
from cloudooo.interfaces.application import IApplication
from cloudooo.utils.utils import logger
from cloudooo.handler.ooo.utils.utils import socketStatus, waitStopDaemon
from psutil import pid_exists, Process
from psutil import pid_exists, Process, AccessDenied
class Application(object):
......@@ -80,7 +80,20 @@ class Application(object):
def status(self):
"""Check by socket if the openoffice work."""
return socketStatus(self.hostname, self.port)
pid = self.pid()
if pid is None:
return False
process = Process(pid)
try:
process.exe
except AccessDenied:
return False
for connection in process.get_connections():
if connection.status == 'LISTEN' and connection.local_address[1] == self.port:
return True
return False
def getAddress(self):
"""Return port and hostname of OOo Instance."""
......@@ -88,6 +101,6 @@ class Application(object):
def pid(self):
"""Returns the pid"""
if not hasattr(self, 'process') or not self.status():
if not hasattr(self, 'process'):
return None
return self.process.pid
......@@ -110,14 +110,14 @@ class OpenOffice(Application):
for process in psutil.process_iter():
try:
if process.exe == join(self.office_binary_path, self._bin_soffice):
connection_list = process.get_connections()
if len(connection_list) > 0 and \
connection_list[0].local_address[1] == self.port:
process.terminate()
for connection in process.get_connections():
if connection.status == "LISTEN" and \
connection.local_address[1] == self.port:
process.terminate()
except psutil.error.AccessDenied, e:
logger.debug(e)
except TypeError, e:
# exception to prevent one psutil issue with svn processes
# exception to prevent one psutil issue with zombie processes
logger.debug(e)
except NotImplementedError, e:
logger.error("lsof isn't installed on this machine: " + str(e))
......
......@@ -57,8 +57,8 @@ class TestOpenOffice(cloudoooTestCase):
"""Test pid function to validate if the return is correctly"""
self.assertNotEquals(self.openoffice.pid(), None)
self.openoffice.stop()
waitStopDaemon(self.openoffice)
self.assertEquals(self.openoffice.pid(), None)
self.assertNotEquals(self.openoffice.pid(), None)
self.assertEquals(self.openoffice.status(), False)
def testOpenOfficeStart(self):
"""Test if the start method works correclty"""
......@@ -93,17 +93,15 @@ class TestOpenOffice(cloudoooTestCase):
openoffice will terminate the first"""
second_openoffice = OpenOffice()
second_openoffice.loadSettings("localhost", 4090,
self.working_path + "_",
self.working_path,
self.virtual_display_id,
self.office_binary_path,
self.uno_path,
'en')
try:
second_openoffice.start()
openoffice_process = Process(self.openoffice.pid())
self.assertRaises(AccessDenied, openoffice_process.get_connections)
finally:
second_openoffice.stop()
second_openoffice.start()
self.assertEquals(self.openoffice.status(), False)
self.assertEquals(second_openoffice.status(), True)
second_openoffice.stop()
self.openoffice.start()
second_openoffice = OpenOffice()
......@@ -113,46 +111,9 @@ class TestOpenOffice(cloudoooTestCase):
self.office_binary_path,
self.uno_path,
'en')
try:
second_openoffice.start()
openoffice_process = Process(self.openoffice.pid())
connection = openoffice_process.get_connections()[0]
self.assertEquals(connection.local_address[1], 4090)
openoffice_process = Process(second_openoffice.pid())
connection = openoffice_process.get_connections()[0]
self.assertEquals(connection.local_address[1], 4091)
finally:
second_openoffice.stop()
if not self.openoffice.status():
self.openoffice.start()
second_openoffice = OpenOffice()
second_openoffice.loadSettings("localhost", 40900,
self.working_path + "_",
self.virtual_display_id,
self.office_binary_path,
self.uno_path,
'en')
second_openoffice.start()
third_openoffice = OpenOffice()
third_openoffice.loadSettings("localhost", 40900,
self.working_path + "_",
self.virtual_display_id,
self.office_binary_path,
self.uno_path,
'en')
try:
third_openoffice.start()
openoffice_process = Process(self.openoffice.pid())
connection = openoffice_process.get_connections()[0]
self.assertEquals(connection.local_address[1], 4090)
openoffice_process = Process(second_openoffice.pid())
self.assertRaises(AccessDenied, openoffice_process.get_connections)
finally:
second_openoffice.stop()
third_openoffice.stop()
self.assertEquals(self.openoffice.status(), True)
self.assertEquals(second_openoffice.status(), True)
def test_suite():
......
......@@ -46,13 +46,12 @@ class TestXvfb(cloudoooTestCase):
def testPid(self):
"""Test pid function to validate if the return is correctly"""
self.assertEquals(self.xvfb.pid(), None)
try:
self.xvfb.start()
self.assertNotEquals(self.xvfb.pid(), None)
finally:
self.xvfb.stop()
waitStopDaemon(self.xvfb)
self.assertEquals(self.xvfb.pid(), None)
self.xvfb.start()
self.assertNotEquals(self.xvfb.pid(), None)
self.assertEquals(self.xvfb.status(), True)
self.xvfb.stop()
self.assertNotEquals(self.xvfb.pid(), None)
self.assertEquals(self.xvfb.status(), False)
def testStatus(self):
"""Test if xvfb is started and stopped correctly"""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment