############################################################################## # # Copyright (c) 2019 Nexedi SA and Contributors. All Rights Reserved. # # WARNING: This program as such is intended to be used by professional # programmers who take the whole responsibility of assessing all potential # consequences resulting from its eventual inadequacies and bugs # End users who are looking for a ready-to-use solution with commercial # guarantees and support are strongly adviced to contract a Free Software # Service Company # # This program is Free Software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 3 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # ############################################################################## from __future__ import unicode_literals import io import logging import os import tempfile import textwrap import time import psutil import requests from six.moves import configparser from slapos.testing.testcase import makeModuleSetUpAndTestCaseClass setUpModule, SlapOSInstanceTestCase = makeModuleSetUpAndTestCaseClass( os.path.abspath( os.path.join(os.path.dirname(__file__), '..', 'software.cfg'))) class GrafanaTestCase(SlapOSInstanceTestCase): """Base test case for grafana. Since the instances takes time to start and stop, we increase the number of retries. """ instance_max_retry = 50 report_max_retry = 30 class TestGrafana(GrafanaTestCase): def setUp(self): self.grafana_url = self.computer_partition.getConnectionParameterDict( )['grafana-url'] def test_grafana_available(self): resp = requests.get(self.grafana_url, verify=False) self.assertEqual(requests.codes.ok, resp.status_code) def test_grafana_api(self): # check API is usable api_org_url = '{self.grafana_url}/api/org'.format(**locals()) resp = requests.get(api_org_url, verify=False) self.assertEqual(requests.codes.unauthorized, resp.status_code) connection_params = self.computer_partition.getConnectionParameterDict() resp = requests.get( api_org_url, verify=False, auth=requests.auth.HTTPBasicAuth( connection_params['grafana-username'], connection_params['grafana-password'], )) self.assertEqual(requests.codes.ok, resp.status_code) self.assertEqual(1, resp.json()['id']) def test_grafana_datasource_povisinonned(self): # data sources are provisionned connection_params = self.computer_partition.getConnectionParameterDict() resp = requests.get( '{self.grafana_url}/api/datasources'.format(**locals()), verify=False, auth=requests.auth.HTTPBasicAuth( connection_params['grafana-username'], connection_params['grafana-password'], )) self.assertEqual(requests.codes.ok, resp.status_code) self.assertEqual( sorted(['influxdb', 'loki']), sorted([ds['type'] for ds in resp.json()])) def test_email_disabled(self): config = configparser.ConfigParser() # grafana config file is like an ini file with an implicit default section with open( os.path.join(self.computer_partition_root_path, 'etc', 'grafana-config-file.cfg')) as f: config.readfp(io.StringIO('[default]\n' + f.read())) self.assertEqual(config.get('smtp', 'enabled'), 'false') class TestGrafanaEmailEnabled(GrafanaTestCase): __partition_reference__ = 'mail' smtp_verify_ssl = "true" smtp_skip_verify = "false" @classmethod def getInstanceParameterDict(cls): return { "smtp-server": "smtp.example.com:25", "smtp-username": "smtp_username", "smtp-password": "smtp_password", 'smtp-verify-ssl': cls.smtp_verify_ssl, "email-from-address": "grafana@example.com", "email-from-name": "Grafana From Name", } def test_email_enabled(self): config = configparser.ConfigParser() with open( os.path.join(self.computer_partition_root_path, 'etc', 'grafana-config-file.cfg')) as f: config.readfp(io.StringIO('[default]\n' + f.read())) self.assertEqual(config.get('smtp', 'enabled'), 'true') self.assertEqual(config.get('smtp', 'host'), 'smtp.example.com:25') self.assertEqual(config.get('smtp', 'user'), 'smtp_username') self.assertEqual(config.get('smtp', 'password'), '"""smtp_password"""') self.assertEqual(config.get('smtp', 'skip_verify'), self.smtp_skip_verify) self.assertEqual(config.get('smtp', 'from_address'), 'grafana@example.com') self.assertEqual(config.get('smtp', 'from_name'), 'Grafana From Name') class TestGrafanaEmailEnabledSkipVerify(TestGrafanaEmailEnabled): smtp_verify_ssl = "false" smtp_skip_verify = "true" class TestInfluxDb(GrafanaTestCase): def setUp(self): self.influxdb_url = self.computer_partition.getConnectionParameterDict( )['influxdb-url'] def test_influxdb_available(self): ping_url = '{self.influxdb_url}/ping'.format(**locals()) resp = requests.get(ping_url, verify=False) self.assertEqual(requests.codes.no_content, resp.status_code) def test_influxdb_api(self): query_url = '{self.influxdb_url}/query'.format(**locals()) connection_params = self.computer_partition.getConnectionParameterDict() for i in range(10): # retry, as it may take a little delay to create databases resp = requests.get( query_url, verify=False, params=dict( q='SHOW DATABASES', u=connection_params['influxdb-username'], p=connection_params['influxdb-password'])) self.assertEqual(requests.codes.ok, resp.status_code) result, = resp.json()['results'] if result['series'] and 'values' in result['series'][0]: break time.sleep(0.5 * i) self.assertIn( [connection_params['influxdb-database']], result['series'][0]['values']) class TestTelegraf(GrafanaTestCase): def test_telegraf_running(self): with self.slap.instance_supervisor_rpc as supervisor: all_process_info = supervisor.getAllProcessInfo() process_info, = [p for p in all_process_info if 'telegraf' in p['name']] self.assertEqual('RUNNING', process_info['statename']) class TestLoki(GrafanaTestCase): @classmethod def getInstanceParameterDict(cls): cls._logfile = tempfile.NamedTemporaryFile(suffix='log') return { 'promtail-extra-scrape-config': textwrap.dedent( r''' - job_name: {cls.__name__} pipeline_stages: - match: selector: '{{job="{cls.__name__}"}}' stages: - multiline: firstline: '^\d{{4}}-\d{{2}}-\d{{2}}\s\d{{1,2}}\:\d{{2}}\:\d{{2}}\,\d{{3}}' max_wait_time: 3s - regex: expression: '^(?P<timestamp>.*) - (?P<name>\S+) - (?P<level>\S+) - (?P<message>.*)' - timestamp: format: 2006-01-02T15:04:05Z00:00 source: timestamp - labels: level: name: static_configs: - targets: - localhost labels: job: {cls.__name__} __path__: {cls._logfile.name} ''').format(**locals()) } @classmethod def tearDownClass(cls): cls._logfile.close() super(TestLoki, cls).tearDownClass() def setUp(self): self.loki_url = self.computer_partition.getConnectionParameterDict( )['loki-url'] def test_loki_available(self): self.assertEqual( requests.codes.ok, requests.get('{self.loki_url}/ready'.format(**locals()), verify=False).status_code) def test_log_ingested(self): # create a logger logging to the file that we have # configured in instance parameter. test_logger = logging.getLogger(self.id()) test_logger.propagate = False test_logger.setLevel(logging.INFO) test_handler = logging.FileHandler(filename=self._logfile.name) test_handler.setFormatter( logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s')) test_logger.addHandler(test_handler) test_logger.info("testing message") test_logger.info("testing another message") test_logger.warning("testing warn") # log an exception, which will be multi line in log file. def nested1(): def nested2(): raise ValueError('boom') nested2() try: nested1() except ValueError: test_logger.exception("testing exception") # Check our messages have been ingested # we retry a few times, because there's a short delay until messages are # ingested and returned. for i in range(60): resp = requests.get( '{self.loki_url}/api/prom/query?query={{job="TestLoki"}}'.format( **locals()), verify=False).json() if len(resp.get('streams', [])) < 3: time.sleep(0.5 * i) continue warn_stream_list = [stream for stream in resp['streams'] if 'level="WARNING"' in stream['labels']] self.assertEqual(1, len(warn_stream_list), resp['streams']) warn_stream, = warn_stream_list self.assertIn("testing warn", warn_stream['entries'][0]['line']) info_stream_list = [stream for stream in resp['streams'] if 'level="INFO"' in stream['labels']] self.assertEqual(1, len(info_stream_list), resp['streams']) info_stream, = info_stream_list self.assertTrue( [ line for line in info_stream['entries'] if "testing message" in line['line'] ]) self.assertTrue( [ line for line in info_stream['entries'] if "testing another message" in line['line'] ]) error_stream_list = [stream for stream in resp['streams'] if 'level="ERROR"' in stream['labels']] self.assertEqual(1, len(error_stream_list), resp['streams']) error_stream, = error_stream_list line, = [line['line'] for line in error_stream['entries']] # this entry is multi-line self.assertIn('testing exception\nTraceback (most recent call last):\n', line) self.assertIn('ValueError: boom', line) # The labels we have configued are also available resp = requests.get( '{self.loki_url}/api/prom/label'.format(**locals()), verify=False).json() self.assertIn('level', resp['values']) self.assertIn('name', resp['values']) class TestListenInPartition(GrafanaTestCase): def setUp(self): with self.slap.instance_supervisor_rpc as supervisor: all_process_info = supervisor.getAllProcessInfo() self.process_dict = { p['name'].replace('-on-watch', ''): psutil.Process(p['pid']) for p in all_process_info if p['name'] != 'watchdog' } def test_grafana_listen(self): self.assertEqual( [ c.laddr for c in self.process_dict['grafana'].connections() if c.status == 'LISTEN' ], [(self._ipv6_address, 8180)], ) def test_influxdb_listen(self): self.assertEqual( sorted([ c.laddr for c in self.process_dict['influxdb'].connections() if c.status == 'LISTEN' ]), [ (self._ipv4_address, 8088), (self._ipv6_address, 8086), ], ) def test_telegraph_listen(self): self.assertEqual( [ c.laddr for c in self.process_dict['telegraf'].connections() if c.status == 'LISTEN' ], [], ) def test_loki_listen(self): self.assertEqual( sorted([ c.laddr for c in self.process_dict['loki'].connections() if c.status == 'LISTEN' ]), [ (self._ipv4_address, 3100), (self._ipv4_address, 9095), ], ) def test_promtail_listen(self): self.assertEqual( sorted([ c.laddr for c in self.process_dict['promtail'].connections() if c.status == 'LISTEN' ]), [ (self._ipv4_address, 19080), (self._ipv4_address, 19095), ], )