Commit 6fef2bb3 authored by Romain Courteaud's avatar Romain Courteaud

Monitor the SSL certificate status

Cetificates are monitored until they expired.
parent 89b7bc7f
[SURYKATKA]
url =
https://sha1-2016.badssl.com/
https://sha1-2017.badssl.com/
https://expired.badssl.com/
https://wrong.host.badssl.com/
https://self-signed.badssl.com/
https://untrusted-root.badssl.com/
https://revoked.badssl.com/
https://pinning-test.badssl.com/
https://no-common-name.badssl.com/
https://no-subject.badssl.com/
https://no-subject.badssl.com/
https://incomplete-chain.badssl.com/
https://sha1-intermediate.badssl.com/
https://sha256.badssl.com/
https://sha384.badssl.com/
https://sha512.badssl.com/
https://1000-sans.badssl.com/
https://10000-sans.badssl.com/
https://ecc256.badssl.com/
https://ecc384.badssl.com/
https://rsa2048.badssl.com/
https://rsa4096.badssl.com/
https://rsa8192.badssl.com/
https://extended-validation.badssl.com/
https://cbc.badssl.com/
https://rc4-md5.badssl.com/
https://rc4.badssl.com/
https://3des.badssl.com/
https://null.badssl.com/
https://mozilla-old.badssl.com/
https://mozilla-intermediate.badssl.com/
https://mozilla-modern.badssl.com/
https://dh480.badssl.com/
https://dh512.badssl.com/
https://dh1024.badssl.com/
https://dh2048.badssl.com/
https://dh-small-subgroup.badssl.com/
https://dh-composite.badssl.com/
https://static-rsa.badssl.com/
https://tls-v1-0.badssl.com:1010/
https://tls-v1-1.badssl.com:1011/
https://tls-v1-2.badssl.com:1012/
https://invalid-expected-sct.badssl.com/
https://no-sct.badssl.com/
https://long-extended-subdomain-name-containing-many-letters-and-dashes.badssl.com/
https://longextendedsubdomainnamewithoutdashesinordertotestwordwrapping.badssl.com/
https://superfish.badssl.com/
https://edellroot.badssl.com/
https://dsdtestprovider.badssl.com/
https://preact-cli.badssl.com/
https://webpack-dev-server.badssl.com/
\ No newline at end of file
...@@ -32,6 +32,7 @@ from .network import isTcpPortOpen, reportNetwork ...@@ -32,6 +32,7 @@ from .network import isTcpPortOpen, reportNetwork
import json import json
import email.utils import email.utils
from collections import OrderedDict from collections import OrderedDict
from .ssl import hasValidSSLCertificate, reportSslCertificate
__version__ = "0.1.1" __version__ = "0.1.1"
...@@ -112,6 +113,19 @@ class WebBot: ...@@ -112,6 +113,19 @@ class WebBot:
self._db, server_ip, port, status_id, timeout self._db, server_ip, port, status_id, timeout
): ):
for hostname in server_ip_dict[server_ip]: for hostname in server_ip_dict[server_ip]:
if port == 443:
# Store certificate information
if not hasValidSSLCertificate(
self._db,
server_ip,
port,
hostname,
status_id,
timeout,
):
# If certificate is not valid,
# no need to do another query
continue
url = "%s://%s" % (protocol, hostname) url = "%s://%s" % (protocol, hostname)
if url not in url_dict: if url not in url_dict:
url_dict[url] = [] url_dict[url] = []
...@@ -221,6 +235,33 @@ class WebBot: ...@@ -221,6 +235,33 @@ class WebBot:
url_dict[url] = [] url_dict[url] = []
url_dict[url].append(network_change["ip"]) url_dict[url].append(network_change["ip"])
# Report the SSL status
query = reportSslCertificate(
self._db,
ip=[x for x in server_ip_dict.keys()],
port=443,
hostname=domain_list,
)
result_dict["ssl_certificate"] = []
for ssl_certificate in query.dicts().iterator():
result_dict["ssl_certificate"].append(
{
"hostname": ssl_certificate["hostname"],
"ip": ssl_certificate["ip"],
"port": ssl_certificate["port"],
"sha1_fingerprint": ssl_certificate["sha1_fingerprint"],
"subject": ssl_certificate["subject"],
"issuer": ssl_certificate["issuer"],
"not_before": rfc822(ssl_certificate["not_before"])
if (ssl_certificate["not_before"] is not None)
else None,
"not_after": rfc822(ssl_certificate["not_after"])
if (ssl_certificate["not_after"] is not None)
else None,
"date": rfc822(ssl_certificate["status"]),
}
)
# XXX put back orignal url list # XXX put back orignal url list
for url in self.calculateUrlList(): for url in self.calculateUrlList():
if url not in url_dict: if url not in url_dict:
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
# See https://www.nexedi.com/licensing for rationale and options. # See https://www.nexedi.com/licensing for rationale and options.
import peewee import peewee
from playhouse.migrate import migrate, SqliteMigrator
from playhouse.sqlite_ext import SqliteExtDatabase from playhouse.sqlite_ext import SqliteExtDatabase
import datetime import datetime
...@@ -94,6 +93,22 @@ class LogDB: ...@@ -94,6 +93,22 @@ class LogDB:
"status", "resolver_ip", "domain", "rdtype" "status", "resolver_ip", "domain", "rdtype"
) )
class SslChange(BaseModel):
status = peewee.ForeignKeyField(Status)
ip = peewee.TextField()
port = peewee.IntegerField()
hostname = peewee.TextField()
sha1_fingerprint = peewee.TextField(null=True)
not_before = peewee.TimestampField(null=True, utc=True)
not_after = peewee.TimestampField(null=True, utc=True)
subject = peewee.TextField(null=True)
issuer = peewee.TextField(null=True)
class Meta:
primary_key = peewee.CompositeKey(
"status", "ip", "port", "hostname"
)
class HttpCodeChange(BaseModel): class HttpCodeChange(BaseModel):
status = peewee.ForeignKeyField(Status) status = peewee.ForeignKeyField(Status)
ip = peewee.TextField() ip = peewee.TextField()
...@@ -109,13 +124,17 @@ class LogDB: ...@@ -109,13 +124,17 @@ class LogDB:
self.NetworkChange = NetworkChange self.NetworkChange = NetworkChange
self.DnsChange = DnsChange self.DnsChange = DnsChange
self.HttpCodeChange = HttpCodeChange self.HttpCodeChange = HttpCodeChange
self.SslChange = SslChange
def createTables(self): def createTables(self):
# http://www.sqlite.org/pragma.html#pragma_user_version # http://www.sqlite.org/pragma.html#pragma_user_version
db_version = self._db.pragma("user_version") db_version = self._db.pragma("user_version")
expected_version = 1 expected_version = 2
if db_version == 0: if db_version != expected_version:
with self._db.transaction(): with self._db.transaction():
if db_version == 0:
# version 0 (no table)
self._db.create_tables( self._db.create_tables(
[ [
self.Status, self.Status,
...@@ -126,21 +145,14 @@ class LogDB: ...@@ -126,21 +145,14 @@ class LogDB:
self.DnsChange, self.DnsChange,
] ]
) )
self._db.pragma("user_version", expected_version)
elif db_version != expected_version:
# migrator = SqliteMigrator(self._db)
migration_list = []
sql_query_list = []
if migration_list or sql_query_list: if db_version <= 1:
with self._db.transaction(): # version 1 without SSL support
if migration_list: self._db.create_tables([self.SslChange])
migrate(*migration_list)
if sql_query_list: else:
for sql_query in sql_query_list: raise ValueError("Can not downgrade SQLite DB")
self._db.execute_sql(
sql_query, require_commit=False
)
self._db.pragma("user_version", expected_version) self._db.pragma("user_version", expected_version)
def close(self): def close(self):
......
# Copyright (C) 2019 Nexedi SA and Contributors.
# Romain Courteaud <romain@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
from peewee import fn
import socket
import ssl
import hashlib
from binascii import hexlify
import datetime
TIMEOUT = 2
def reportSslCertificate(db, ip=None, port=None, hostname=None):
query = (
db.SslChange.select(db.SslChange)
.group_by(db.SslChange.ip, db.SslChange.port, db.SslChange.hostname,)
.having(db.SslChange.status_id == fn.MAX(db.SslChange.status_id))
)
if hostname is not None:
if type(hostname) == list:
query = query.where(db.SslChange.hostname << hostname)
else:
query = query.where(db.SslChange.hostname == hostname)
if port is not None:
if type(port) == list:
query = query.where(db.SslChange.port << port)
else:
query = query.where(db.SslChange.port == port)
if ip is not None:
if type(ip) == list:
query = query.where(db.SslChange.ip << ip)
else:
query = query.where(db.SslChange.ip == ip)
return query
def logSslCertificate(
db,
ip,
port,
hostname,
sha1_fingerprint,
not_before,
not_after,
subject,
issuer,
status_id,
):
with db._db.atomic():
try:
# Check previous parameter value
previous_entry = reportSslCertificate(
db, ip=ip, port=port, hostname=hostname
).get()
except db.SslChange.DoesNotExist:
previous_entry = None
if (previous_entry is None) or (
previous_entry.sha1_fingerprint != sha1_fingerprint
):
previous_entry = db.SslChange.create(
status=status_id,
ip=ip,
port=port,
hostname=hostname,
sha1_fingerprint=sha1_fingerprint,
not_before=not_before,
not_after=not_after,
subject=subject,
issuer=issuer,
)
return previous_entry.status_id
def hasValidSSLCertificate(db, ip, port, hostname, status_id, timeout=TIMEOUT):
ssl_context = ssl.create_default_context()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
wrapped_sock = ssl_context.wrap_socket(sock, server_hostname=hostname)
try:
wrapped_sock.connect((ip, port))
der = wrapped_sock.getpeercert(True)
# XXX How to extract info from the der directly?
ssl_info = wrapped_sock.getpeercert()
except (ssl.SSLError, ConnectionRefusedError, socket.timeout, OSError):
wrapped_sock.close()
# XXX Expired certificate can not be fetched with the builtin ssl lib
# pyOpenSSL is one way to fix this
# https://stackoverflow.com/a/52298575
logSslCertificate(
db, ip, port, hostname, None, None, None, None, None, status_id,
)
return False
except:
wrapped_sock.close()
raise
wrapped_sock.close()
sha1_fingerprint = hexlify(hashlib.sha1(der).digest())
ssl_date_fmt = "%b %d %H:%M:%S %Y %Z"
not_before = datetime.datetime.strptime(
ssl_info["notBefore"], ssl_date_fmt
)
not_after = datetime.datetime.strptime(ssl_info["notAfter"], ssl_date_fmt)
subject = dict([y for x in ssl_info["subject"] for y in x]).get(
"commonName", ""
)
issuer = dict([y for x in ssl_info["issuer"] for y in x]).get(
"commonName", ""
)
logSslCertificate(
db,
ip,
port,
hostname,
sha1_fingerprint.decode(),
not_before,
not_after,
subject,
issuer,
status_id,
)
return True
...@@ -44,6 +44,17 @@ def checkHttpCodeChange(bot, result_list): ...@@ -44,6 +44,17 @@ def checkHttpCodeChange(bot, result_list):
assert [(x.ip, x.url) for x in select_list] == result_list assert [(x.ip, x.url) for x in select_list] == result_list
def checkSslChange(bot, result_list):
assert bot._db.SslChange.select().count() == len(result_list)
select_list = (
bot._db.SslChange.select()
.order_by(bot._db.SslChange.ip.asc())
.order_by(bot._db.SslChange.port.asc())
.order_by(bot._db.SslChange.hostname.asc())
)
assert [(x.ip, x.port, x.hostname) for x in select_list] == result_list
def checkDnsChange(bot, result_list): def checkDnsChange(bot, result_list):
assert bot._db.DnsChange.select().count() == len(result_list) assert bot._db.DnsChange.select().count() == len(result_list)
select_list = ( select_list = (
...@@ -82,6 +93,8 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -82,6 +93,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
checkDnsChange(bot, [(resolver_ip, "example.org")]) checkDnsChange(bot, [(resolver_ip, "example.org")])
checkSslChange(bot, [])
checkHttpCodeChange(bot, []) checkHttpCodeChange(bot, [])
def test_oneNameserverOneDomainOneIp(self): def test_oneNameserverOneDomainOneIp(self):
...@@ -97,11 +110,22 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -97,11 +110,22 @@ class SurykatkaBotTestCase(unittest.TestCase):
) as mock_query, mock.patch( ) as mock_query, mock.patch(
"surykatka.network.socket.socket" "surykatka.network.socket.socket"
) as mock_socket, mock.patch( ) as mock_socket, mock.patch(
"surykatka.ssl.ssl.create_default_context"
) as mock_create_default_context, mock.patch(
"surykatka.http.request" "surykatka.http.request"
) as mock_request: ) as mock_request:
mock_get_default_resolver.return_value = resolver mock_get_default_resolver.return_value = resolver
mock_query.return_value = [MockAnswer("1.2.3.4")] mock_query.return_value = [MockAnswer("1.2.3.4")]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
b"",
{
"notBefore": "Jan 27 04:33:22 2020 GMT",
"notAfter": "Jan 27 04:33:22 2020 GMT",
"subject": [[("commonName", "foo")]],
"issuer": [[("commonName", "bar")]],
},
]
bot = WebBot( bot = WebBot(
mapping={"SQLITE": ":memory:", "DOMAIN": "example.org"} mapping={"SQLITE": ":memory:", "DOMAIN": "example.org"}
...@@ -111,7 +135,8 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -111,7 +135,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.iterateLoop() bot.iterateLoop()
assert mock_query.call_count == 2 assert mock_query.call_count == 2
assert mock_socket.call_count == 2 assert mock_socket.call_count == 3
assert mock_create_default_context.call_count == 1
assert mock_request.call_count == 2 assert mock_request.call_count == 2
checkNetworkChange( checkNetworkChange(
...@@ -120,6 +145,8 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -120,6 +145,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
checkDnsChange(bot, [(resolver_ip, "example.org")]) checkDnsChange(bot, [(resolver_ip, "example.org")])
checkSslChange(bot, [("1.2.3.4", 443, "example.org")])
checkHttpCodeChange( checkHttpCodeChange(
bot, bot,
[ [
...@@ -146,15 +173,27 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -146,15 +173,27 @@ class SurykatkaBotTestCase(unittest.TestCase):
) as mock_query, mock.patch( ) as mock_query, mock.patch(
"surykatka.network.socket.socket" "surykatka.network.socket.socket"
) as mock_socket, mock.patch( ) as mock_socket, mock.patch(
"surykatka.ssl.ssl.create_default_context"
) as mock_create_default_context, mock.patch(
"surykatka.http.request" "surykatka.http.request"
) as mock_request: ) as mock_request:
mock_query.return_value = [MockAnswer("1.2.3.4")] mock_query.return_value = [MockAnswer("1.2.3.4")]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
b"",
{
"notBefore": "Jan 27 04:33:22 2020 GMT",
"notAfter": "Jan 27 04:33:22 2020 GMT",
"subject": [[("commonName", "foo")]],
"issuer": [[("commonName", "bar")]],
},
]
bot.iterateLoop() bot.iterateLoop()
assert mock_query.call_count == 4 assert mock_query.call_count == 4
assert mock_socket.call_count == 2 assert mock_socket.call_count == 3
assert mock_create_default_context.call_count == 1
assert mock_request.call_count == 2 assert mock_request.call_count == 2
checkNetworkChange( checkNetworkChange(
...@@ -171,6 +210,8 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -171,6 +210,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot, [(resolver_ip, "example.org"), (resolver_ip_2, "example.org")] bot, [(resolver_ip, "example.org"), (resolver_ip_2, "example.org")]
) )
checkSslChange(bot, [("1.2.3.4", 443, "example.org")])
checkHttpCodeChange( checkHttpCodeChange(
bot, bot,
[ [
...@@ -198,15 +239,34 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -198,15 +239,34 @@ class SurykatkaBotTestCase(unittest.TestCase):
) as mock_query, mock.patch( ) as mock_query, mock.patch(
"surykatka.network.socket.socket" "surykatka.network.socket.socket"
) as mock_socket, mock.patch( ) as mock_socket, mock.patch(
"surykatka.ssl.ssl.create_default_context"
) as mock_create_default_context, mock.patch(
"surykatka.http.request" "surykatka.http.request"
) as mock_request: ) as mock_request:
mock_query.return_value = [MockAnswer("1.2.3.4")] mock_query.return_value = [MockAnswer("1.2.3.4")]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
b"",
{
"notBefore": "Jan 27 04:33:22 2020 GMT",
"notAfter": "Jan 27 04:33:22 2020 GMT",
"subject": [[("commonName", "foo")]],
"issuer": [[("commonName", "bar")]],
},
b"",
{
"notBefore": "Jan 27 04:33:22 2020 GMT",
"notAfter": "Jan 27 04:33:22 2020 GMT",
"subject": [[("commonName", "foo")]],
"issuer": [[("commonName", "bar")]],
},
]
bot.iterateLoop() bot.iterateLoop()
assert mock_query.call_count == 3 assert mock_query.call_count == 3
assert mock_socket.call_count == 2 assert mock_socket.call_count == 4
assert mock_create_default_context.call_count == 2
assert mock_request.call_count == 4 assert mock_request.call_count == 4
checkNetworkChange( checkNetworkChange(
...@@ -215,6 +275,10 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -215,6 +275,10 @@ class SurykatkaBotTestCase(unittest.TestCase):
checkDnsChange(bot, [(resolver_ip, domain_1), (resolver_ip, domain_2)]) checkDnsChange(bot, [(resolver_ip, domain_1), (resolver_ip, domain_2)])
checkSslChange(
bot, [("1.2.3.4", 443, domain_1), ("1.2.3.4", 443, domain_2)]
)
checkHttpCodeChange( checkHttpCodeChange(
bot, bot,
[ [
...@@ -243,6 +307,8 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -243,6 +307,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
) as mock_query, mock.patch( ) as mock_query, mock.patch(
"surykatka.network.socket.socket" "surykatka.network.socket.socket"
) as mock_socket, mock.patch( ) as mock_socket, mock.patch(
"surykatka.ssl.ssl.create_default_context"
) as mock_create_default_context, mock.patch(
"surykatka.http.request" "surykatka.http.request"
) as mock_request: ) as mock_request:
...@@ -250,11 +316,28 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -250,11 +316,28 @@ class SurykatkaBotTestCase(unittest.TestCase):
MockAnswer("1.2.3.4"), MockAnswer("1.2.3.4"),
MockAnswer("1.2.3.5"), MockAnswer("1.2.3.5"),
] ]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
b"",
{
"notBefore": "Jan 27 04:33:22 2020 GMT",
"notAfter": "Jan 27 04:33:22 2020 GMT",
"subject": [[("commonName", "foo")]],
"issuer": [[("commonName", "bar")]],
},
b"",
{
"notBefore": "Jan 27 04:33:22 2020 GMT",
"notAfter": "Jan 27 04:33:22 2020 GMT",
"subject": [[("commonName", "foo")]],
"issuer": [[("commonName", "bar")]],
},
]
bot.iterateLoop() bot.iterateLoop()
assert mock_query.call_count == 2 assert mock_query.call_count == 2
assert mock_socket.call_count == 4 assert mock_socket.call_count == 6
assert mock_create_default_context.call_count == 2
assert mock_request.call_count == 4 assert mock_request.call_count == 4
checkNetworkChange( checkNetworkChange(
...@@ -270,6 +353,11 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -270,6 +353,11 @@ class SurykatkaBotTestCase(unittest.TestCase):
checkDnsChange(bot, [(resolver_ip, domain)]) checkDnsChange(bot, [(resolver_ip, domain)])
checkSslChange(
bot,
[("1.2.3.4", 443, "example.org"), ("1.2.3.5", 443, "example.org")],
)
checkHttpCodeChange( checkHttpCodeChange(
bot, bot,
[ [
...@@ -299,15 +387,34 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -299,15 +387,34 @@ class SurykatkaBotTestCase(unittest.TestCase):
) as mock_query, mock.patch( ) as mock_query, mock.patch(
"surykatka.network.socket.socket" "surykatka.network.socket.socket"
) as mock_socket, mock.patch( ) as mock_socket, mock.patch(
"surykatka.ssl.ssl.create_default_context"
) as mock_create_default_context, mock.patch(
"surykatka.http.request" "surykatka.http.request"
) as mock_request: ) as mock_request:
mock_query.return_value = [MockAnswer("1.2.3.4")] mock_query.return_value = [MockAnswer("1.2.3.4")]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
b"",
{
"notBefore": "Jan 27 04:33:22 2020 GMT",
"notAfter": "Jan 27 04:33:22 2020 GMT",
"subject": [[("commonName", "foo")]],
"issuer": [[("commonName", "bar")]],
},
b"",
{
"notBefore": "Jan 27 04:33:22 2020 GMT",
"notAfter": "Jan 27 04:33:22 2020 GMT",
"subject": [[("commonName", "foo")]],
"issuer": [[("commonName", "bar")]],
},
]
bot.iterateLoop() bot.iterateLoop()
assert mock_query.call_count == 3 assert mock_query.call_count == 3
assert mock_socket.call_count == 2 assert mock_socket.call_count == 4
assert mock_create_default_context.call_count == 2
assert mock_request.call_count == 4 assert mock_request.call_count == 4
checkNetworkChange( checkNetworkChange(
...@@ -316,6 +423,10 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -316,6 +423,10 @@ class SurykatkaBotTestCase(unittest.TestCase):
checkDnsChange(bot, [(resolver_ip, domain), (resolver_ip, sub_domain)]) checkDnsChange(bot, [(resolver_ip, domain), (resolver_ip, sub_domain)])
checkSslChange(
bot, [("1.2.3.4", 443, domain), ("1.2.3.4", 443, sub_domain)]
)
checkHttpCodeChange( checkHttpCodeChange(
bot, bot,
[ [
...@@ -344,14 +455,26 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -344,14 +455,26 @@ class SurykatkaBotTestCase(unittest.TestCase):
) as mock_query, mock.patch( ) as mock_query, mock.patch(
"surykatka.network.socket.socket" "surykatka.network.socket.socket"
) as mock_socket, mock.patch( ) as mock_socket, mock.patch(
"surykatka.ssl.ssl.create_default_context"
) as mock_create_default_context, mock.patch(
"surykatka.http.request" "surykatka.http.request"
) as mock_request: ) as mock_request:
mock_query.return_value = [MockAnswer("1.2.3.4")] mock_query.return_value = [MockAnswer("1.2.3.4")]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
b"",
{
"notBefore": "Jan 27 04:33:22 2020 GMT",
"notAfter": "Jan 27 04:33:22 2020 GMT",
"subject": [[("commonName", "foo")]],
"issuer": [[("commonName", "bar")]],
},
]
bot.iterateLoop() bot.iterateLoop()
assert mock_query.call_count == 2 assert mock_query.call_count == 2
assert mock_socket.call_count == 2 assert mock_socket.call_count == 3
assert mock_create_default_context.call_count == 1
assert mock_request.call_count == 3 assert mock_request.call_count == 3
checkNetworkChange( checkNetworkChange(
...@@ -360,6 +483,8 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -360,6 +483,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
checkDnsChange(bot, [(resolver_ip, domain)]) checkDnsChange(bot, [(resolver_ip, domain)])
checkSslChange(bot, [("1.2.3.4", 443, domain)])
checkHttpCodeChange( checkHttpCodeChange(
bot, bot,
[ [
......
...@@ -28,7 +28,38 @@ class SurykatkaDBTestCase(unittest.TestCase): ...@@ -28,7 +28,38 @@ class SurykatkaDBTestCase(unittest.TestCase):
def test_createTable(self): def test_createTable(self):
assert self.db._db.pragma("user_version") == 0 assert self.db._db.pragma("user_version") == 0
self.db.createTables() self.db.createTables()
assert self.db._db.pragma("user_version") == 1 assert self.db._db.pragma("user_version") == 2
def test_downgrade(self):
assert self.db._db.pragma("user_version") == 0
# Unpossible high version
self.db._db.pragma("user_version", 9999)
try:
self.db.createTables()
except ValueError:
assert self.db._db.pragma("user_version") == 9999
else:
raise NotImplementedError("Expected ValueError")
def test_migrationFromVersion1(self):
assert self.db._db.pragma("user_version") == 0
# Recreate version 1
with self.db._db.transaction():
self.db._db.create_tables(
[
self.db.Status,
self.db.ConfigurationChange,
self.db.HttpCodeChange,
self.db.NetworkChange,
self.db.PlatformChange,
self.db.DnsChange,
]
)
self.db._db.pragma("user_version", 1)
self.db.createTables()
assert self.db._db.pragma("user_version") == 2
def suite(): def suite():
......
# Copyright (C) 2019 Nexedi SA and Contributors.
# Romain Courteaud <romain@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
import unittest
from surykatka.db import LogDB
import surykatka.ssl
from surykatka.ssl import logSslCertificate, hasValidSSLCertificate
from surykatka.status import logStatus
import mock
import peewee
import datetime
class TZ(datetime.tzinfo):
"""A time zone with an arbitrary, constant -06:39 offset."""
def utcoffset(self, dt):
return datetime.timedelta(hours=-6, minutes=-39)
def dst(self, dt):
return datetime.timedelta(hours=-6, minutes=-39)
class SurykatkaSslTestCase(unittest.TestCase):
def setUp(self):
self.db = LogDB(":memory:")
self.db.createTables()
################################################
# logSSLCertificate
################################################
def test_logSSLCertificate_insertFirst(self):
ip = "127.0.0.1"
port = 1234
hostname = "example.org"
status_id = logStatus(self.db, "foo")
sha1_fingerprint = "asdfghj"
not_before = datetime.datetime.utcnow()
not_after = datetime.datetime(2000, 1, 1, tzinfo=TZ())
subject = "foosubject"
issuer = "barissuer"
result = logSslCertificate(
self.db,
ip,
port,
hostname,
sha1_fingerprint,
not_before,
not_after,
subject,
issuer,
status_id,
)
assert self.db.SslChange.select().count() == 1
assert self.db.SslChange.get().ip == ip
assert self.db.SslChange.get().port == port
assert self.db.SslChange.get().hostname == hostname
assert self.db.SslChange.get().sha1_fingerprint == sha1_fingerprint
assert (
int(
(
self.db.SslChange.get().not_before - not_before
).total_seconds()
)
== 0
)
assert self.db.SslChange.get().not_after == datetime.datetime(
2000, 1, 1, 6, 39
)
assert self.db.SslChange.get().subject == subject
assert self.db.SslChange.get().issuer == issuer
assert result == status_id
def test_logSSLCertificate_insertOnlyOnePerStatusIdIPPortHostname(self):
ip = "127.0.0.1"
port = 1234
hostname = "example.org"
status_id = logStatus(self.db, "foo")
sha1_fingerprint = "asdfghj"
not_before = datetime.datetime.utcnow()
not_after = datetime.datetime.utcnow()
subject = "foosubject"
issuer = "barissuer"
logSslCertificate(
self.db,
ip,
port,
hostname,
sha1_fingerprint,
not_before,
not_after,
subject,
issuer,
status_id,
)
try:
logSslCertificate(
self.db,
ip,
port,
hostname,
sha1_fingerprint + ".",
not_before,
not_after,
subject,
issuer,
status_id,
)
except peewee.IntegrityError:
assert self.db.SslChange.select().count() == 1
assert self.db.SslChange.get().status_id == status_id
else:
raise NotImplementedError("Expected IntegrityError")
def test_logSSLCertificate_skipIdenticalPreviousValues(self):
ip = "127.0.0.1"
port = 1234
hostname = "example.org"
sha1_fingerprint = "asdfghj"
not_before = datetime.datetime.utcnow()
not_after = datetime.datetime.utcnow()
subject = "foosubject"
issuer = "barissuer"
status_id = logStatus(self.db, "foo")
result = logSslCertificate(
self.db,
ip,
port,
hostname,
sha1_fingerprint,
not_before,
not_after,
subject,
issuer,
status_id,
)
status_id_2 = logStatus(self.db, "foo")
result_2 = logSslCertificate(
self.db,
ip,
port,
hostname,
sha1_fingerprint,
not_before,
not_after,
subject,
issuer,
status_id_2,
)
assert result_2 == result
assert self.db.SslChange.select().count() == 1
assert self.db.SslChange.get().ip == ip
assert self.db.SslChange.get().port == port
assert self.db.SslChange.get().hostname == hostname
assert self.db.SslChange.get().sha1_fingerprint == sha1_fingerprint
assert self.db.SslChange.get().status_id == status_id
def test_logSSLCertificate_insertWhenDifferentState(self):
ip = "127.0.0.1"
port = 1234
hostname = "example.org"
sha1_fingerprint = "asdfghj"
not_before = datetime.datetime.utcnow()
not_after = datetime.datetime.utcnow()
subject = "foosubject"
issuer = "barissuer"
status_id = logStatus(self.db, "foo")
result = logSslCertificate(
self.db,
ip,
port,
hostname,
sha1_fingerprint,
not_before,
not_after,
subject,
issuer,
status_id,
)
status_id_2 = logStatus(self.db, "foo")
sha1_fingerprint_2 = sha1_fingerprint + "."
result_2 = logSslCertificate(
self.db,
ip,
port,
hostname,
sha1_fingerprint_2,
not_before,
not_after,
subject,
issuer,
status_id_2,
)
assert result_2 != result
assert self.db.SslChange.select().count() == 2
assert (
self.db.SslChange.get(
self.db.SslChange.status == status_id
).sha1_fingerprint
== sha1_fingerprint
)
assert (
self.db.SslChange.get(
self.db.SslChange.status == status_id_2
).sha1_fingerprint
== sha1_fingerprint_2
)
def test_logSSLCertificate_insertDifferentKeys(self):
ip = "127.0.0.1"
ip_2 = ip + "2"
port = 1234
port_2 = port + 1
hostname = "example.org"
hostname_2 = hostname + "."
status_id = logStatus(self.db, "foo")
sha1_fingerprint = "asdfghj"
not_before = datetime.datetime.utcnow()
not_after = datetime.datetime.utcnow()
subject = "foosubject"
issuer = "barissuer"
args = [
sha1_fingerprint,
not_before,
not_after,
subject,
issuer,
status_id,
]
logSslCertificate(self.db, ip, port, hostname, *args)
logSslCertificate(self.db, ip_2, port, hostname, *args)
logSslCertificate(self.db, ip, port_2, hostname, *args)
logSslCertificate(self.db, ip, port, hostname_2, *args)
logSslCertificate(self.db, ip_2, port_2, hostname, *args)
logSslCertificate(self.db, ip_2, port, hostname_2, *args)
logSslCertificate(self.db, ip, port_2, hostname_2, *args)
logSslCertificate(self.db, ip_2, port_2, hostname_2, *args)
assert self.db.SslChange.select().count() == 8
################################################
# hasValidSSLCertificate
################################################
def test_hasValidSSLCertificate_valid(self):
ip = "127.0.0.1"
port = 1234
hostname = "example.org"
status_id = logStatus(self.db, "foo")
with mock.patch(
"surykatka.ssl.socket.socket"
) as mock_socket, mock.patch(
"surykatka.ssl.ssl.create_default_context"
) as mock_create_context:
mock_create_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
b"",
{
"notBefore": "Jan 27 04:33:22 2020 GMT",
"notAfter": "Jan 27 04:33:22 2020 GMT",
"subject": [[("commonName", "foo")]],
"issuer": [[("commonName", "bar")]],
},
]
result = hasValidSSLCertificate(
self.db, ip, port, hostname, status_id
)
assert mock_socket.call_count == 1
assert mock_socket.return_value.settimeout.call_count == 1
mock_socket.return_value.settimeout.assert_called_with(2)
assert mock_socket.return_value.connect.call_count == 0
assert mock_socket.return_value.close.call_count == 0
assert mock_create_context.call_count == 1
assert mock_create_context.return_value.wrap_socket.call_count == 1
mock_create_context.return_value.wrap_socket.assert_called_with(
mock_socket.return_value, server_hostname=hostname
)
assert (
mock_create_context.return_value.wrap_socket.return_value.connect.call_count
== 1
)
mock_create_context.return_value.wrap_socket.return_value.connect.assert_called_with(
(ip, port)
)
assert (
mock_create_context.return_value.wrap_socket.return_value.getpeercert.call_count
== 2
)
assert mock_create_context.return_value.wrap_socket.return_value.getpeercert.call_args_list[
0
] == mock.call(
True
)
assert (
mock_create_context.return_value.wrap_socket.return_value.getpeercert.call_args_list[
1
]
== mock.call()
)
assert self.db.SslChange.select().count() == 1
assert self.db.SslChange.get().ip == ip
assert self.db.SslChange.get().port == port
assert self.db.SslChange.get().hostname == hostname
assert (
self.db.SslChange.get().sha1_fingerprint
== "da39a3ee5e6b4b0d3255bfef95601890afd80709"
)
assert self.db.SslChange.get().not_before == datetime.datetime(
2020, 1, 27, 4, 33, 22
)
assert self.db.SslChange.get().not_after == datetime.datetime(
2020, 1, 27, 4, 33, 22
)
assert self.db.SslChange.get().subject == "foo"
assert self.db.SslChange.get().issuer == "bar"
assert self.db.SslChange.get().status_id == status_id
assert result == True
def test_hasValidSSLCertificate_validTwice(self):
ip = "127.0.0.1"
port = 1234
hostname = "example.org"
status_id = logStatus(self.db, "foo")
status_id_2 = logStatus(self.db, "foo")
with mock.patch(
"surykatka.ssl.socket.socket"
) as mock_socket, mock.patch(
"surykatka.ssl.ssl.create_default_context"
) as mock_create_context:
mock_create_context.return_value.wrap_socket.return_value.getpeercert.side_effect = (
[
b"",
{
"notBefore": "Jan 27 04:33:22 2020 GMT",
"notAfter": "Jan 27 04:33:22 2020 GMT",
"subject": [[("commonName", "foo")]],
"issuer": [[("commonName", "bar")]],
},
]
* 2
)
result = hasValidSSLCertificate(
self.db, ip, port, hostname, status_id
)
result = hasValidSSLCertificate(
self.db, ip, port, hostname, status_id_2
)
assert mock_socket.call_count == 2
assert mock_socket.return_value.settimeout.call_count == 2
mock_socket.return_value.settimeout.assert_called_with(2)
assert mock_socket.return_value.connect.call_count == 0
assert mock_socket.return_value.close.call_count == 0
assert mock_create_context.call_count == 2
assert mock_create_context.return_value.wrap_socket.call_count == 2
mock_create_context.return_value.wrap_socket.assert_called_with(
mock_socket.return_value, server_hostname=hostname
)
assert (
mock_create_context.return_value.wrap_socket.return_value.connect.call_count
== 2
)
mock_create_context.return_value.wrap_socket.return_value.connect.assert_called_with(
(ip, port)
)
assert (
mock_create_context.return_value.wrap_socket.return_value.getpeercert.call_count
== 4
)
assert mock_create_context.return_value.wrap_socket.return_value.getpeercert.call_args_list[
0
] == mock.call(
True
)
assert (
mock_create_context.return_value.wrap_socket.return_value.getpeercert.call_args_list[
1
]
== mock.call()
)
assert self.db.SslChange.select().count() == 1
assert self.db.SslChange.get().ip == ip
assert self.db.SslChange.get().port == port
assert self.db.SslChange.get().hostname == hostname
assert (
self.db.SslChange.get().sha1_fingerprint
== "da39a3ee5e6b4b0d3255bfef95601890afd80709"
)
assert self.db.SslChange.get().not_before == datetime.datetime(
2020, 1, 27, 4, 33, 22
)
assert self.db.SslChange.get().not_after == datetime.datetime(
2020, 1, 27, 4, 33, 22
)
assert self.db.SslChange.get().subject == "foo"
assert self.db.SslChange.get().issuer == "bar"
assert self.db.SslChange.get().status_id == status_id
assert result == True
def test_hasValidSSLCertificate_sslError(self):
ip = "127.0.0.1"
port = 1234
hostname = "example.org"
status_id = logStatus(self.db, "foo")
with mock.patch(
"surykatka.ssl.socket.socket"
) as mock_socket, mock.patch(
"surykatka.ssl.ssl.create_default_context"
) as mock_create_context:
def sideEffect(*args, **kw):
raise surykatka.ssl.ssl.SSLError()
mock_create_context.return_value.wrap_socket.return_value.connect.side_effect = (
sideEffect
)
result = hasValidSSLCertificate(
self.db, ip, port, hostname, status_id
)
assert mock_socket.call_count == 1
assert mock_socket.return_value.settimeout.call_count == 1
mock_socket.return_value.settimeout.assert_called_with(2)
assert mock_socket.return_value.connect.call_count == 0
assert mock_socket.return_value.close.call_count == 0
assert mock_create_context.call_count == 1
assert mock_create_context.return_value.wrap_socket.call_count == 1
mock_create_context.return_value.wrap_socket.assert_called_with(
mock_socket.return_value, server_hostname=hostname
)
assert (
mock_create_context.return_value.wrap_socket.return_value.connect.call_count
== 1
)
mock_create_context.return_value.wrap_socket.return_value.connect.assert_called_with(
(ip, port)
)
assert (
mock_create_context.return_value.wrap_socket.return_value.getpeercert.call_count
== 0
)
assert self.db.SslChange.select().count() == 1
assert self.db.SslChange.get().ip == ip
assert self.db.SslChange.get().port == port
assert self.db.SslChange.get().hostname == hostname
assert self.db.SslChange.get().sha1_fingerprint == None
assert self.db.SslChange.get().not_before == None
assert self.db.SslChange.get().not_after == None
assert self.db.SslChange.get().subject == None
assert self.db.SslChange.get().issuer == None
assert self.db.SslChange.get().status_id == status_id
assert result == False
def test_hasValidSSLCertificate_ConnectionRefusedError(self):
ip = "127.0.0.1"
port = 1234
hostname = "example.org"
status_id = logStatus(self.db, "foo")
with mock.patch(
"surykatka.ssl.socket.socket"
) as mock_socket, mock.patch(
"surykatka.ssl.ssl.create_default_context"
) as mock_create_context:
def sideEffect(*args, **kw):
raise ConnectionRefusedError()
mock_create_context.return_value.wrap_socket.return_value.connect.side_effect = (
sideEffect
)
result = hasValidSSLCertificate(
self.db, ip, port, hostname, status_id
)
assert mock_socket.call_count == 1
assert mock_socket.return_value.settimeout.call_count == 1
mock_socket.return_value.settimeout.assert_called_with(2)
assert mock_socket.return_value.connect.call_count == 0
assert mock_socket.return_value.close.call_count == 0
assert mock_create_context.call_count == 1
assert mock_create_context.return_value.wrap_socket.call_count == 1
mock_create_context.return_value.wrap_socket.assert_called_with(
mock_socket.return_value, server_hostname=hostname
)
assert (
mock_create_context.return_value.wrap_socket.return_value.connect.call_count
== 1
)
mock_create_context.return_value.wrap_socket.return_value.connect.assert_called_with(
(ip, port)
)
assert (
mock_create_context.return_value.wrap_socket.return_value.getpeercert.call_count
== 0
)
assert self.db.SslChange.select().count() == 1
assert self.db.SslChange.get().ip == ip
assert self.db.SslChange.get().port == port
assert self.db.SslChange.get().hostname == hostname
assert self.db.SslChange.get().sha1_fingerprint == None
assert self.db.SslChange.get().not_before == None
assert self.db.SslChange.get().not_after == None
assert self.db.SslChange.get().subject == None
assert self.db.SslChange.get().issuer == None
assert self.db.SslChange.get().status_id == status_id
assert result == False
def test_hasValidSSLCertificate_timeout(self):
ip = "127.0.0.1"
port = 1234
hostname = "example.org"
status_id = logStatus(self.db, "foo")
with mock.patch(
"surykatka.ssl.socket.socket"
) as mock_socket, mock.patch(
"surykatka.ssl.ssl.create_default_context"
) as mock_create_context:
def sideEffect(*args, **kw):
raise surykatka.ssl.socket.timeout()
mock_create_context.return_value.wrap_socket.return_value.connect.side_effect = (
sideEffect
)
result = hasValidSSLCertificate(
self.db, ip, port, hostname, status_id
)
assert mock_socket.call_count == 1
assert mock_socket.return_value.settimeout.call_count == 1
mock_socket.return_value.settimeout.assert_called_with(2)
assert mock_socket.return_value.connect.call_count == 0
assert mock_socket.return_value.close.call_count == 0
assert mock_create_context.call_count == 1
assert mock_create_context.return_value.wrap_socket.call_count == 1
mock_create_context.return_value.wrap_socket.assert_called_with(
mock_socket.return_value, server_hostname=hostname
)
assert (
mock_create_context.return_value.wrap_socket.return_value.connect.call_count
== 1
)
mock_create_context.return_value.wrap_socket.return_value.connect.assert_called_with(
(ip, port)
)
assert (
mock_create_context.return_value.wrap_socket.return_value.getpeercert.call_count
== 0
)
assert self.db.SslChange.select().count() == 1
assert self.db.SslChange.get().ip == ip
assert self.db.SslChange.get().port == port
assert self.db.SslChange.get().hostname == hostname
assert self.db.SslChange.get().sha1_fingerprint == None
assert self.db.SslChange.get().not_before == None
assert self.db.SslChange.get().not_after == None
assert self.db.SslChange.get().subject == None
assert self.db.SslChange.get().issuer == None
assert self.db.SslChange.get().status_id == status_id
assert result == False
def test_hasValidSSLCertificate_OSError(self):
ip = "127.0.0.1"
port = 1234
hostname = "example.org"
status_id = logStatus(self.db, "foo")
with mock.patch(
"surykatka.ssl.socket.socket"
) as mock_socket, mock.patch(
"surykatka.ssl.ssl.create_default_context"
) as mock_create_context:
def sideEffect(*args, **kw):
raise OSError()
mock_create_context.return_value.wrap_socket.return_value.connect.side_effect = (
sideEffect
)
result = hasValidSSLCertificate(
self.db, ip, port, hostname, status_id
)
assert mock_socket.call_count == 1
assert mock_socket.return_value.settimeout.call_count == 1
mock_socket.return_value.settimeout.assert_called_with(2)
assert mock_socket.return_value.connect.call_count == 0
assert mock_socket.return_value.close.call_count == 0
assert mock_create_context.call_count == 1
assert mock_create_context.return_value.wrap_socket.call_count == 1
mock_create_context.return_value.wrap_socket.assert_called_with(
mock_socket.return_value, server_hostname=hostname
)
assert (
mock_create_context.return_value.wrap_socket.return_value.connect.call_count
== 1
)
mock_create_context.return_value.wrap_socket.return_value.connect.assert_called_with(
(ip, port)
)
assert (
mock_create_context.return_value.wrap_socket.return_value.getpeercert.call_count
== 0
)
assert self.db.SslChange.select().count() == 1
assert self.db.SslChange.get().ip == ip
assert self.db.SslChange.get().port == port
assert self.db.SslChange.get().hostname == hostname
assert self.db.SslChange.get().sha1_fingerprint == None
assert self.db.SslChange.get().not_before == None
assert self.db.SslChange.get().not_after == None
assert self.db.SslChange.get().subject == None
assert self.db.SslChange.get().issuer == None
assert self.db.SslChange.get().status_id == status_id
assert result == False
def test_hasValidSSLCertificate_Exception(self):
ip = "127.0.0.1"
port = 1234
hostname = "example.org"
status_id = logStatus(self.db, "foo")
with mock.patch(
"surykatka.ssl.socket.socket"
) as mock_socket, mock.patch(
"surykatka.ssl.ssl.create_default_context"
) as mock_create_context:
def sideEffect(*args, **kw):
raise Exception()
mock_create_context.return_value.wrap_socket.return_value.connect.side_effect = (
sideEffect
)
try:
hasValidSSLCertificate(self.db, ip, port, hostname, status_id)
except Exception:
assert self.db.SslChange.select().count() == 0
else:
raise NotImplementedError("Expected OSError")
assert mock_socket.call_count == 1
assert mock_socket.return_value.settimeout.call_count == 1
mock_socket.return_value.settimeout.assert_called_with(2)
assert mock_socket.return_value.connect.call_count == 0
assert mock_socket.return_value.close.call_count == 0
assert mock_create_context.call_count == 1
assert mock_create_context.return_value.wrap_socket.call_count == 1
mock_create_context.return_value.wrap_socket.assert_called_with(
mock_socket.return_value, server_hostname=hostname
)
assert (
mock_create_context.return_value.wrap_socket.return_value.connect.call_count
== 1
)
mock_create_context.return_value.wrap_socket.return_value.connect.assert_called_with(
(ip, port)
)
assert (
mock_create_context.return_value.wrap_socket.return_value.getpeercert.call_count
== 0
)
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(SurykatkaSslTestCase))
return suite
if __name__ == "__main__":
unittest.main(defaultTest="suite")
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment