Commit 9c3fc434 authored by Vincent Pelletier's avatar Vincent Pelletier Committed by Vincent Pelletier

tests: Exclude helper functions and error paths from coverage.

parent d69c758a
...@@ -51,7 +51,7 @@ _cryptography_backend = default_backend() ...@@ -51,7 +51,7 @@ _cryptography_backend = default_backend()
NOT_CAUCASE_OID = '2.25.285541874270823339875695650038637483518' NOT_CAUCASE_OID = '2.25.285541874270823339875695650038637483518'
def canConnect(address): def canConnect(address): # pragma: no cover
""" """
Returns True if a connection can be established to given address, False Returns True if a connection can be established to given address, False
otherwise. otherwise.
...@@ -64,7 +64,7 @@ def canConnect(address): ...@@ -64,7 +64,7 @@ def canConnect(address):
raise raise
return True return True
def retry(callback, try_count=200, try_delay=0.1): def retry(callback, try_count=200, try_delay=0.1): # pragma: no cover
""" """
Poll <callback> every <try_delay> for <try_count> times or until it returns Poll <callback> every <try_delay> for <try_count> times or until it returns
a true value. a true value.
...@@ -170,7 +170,7 @@ class UntilEvent(object): ...@@ -170,7 +170,7 @@ class UntilEvent(object):
""" """
Retrieve current action. Not very useful. Retrieve current action. Not very useful.
""" """
return self._action return self._action # pragma: no cover
@action.setter @action.setter
def action(self, value): def action(self, value):
...@@ -178,7 +178,7 @@ class UntilEvent(object): ...@@ -178,7 +178,7 @@ class UntilEvent(object):
Change the action which will happen on next event wakeup. Change the action which will happen on next event wakeup.
""" """
if value not in (ON_EVENT_RAISE, ON_EVENT_EXPIRE): if value not in (ON_EVENT_RAISE, ON_EVENT_EXPIRE):
raise ValueError raise ValueError # pragma: no cover
self._action = value self._action = value
def wait(self, timeout=10, clear=False): def wait(self, timeout=10, clear=False):
...@@ -286,7 +286,7 @@ class CaucaseTest(unittest.TestCase): ...@@ -286,7 +286,7 @@ class CaucaseTest(unittest.TestCase):
), ),
) )
except SystemExit, e: except SystemExit, e:
return e.code return e.code # pragma: no cover
except: # pylint: disable=bare-except except: # pylint: disable=bare-except
return 1 return 1
return 0 return 0
...@@ -317,7 +317,7 @@ class CaucaseTest(unittest.TestCase): ...@@ -317,7 +317,7 @@ class CaucaseTest(unittest.TestCase):
self.assertTrue(server.is_alive(), 'caucased crashed on startup') or self.assertTrue(server.is_alive(), 'caucased crashed on startup') or
canConnect((parsed_url.hostname, parsed_url.port)) canConnect((parsed_url.hostname, parsed_url.port))
), ),
): ): # pragma: no cover
self._stopServer() self._stopServer()
raise AssertionError('Could not connect to %r after 1 second' % ( raise AssertionError('Could not connect to %r after 1 second' % (
self._caucase_url, self._caucase_url,
...@@ -330,7 +330,7 @@ class CaucaseTest(unittest.TestCase): ...@@ -330,7 +330,7 @@ class CaucaseTest(unittest.TestCase):
server = self._server server = self._server
self._server_event.set() self._server_event.set()
server.join(2) server.join(2)
if server.is_alive(): if server.is_alive(): # pragma: no cover
raise ValueError('%r does not wish to die' % (server, )) raise ValueError('%r does not wish to die' % (server, ))
def _runClient(self, *argv): def _runClient(self, *argv):
...@@ -400,7 +400,7 @@ class CaucaseTest(unittest.TestCase): ...@@ -400,7 +400,7 @@ class CaucaseTest(unittest.TestCase):
) )
while True: while True:
row = c.fetchone() row = c.fetchone()
if row is None: if row is None: # pragma: no cover
raise Exception('CA with serial %r not found' % (serial, )) raise Exception('CA with serial %r not found' % (serial, ))
crt = utils.load_ca_certificate(row['crt'].encode('ascii')) crt = utils.load_ca_certificate(row['crt'].encode('ascii'))
if crt.serial_number == serial: if crt.serial_number == serial:
...@@ -847,7 +847,7 @@ class CaucaseTest(unittest.TestCase): ...@@ -847,7 +847,7 @@ class CaucaseTest(unittest.TestCase):
client.createCertificateSigningRequest('Not actually a CSR') client.createCertificateSigningRequest('Not actually a CSR')
except CaucaseError, e: except CaucaseError, e:
self.assertEqual(e.args[0], 400, e) self.assertEqual(e.args[0], 400, e)
else: else: # pragma: no cover
raise AssertionError('Did not raise CaucaseError(400, ...)') raise AssertionError('Did not raise CaucaseError(400, ...)')
def testUpdateUser(self): def testUpdateUser(self):
...@@ -1226,7 +1226,7 @@ class CaucaseTest(unittest.TestCase): ...@@ -1226,7 +1226,7 @@ class CaucaseTest(unittest.TestCase):
# reached its certificate auto-approval limit. # reached its certificate auto-approval limit.
os.unlink(self._server_key) os.unlink(self._server_key)
self._startServer() self._startServer()
if not retry(lambda: os.path.exists(self._server_key)): if not retry(lambda: os.path.exists(self._server_key)): # pragma: no cover
raise AssertionError('%r was not recreated within 1 second' % ( raise AssertionError('%r was not recreated within 1 second' % (
self._server_key, self._server_key,
)) ))
...@@ -1270,7 +1270,7 @@ class CaucaseTest(unittest.TestCase): ...@@ -1270,7 +1270,7 @@ class CaucaseTest(unittest.TestCase):
self._startServer() self._startServer()
if not retry( if not retry(
lambda: open(self._server_key).read() != reference_server_key, lambda: open(self._server_key).read() != reference_server_key,
): ): # pragma: no cover
raise AssertionError('Server did not renew its key pair within 1 second') raise AssertionError('Server did not renew its key pair within 1 second')
# But user still trusts the server # But user still trusts the server
self._runClient( self._runClient(
...@@ -1322,7 +1322,7 @@ class CaucaseTest(unittest.TestCase): ...@@ -1322,7 +1322,7 @@ class CaucaseTest(unittest.TestCase):
raise ValueError('Some generic exception') raise ValueError('Some generic exception')
@staticmethod @staticmethod
def _placeholder(_): def _placeholder(_): # pragma: no cover
""" """
Placeholder methods, for when method lookup happens before noticing Placeholder methods, for when method lookup happens before noticing
issues in the query. issues in the query.
...@@ -1516,7 +1516,7 @@ class CaucaseTest(unittest.TestCase): ...@@ -1516,7 +1516,7 @@ class CaucaseTest(unittest.TestCase):
# Note: backup could have triggered between first and second user's key # Note: backup could have triggered between first and second user's key
# creation. We need it to be signed by both keys, so delete any backup file # creation. We need it to be signed by both keys, so delete any backup file
# which would exist at this point. # which would exist at this point.
for backup_path in glob.glob(backup_glob): for backup_path in glob.glob(backup_glob): # pragma: no cover
os.unlink(backup_path) os.unlink(backup_path)
before_backup = list(SQLite3Storage( before_backup = list(SQLite3Storage(
self._server_db, self._server_db,
...@@ -1524,7 +1524,7 @@ class CaucaseTest(unittest.TestCase): ...@@ -1524,7 +1524,7 @@ class CaucaseTest(unittest.TestCase):
).dumpIterator()) ).dumpIterator())
self._startServer('--backup-directory', self._server_backup_path) self._startServer('--backup-directory', self._server_backup_path)
backup_path_list = retry(lambda: glob.glob(backup_glob)) backup_path_list = retry(lambda: glob.glob(backup_glob))
if not backup_path_list: if not backup_path_list: # pragma: no cover
raise AssertionError('Backup file not created after 1 second') raise AssertionError('Backup file not created after 1 second')
backup_path, = backup_path_list backup_path, = backup_path_list
self._stopServer() self._stopServer()
...@@ -1582,7 +1582,7 @@ class CaucaseTest(unittest.TestCase): ...@@ -1582,7 +1582,7 @@ class CaucaseTest(unittest.TestCase):
if row.startswith(CRT_INSERT): if row.startswith(CRT_INSERT):
crt_list.append(row) crt_list.append(row)
continue continue
if row.startswith(REV_INSERT): if row.startswith(REV_INSERT): # pragma: no cover
if rev_found: if rev_found:
raise AssertionError('Unexpected revocation found') raise AssertionError('Unexpected revocation found')
continue continue
...@@ -1624,7 +1624,7 @@ class CaucaseTest(unittest.TestCase): ...@@ -1624,7 +1624,7 @@ class CaucaseTest(unittest.TestCase):
os.unlink(backup_path) os.unlink(backup_path)
self._startServer('--backup-directory', self._server_backup_path) self._startServer('--backup-directory', self._server_backup_path)
backup_path_list = retry(lambda: glob.glob(backup_glob)) backup_path_list = retry(lambda: glob.glob(backup_glob))
if not backup_path_list: if not backup_path_list: # pragma: no cover
raise AssertionError('Backup file not created after 1 second') raise AssertionError('Backup file not created after 1 second')
backup_path, = glob.glob(backup_glob) backup_path, = glob.glob(backup_glob)
cli.key_id([ cli.key_id([
...@@ -1649,7 +1649,7 @@ class CaucaseTest(unittest.TestCase): ...@@ -1649,7 +1649,7 @@ class CaucaseTest(unittest.TestCase):
del db del db
self._startServer('--backup-directory', self._server_backup_path) self._startServer('--backup-directory', self._server_backup_path)
backup_path_list = retry(lambda: glob.glob(backup_glob), try_count=300) backup_path_list = retry(lambda: glob.glob(backup_glob), try_count=300)
if not backup_path_list: if not backup_path_list: # pragma: no cover
raise AssertionError('Backup file took too long to be created') raise AssertionError('Backup file took too long to be created')
backup_path, = glob.glob(backup_glob) backup_path, = glob.glob(backup_glob)
cli.key_id([ cli.key_id([
...@@ -2004,5 +2004,5 @@ class CaucaseTest(unittest.TestCase): ...@@ -2004,5 +2004,5 @@ class CaucaseTest(unittest.TestCase):
self.assertEqual(os.stat(self._server_db).st_mode & 0o777, 0o600) self.assertEqual(os.stat(self._server_db).st_mode & 0o777, 0o600)
self.assertEqual(os.stat(self._server_key).st_mode & 0o777, 0o600) self.assertEqual(os.stat(self._server_key).st_mode & 0o777, 0o600)
if __name__ == '__main__': if __name__ == '__main__': # pragma: no cover
unittest.main() unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment