test.py 15.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
##############################################################################
#
# Copyright (c) 2018 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

import cgi
import json
import multiprocessing
import os
import tempfile
import unittest
import urlparse
35 36
import base64
import hashlib
37
import logging
38
import contextlib
39 40
from BaseHTTPServer import BaseHTTPRequestHandler

41 42 43 44 45 46 47 48
from io import BytesIO

import paramiko
import requests
from PIL import Image
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
49
from selenium.webdriver.remote.remote_connection import RemoteConnection
50 51
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
52
import urllib3
53

54
from slapos.testing.testcase import makeModuleSetUpAndTestCaseClass
55
from slapos.testing.utils import findFreeTCPPort, ImageComparisonTestCase, ManagedHTTPServer
56

57 58 59
setUpModule, SeleniumServerTestCase = makeModuleSetUpAndTestCaseClass(
    os.path.abspath(
        os.path.join(os.path.dirname(__file__), '..', 'software.cfg')))
60 61


62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112

class WebServer(ManagedHTTPServer):
  class RequestHandler(BaseHTTPRequestHandler):
    """Request handler for our test server.

    The implemented server is:
      - submit q and you'll get a page with q as title
      - upload a file and the file content will be displayed in div.uploadedfile
    """
    def do_GET(self):
      self.send_response(200)
      self.end_headers()
      self.wfile.write(
          '''
        <html>
          <title>Test page</title>
          <body>
            <style> p { font-family: Arial; } </style>
            <form action="/" method="POST" enctype="multipart/form-data">
              <input name="q" type="text"></input>
              <input name="f" type="file" ></input>
              <input type="submit" value="I'm feeling lucky"></input>
            </form>
            <p>the quick brown fox jumps over the lazy dog</p>
          </body>
        </html>''')

    def do_POST(self):
      form = cgi.FieldStorage(
          fp=self.rfile,
          headers=self.headers,
          environ={
              'REQUEST_METHOD': 'POST',
              'CONTENT_TYPE': self.headers['Content-Type'],
          })
      self.send_response(200)
      self.end_headers()
      file_data = 'no file'
      if form.has_key('f'):
        file_data = form['f'].file.read()
      self.wfile.write(
          '''
        <html>
          <title>%s</title>
          <div>%s</div>
        </html>
      ''' % (form['q'].value, file_data))

    log_message = logging.getLogger(__name__ + '.WebServer').info


113 114 115 116
class WebServerMixin(object):
  """Mixin class which provides a simple web server reachable at self.server_url
  """
  def setUp(self):
117
    self.server_url = self.getManagedResource('web_server', WebServer).url
118 119 120 121 122 123 124 125 126 127 128


class BrowserCompatibilityMixin(WebServerMixin):
  """Mixin class to run validation tests on a specific browser
  """
  desired_capabilities = NotImplemented
  user_agent = NotImplemented

  def setUp(self):
    super(BrowserCompatibilityMixin, self).setUp()
    self.driver = webdriver.Remote(
129 130 131
        command_executor=self.computer_partition.getConnectionParameterDict()
        ['backend-url'],
        desired_capabilities=self.desired_capabilities)
132 133 134 135 136 137 138 139 140 141 142 143

  def tearDown(self):
    self.driver.quit()
    super(BrowserCompatibilityMixin, self).tearDown()

  def test_user_agent(self):
    self.assertIn(
        self.user_agent,
        self.driver.execute_script('return navigator.userAgent'))

  def test_simple_submit_scenario(self):
    self.driver.get(self.server_url)
144
    input_element = WebDriverWait(self.driver, 3).until(
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
        EC.visibility_of_element_located((By.NAME, 'q')))
    input_element.send_keys(self.id())
    input_element.submit()
    WebDriverWait(self.driver, 3).until(EC.title_is(self.id()))

  def test_upload_file(self):
    f = tempfile.NamedTemporaryFile(delete=False)
    f.write(self.id())
    f.close()
    self.addCleanup(lambda: os.remove(f.name))

    self.driver.get(self.server_url)
    self.driver.find_element_by_xpath('//input[@name="f"]').send_keys(f.name)
    self.driver.find_element_by_xpath('//input[@type="submit"]').click()

160
    self.assertEqual(self.id(), self.driver.find_element_by_xpath('//div').text)
161 162 163 164

  def test_screenshot(self):
    self.driver.get(self.server_url)
    screenshot = Image.open(BytesIO(self.driver.get_screenshot_as_png()))
165 166 167 168 169 170 171 172 173 174 175 176
    reference_filename = os.path.join(
        os.path.dirname(__file__), "data",
        self.id() + ".png")

    # save the screenshot somewhere in a path that will be in snapshot folder.
    # XXX we could use a better folder name ...
    screenshot.save(
        os.path.join(self.slap.instance_directory, 'etc',
                     self.id() + ".png"))

    reference = Image.open(reference_filename)
    self.assertImagesSame(screenshot, reference)
177 178

  def test_window_and_screen_size(self):
179 180 181
    size = json.loads(
        self.driver.execute_script(
            '''
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
      return JSON.stringify({
        'screen.width': window.screen.width,
        'screen.height': window.screen.height,
        'screen.pixelDepth': window.screen.pixelDepth,
        'innerWidth': window.innerWidth,
        'innerHeight': window.innerHeight
      })'''))
    # Xvfb is configured like this
    self.assertEqual(1024, size['screen.width'])
    self.assertEqual(768, size['screen.height'])
    self.assertEqual(24, size['screen.pixelDepth'])

    # window size must not be 0 (wrong firefox integration report this)
    self.assertGreater(size['innerWidth'], 0)
    self.assertGreater(size['innerHeight'], 0)

  def test_resize_window(self):
    self.driver.set_window_size(800, 900)
200 201 202
    size = json.loads(
        self.driver.execute_script(
            '''
203 204 205 206 207 208 209 210 211 212 213 214
      return JSON.stringify({
        'outerWidth': window.outerWidth,
        'outerHeight': window.outerHeight
        })'''))
    self.assertEqual(800, size['outerWidth'])
    self.assertEqual(900, size['outerHeight'])

  def test_multiple_clients(self):
    parameter_dict = self.computer_partition.getConnectionParameterDict()
    webdriver_url = parameter_dict['backend-url']

    queue = multiprocessing.Queue()
215

216 217
    def _test(q, server_url):
      driver = webdriver.Remote(
218 219
          command_executor=webdriver_url,
          desired_capabilities=self.desired_capabilities)
220 221 222 223 224 225 226 227
      try:
        driver.get(server_url)
        q.put(driver.title == 'Test page')
      finally:
        driver.quit()

    nb_workers = 10
    workers = []
228
    for _ in range(nb_workers):
229
      worker = multiprocessing.Process(
230
          target=_test, args=(queue, self.server_url))
231 232 233

      worker.start()
      workers.append(worker)
234
    del worker  # pylint
235 236 237 238 239 240
    _ = [worker.join(timeout=30) for worker in workers]

    # terminate workers if they are still alive after 30 seconds
    _ = [worker.terminate() for worker in workers if worker.is_alive()]
    _ = [worker.join() for worker in workers]

241
    del _  # pylint
242
    self.assertEqual(
243
        [True] * nb_workers, [queue.get() for _ in range(nb_workers)])
244 245 246 247 248 249 250 251 252 253


class TestBrowserSelection(WebServerMixin, SeleniumServerTestCase):
  """Test browser can be selected by `desiredCapabilities``
  """
  def test_chrome(self):
    parameter_dict = self.computer_partition.getConnectionParameterDict()
    webdriver_url = parameter_dict['backend-url']

    driver = webdriver.Remote(
254 255
        command_executor=webdriver_url,
        desired_capabilities=DesiredCapabilities.CHROME)
256 257 258 259

    driver.get(self.server_url)
    self.assertEqual('Test page', driver.title)

260
    self.assertIn('Chrome', driver.execute_script('return navigator.userAgent'))
261
    self.assertNotIn(
262
        'Firefox', driver.execute_script('return navigator.userAgent'))
263 264 265 266 267 268 269
    driver.quit()

  def test_firefox(self):
    parameter_dict = self.computer_partition.getConnectionParameterDict()
    webdriver_url = parameter_dict['backend-url']

    driver = webdriver.Remote(
270 271
        command_executor=webdriver_url,
        desired_capabilities=DesiredCapabilities.FIREFOX)
272 273 274 275 276

    driver.get(self.server_url)
    self.assertEqual('Test page', driver.title)

    self.assertIn(
277
        'Firefox', driver.execute_script('return navigator.userAgent'))
278 279 280 281 282 283 284 285 286
    driver.quit()

  def test_firefox_desired_version(self):
    parameter_dict = self.computer_partition.getConnectionParameterDict()
    webdriver_url = parameter_dict['backend-url']

    desired_capabilities = DesiredCapabilities.FIREFOX.copy()
    desired_capabilities['version'] = '60.0.2esr'
    driver = webdriver.Remote(
287 288
        command_executor=webdriver_url,
        desired_capabilities=desired_capabilities)
289 290 291 292 293 294
    self.assertIn(
        'Gecko/20100101 Firefox/60.0',
        driver.execute_script('return navigator.userAgent'))
    driver.quit()
    desired_capabilities['version'] = '52.9.0esr'
    driver = webdriver.Remote(
295 296
        command_executor=webdriver_url,
        desired_capabilities=desired_capabilities)
297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313
    self.assertIn(
        'Gecko/20100101 Firefox/52.0',
        driver.execute_script('return navigator.userAgent'))
    driver.quit()


class TestFrontend(WebServerMixin, SeleniumServerTestCase):
  """Test hub's https frontend.
  """
  def test_admin(self):
    parameter_dict = self.computer_partition.getConnectionParameterDict()
    admin_url = parameter_dict['admin-url']

    parsed = urlparse.urlparse(admin_url)
    self.assertEqual('admin', parsed.username)
    self.assertTrue(parsed.password)

314
    self.assertIn('Grid Console', requests.get(admin_url, verify=False).text)
315 316 317 318 319 320 321 322

  def test_browser_use_hub(self):
    parameter_dict = self.computer_partition.getConnectionParameterDict()
    webdriver_url = parameter_dict['url']
    parsed = urlparse.urlparse(webdriver_url)
    self.assertEqual('selenium', parsed.username)
    self.assertTrue(parsed.password)

323 324 325 326 327
    # XXX we are using a self signed certificate, but selenium 3.141.0 does
    # not expose API to ignore certificate verification
    executor = RemoteConnection(webdriver_url, keep_alive=True)
    executor._conn = urllib3.PoolManager(cert_reqs='CERT_NONE', ca_certs=None)

328
    driver = webdriver.Remote(
329
        command_executor=executor,
330
        desired_capabilities=DesiredCapabilities.CHROME)
331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349

    driver.get(self.server_url)
    self.assertEqual('Test page', driver.title)
    driver.quit()


class TestSSHServer(SeleniumServerTestCase):
  @classmethod
  def getInstanceParameterDict(cls):
    cls.ssh_key = paramiko.RSAKey.generate(1024)
    return {'ssh-authorized-key': 'ssh-rsa {}'.format(cls.ssh_key.get_base64())}

  def test_connect(self):
    parameter_dict = self.computer_partition.getConnectionParameterDict()
    ssh_url = parameter_dict['ssh-url']
    parsed = urlparse.urlparse(ssh_url)
    self.assertEqual('ssh', parsed.scheme)

    client = paramiko.SSHClient()
350

351 352 353 354 355
    class TestKeyPolicy(object):
      """Accept server key and keep it in self.key for inspection
      """
      def missing_host_key(self, client, hostname, key):
        self.key = key
356

357 358 359 360 361
    key_policy = TestKeyPolicy()
    client.set_missing_host_key_policy(key_policy)

    with contextlib.closing(client):
      client.connect(
362 363 364 365
          username=urlparse.urlparse(ssh_url).username,
          hostname=urlparse.urlparse(ssh_url).hostname,
          port=urlparse.urlparse(ssh_url).port,
          pkey=self.ssh_key,
366 367 368 369 370 371
      )

      # Check fingerprint from server matches the published one.
      # The publish format is the raw output of ssh-keygen and is something like this:
      #   521 SHA256:9aZruv3LmFizzueIFdkd78eGtzghDoPSCBXFkkrHqXE user@hostname (ECDSA)
      # we only want to parse SHA256:9aZruv3LmFizzueIFdkd78eGtzghDoPSCBXFkkrHqXE
372 373
      _, fingerprint_string, _, key_type = parameter_dict[
          'ssh-fingerprint'].split()
374 375 376 377 378 379 380
      self.assertEqual(key_type, '(ECDSA)')

      fingerprint_algorithm, fingerprint = fingerprint_string.split(':', 1)
      self.assertEqual(fingerprint_algorithm, 'SHA256')
      # Paramiko does not allow to get the fingerprint as SHA256 easily yet
      # https://github.com/paramiko/paramiko/pull/1103
      self.assertEqual(
381 382 383 384 385
          fingerprint,
          # XXX with sha256, we need to remove that trailing =
          base64.b64encode(
              hashlib.new(fingerprint_algorithm,
                          key_policy.key.asbytes()).digest())[:-1])
386 387 388

      channel = client.invoke_shell()
      channel.settimeout(30)
389 390 391 392 393 394 395 396 397
      received = ''
      while True:
        r = channel.recv(1024)
        if not r:
          break
        received += r
        if 'Selenium Server.' in received:
          break
      self.assertIn("Welcome to SlapOS Selenium Server.", received)
398 399


400 401 402 403 404
class TestFirefox52(
    BrowserCompatibilityMixin,
    SeleniumServerTestCase,
    ImageComparisonTestCase,
):
405 406 407 408
  desired_capabilities = dict(DesiredCapabilities.FIREFOX, version='52.9.0esr')
  user_agent = 'Gecko/20100101 Firefox/52.0'
  # resizing window is not supported on firefox 52 geckodriver
  test_resize_window = unittest.expectedFailure(
409
      BrowserCompatibilityMixin.test_resize_window)
410 411


412 413 414 415 416
class TestFirefox60(
    BrowserCompatibilityMixin,
    SeleniumServerTestCase,
    ImageComparisonTestCase,
):
417 418 419 420
  desired_capabilities = dict(DesiredCapabilities.FIREFOX, version='60.0.2esr')
  user_agent = 'Gecko/20100101 Firefox/60.0'


421 422 423 424 425
class TestFirefox68(
    BrowserCompatibilityMixin,
    SeleniumServerTestCase,
    ImageComparisonTestCase,
):
426 427 428
  desired_capabilities = dict(DesiredCapabilities.FIREFOX, version='68.0.2esr')
  user_agent = 'Gecko/20100101 Firefox/68.0'

429 430 431 432 433 434 435 436
class TestFirefox78(
    BrowserCompatibilityMixin,
    SeleniumServerTestCase,
    ImageComparisonTestCase,
):
  desired_capabilities = dict(DesiredCapabilities.FIREFOX, version='78.1.0esr')
  user_agent = 'Gecko/20100101 Firefox/78.0'

437

438 439 440 441 442
class TestChrome69(
    BrowserCompatibilityMixin,
    SeleniumServerTestCase,
    ImageComparisonTestCase,
):
443 444
  desired_capabilities = dict(DesiredCapabilities.CHROME, version='69.0.3497.0')
  user_agent = 'Chrome/69.0.3497.0'
445 446 447 448 449 450 451 452 453


class TestChrome91(
    BrowserCompatibilityMixin,
    SeleniumServerTestCase,
    ImageComparisonTestCase,
):
  desired_capabilities = dict(DesiredCapabilities.CHROME, version='91.0.4472.114')
  user_agent = 'Chrome/91.0.4472.0'