Commit e65ca3b2 authored by Thomas Gambier's avatar Thomas Gambier 🚴🏼

promise/plugin: add promise to check websockets

Note that this promise is only available in Python3.
parent c68f8faf
......@@ -12,7 +12,7 @@ for f in sorted(glob.glob(os.path.join('slapos', 'README.*.rst'))):
long_description += open("CHANGES.txt").read() + "\n"
prediction_require = ['statsmodels', 'scipy', 'pandas']
test_require = ['mock', 'cryptography',] + prediction_require
test_require = ['mock', 'cryptography', 'websockets; python_version>="3"',] + prediction_require
setup(name=name,
version=version,
......@@ -54,6 +54,7 @@ setup(name=name,
'six',
'cryptography',
'click',
'websocket-client; python_version>="3"',
'ipaddress; python_version<"3"',
),
extras_require = {
......
"""
Some notable parameters:
url:
The URL of the websocket to test
promise-timeout:
Optional timeout (in seconds) for promise.
timeout:
Optional timeout (in seconds) for websocket request.
frequency:
Optional frequency (in minutes) for running this promise.
binary:
Boolean to say if the frames sent to websocket are binary (default) or text, only useful when content* options are set
content-to-send:
Optional bytes array or string (depending on binary) to send to the websocket
content-to-receive:
Optional bytes array or string (depending on binary) to compare the first message sent by websocket with (must be used with content to send)
"""
from zope.interface import implementer
from slapos.grid.promise import interface
from slapos.grid.promise.generic import GenericPromise
import websocket
@implementer(interface.IPromise)
class RunPromise(GenericPromise):
def __init__(self, config):
super(RunPromise, self).__init__(config)
# SR can set custom periodicity
self.setPeriodicity(float(self.getConfig('frequency', 2)))
def sense(self):
"""
Check if websocket URL is available.
"""
url = self.getConfig('url')
# make default time a max of 5 seconds, a bit smaller than promise-timeout
# and in the same time at least 1 second
default_timeout = max(
1, min(5, int(self.getConfig('promise-timeout', 20)) - 1))
binary = self.getConfig('binary', True)
content_to_send = self.getConfig('content-to-send')
content_to_receive = self.getConfig('content-to-receive')
try:
ws = websocket.create_connection(url, timeout=int(self.getConfig('timeout', default_timeout)))
except websocket._exceptions.WebSocketBadStatusException:
self.logger.error(
"ERROR connection not possible while accessing %r", url)
except Exception as e:
self.logger.error("ERROR: %s", e)
else:
if content_to_send and content_to_receive:
if binary:
ws.send_binary(content_to_send)
else:
ws.send(content_to_send)
response = ws.recv()
if response != content_to_receive:
self.logger.error("ERROR received %r instead of %r", response, content_to_receive)
else:
self.logger.info("Correctly received %r from %r", content_to_receive, url)
else:
self.logger.info("Correctly connected to %r", url)
ws.close()
def anomaly(self):
return self._test(result_count=3, failure_amount=3)
##############################################################################
#
# Copyright (c) 2019 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
# This module contains python3 syntax that can't be parsed by python2
# that's why it is in a separated module
from slapos.grid.promise import PromiseError
from . import TestPromisePluginMixin
from slapos.util import str2bytes
import asyncio
import contextlib
import os
import time
import websocket
from websockets import serve
import multiprocessing
class CheckWebsocketAvailableMixin(TestPromisePluginMixin):
@classmethod
def setUpClass(cls):
SLAPOS_TEST_IPV4 = os.environ.get('SLAPOS_TEST_IPV4', '127.0.0.1')
SLAPOS_TEST_IPV4_PORT = 57965
cls.WS_ENDPOINT = "ws://%s:%s/" % (SLAPOS_TEST_IPV4, SLAPOS_TEST_IPV4_PORT)
async def echo(websocket):
path = websocket.path.split('/')[-1]
if '_' in path:
response, timeout = path.split('_')
response = response
timeout = int(timeout)
else:
timeout = 0
response = path
time.sleep(timeout)
async for message in websocket:
if response == "OK":
await websocket.send(message)
else:
await websocket.send("bad")
async def server():
async with serve(echo, SLAPOS_TEST_IPV4, SLAPOS_TEST_IPV4_PORT):
await asyncio.Future() # run forever
def main():
asyncio.run(server())
cls.server_process = multiprocessing.Process(target=main)
cls.server_process.start()
for _ in range(20):
try:
with contextlib.closing(websocket.create_connection((SLAPOS_TEST_IPV4, SLAPOS_TEST_IPV4_PORT))):
break
except Exception:
time.sleep(.1)
@classmethod
def tearDownClass(cls):
cls.server_process.terminate()
cls.server_process.join()
def setUp(self):
TestPromisePluginMixin.setUp(self)
self.promise_name = "check-websocket-available.py"
def make_content(self, option_dict):
content = """from slapos.promise.plugin.check_websocket_available import RunPromise
extra_config_dict = {
"""
for option in option_dict.items():
content += "\n '%s': %r," % option
return content + "\n}"
def tearDown(self):
TestPromisePluginMixin.tearDown(self)
##############################################################################
#
# Copyright (c) 2019 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from slapos.grid.promise import PromiseError
from slapos.util import str2bytes
import unittest
import sys
import contextlib
import os
import random
import string
import time
import multiprocessing
if sys.version_info[0] >= 3:
import asyncio
import websocket
from websockets import serve
from .check_websocket_available_py3_test import CheckWebsocketAvailableMixin
else:
class CheckWebsocketAvailableMixin():
pass
@unittest.skipIf(sys.version_info[0] < 3, "not supported in this library version")
class TestCheckWebsocketAvailable(CheckWebsocketAvailableMixin):
def test_check_url_bad(self):
content = self.make_content({
'url': 'ws://',
'timeout': 10,
})
self.writePromise(self.promise_name, content)
self.configureLauncher()
with self.assertRaises(PromiseError):
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], True)
self.assertEqual(
result['result']['message'],
"ERROR: hostname is invalid"
)
def test_check_simple_connect(self):
url = self.WS_ENDPOINT + 'OK'
content = self.make_content({
'url': url,
'timeout': 10,
})
self.writePromise(self.promise_name, content)
self.configureLauncher()
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], False)
self.assertEqual(
result['result']['message'],
("Correctly connected to %r" % url)
)
def test_check_read_text(self):
text = ''.join(random.choice(string.ascii_letters) for i in range(10))
url = self.WS_ENDPOINT + 'OK'
content = self.make_content({
'url': url,
'binary': False,
'content-to-send': text,
'content-to-receive' : text
})
self.writePromise(self.promise_name, content)
self.configureLauncher()
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], False)
self.assertEqual(
result['result']['message'],
("Correctly received %r from %r" % (text, url))
)
def test_check_read_binary(self):
text = os.urandom(100)
url = self.WS_ENDPOINT + 'OK'
content = self.make_content({
'url': url,
'content-to-send': text,
'content-to-receive' : text
})
self.writePromise(self.promise_name, content)
self.configureLauncher()
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], False)
self.assertEqual(
result['result']['message'],
("Correctly received %r from %r" % (text, url))
)
def test_check_bad_read(self):
text = ''.join(random.choice(string.ascii_letters) for i in range(10))
url = self.WS_ENDPOINT + 'NOK'
content = self.make_content({
'url': url,
'content-to-send': text,
'content-to-receive' : text
})
self.writePromise(self.promise_name, content)
self.configureLauncher()
with self.assertRaises(PromiseError):
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], True)
self.assertEqual(
result['result']['message'],
("ERROR received 'bad' instead of %r" % text)
)
def test_check_timeout(self):
url = self.WS_ENDPOINT + 'OK_5'
content = self.make_content({
'url': url,
'timeout': 1,
# use content to send/receceive so that promise will try to read from websocket
# otherwise, we can't test the timeout
'content-to-send': "a",
'content-to-receive' : "a",
})
self.writePromise(self.promise_name, content)
self.configureLauncher()
with self.assertRaises(PromiseError):
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], True)
self.assertEqual(
result['result']['message'],
"Error: Promise timed out after 0.5 seconds",
)
if __name__ == '__main__':
unittest.main()
  • @tomo it seems this break the SlapOS.Eggs.UnitTest-Master.Python3 tests with such error:

    2023-03-02 17:08:56,160 INFO     slapgrid_sr: 2023-03-02 17:08:56 slapos[11832] INFO   Getting distribution for 'websockets'.
    2023-03-02 17:08:56,160 INFO     slapgrid_sr: 2023-03-02 17:08:56 slapos[11832] INFO Error: Picked: websockets = 10.4
  • I'm waiting for the test to pass for slapos!1367 (merged). As soon as it passes, I'm merging and this will fix this error.

Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment