Commit 77772997 authored by Romain Courteaud's avatar Romain Courteaud

report MX conf issue (ie, unexpected open port 25)

parent 19d1bbb3
......@@ -124,34 +124,62 @@ def filterWarningStatus(
if not status_dict["dns_server"]:
del status_dict["dns_server"]
mx_domain_dict = {}
for i in range(len(status_dict["dns_query"]) - 1, -1, -1):
state = status_dict["dns_query"][i]["response"]
if state != "":
if state == "":
if status_dict["dns_query"][i]["rdtype"] == "MX":
# No MX is allowed
del status_dict["dns_query"][i]
else:
# Keep track of possible domain handling MX
if status_dict["dns_query"][i]["rdtype"] == "MX":
for mx_domain in state.split(", "):
mx_domain_dict[mx_domain] = True
del status_dict["dns_query"][i]
if not status_dict["dns_query"]:
del status_dict["dns_query"]
if not status_dict["missing_data"]:
del status_dict["missing_data"]
for i in range(len(status_dict["http_server"]) - 1, -1, -1):
state = status_dict["http_server"][i]["state"]
for i in range(len(status_dict["tcp_server"]) - 1, -1, -1):
state = status_dict["tcp_server"][i]["state"]
# Skip if all domains lead to not critical urls
prefix = ""
if status_dict["http_server"][i]["port"] == 80:
if status_dict["tcp_server"][i]["port"] == 80:
prefix = "http://"
elif status_dict["http_server"][i]["port"] == 443:
elif status_dict["tcp_server"][i]["port"] == 443:
prefix = "https://"
domain_list = status_dict["http_server"][i]["domain"].split(", ")
elif status_dict["tcp_server"][i]["port"] == 25:
prefix = "smtp://"
else:
raise NotImplementedError(
"Not supported server port %i"
% status_dict["tcp_server"][i]["port"]
)
domain_list = status_dict["tcp_server"][i]["domain"].split(", ")
domain_list = [
x
for x in domain_list
if "%s%s" % (prefix, x) not in not_critical_url_list
]
if (state == "open") or (not domain_list):
del status_dict["http_server"][i]
if not status_dict["http_server"]:
del status_dict["http_server"]
if status_dict["tcp_server"][i]["port"] == 25:
has_intersection = (
len(set(mx_domain_dict.keys()).intersection(domain_list)) != 0
)
if ((state == "open") and has_intersection) or (
(state != "open") and (not has_intersection)
):
# if one MX points to this server, port should be open
# if no MX points to this serverm port should NOT be open
del status_dict["tcp_server"][i]
elif (state == "open") or (not domain_list):
del status_dict["tcp_server"][i]
if not status_dict["tcp_server"]:
del status_dict["tcp_server"]
for i in range(len(status_dict["ssl_certificate"]) - 1, -1, -1):
not_after = status_dict["ssl_certificate"][i]["not_after"]
......@@ -289,6 +317,10 @@ class WebBot:
server_ip_dict = getDomainIpDict(
self._db, status_id, resolver_ip_list, domain_list, "A", timeout
)
# Check the mail configuration for every domain (MX and SPF)
getDomainIpDict(
self._db, status_id, resolver_ip_list, domain_list, "MX", timeout
)
# Check TCP port for the list of IP found
# XXX For now, check http/https only
......@@ -296,12 +328,16 @@ class WebBot:
url_dict = {}
for server_ip in server_ip_list:
# XXX Check SSL certificate expiration
for port, protocol in [(80, "http"), (443, "https")]:
for port, protocol in [
(80, "http"),
(443, "https"),
(25, "smtp"),
]:
if isTcpPortOpen(
self._db, server_ip, port, status_id, timeout
):
for hostname in server_ip_dict[server_ip]:
if port == 443:
if port in [443, 587]:
# Store certificate information
if not hasValidSSLCertificate(
self._db,
......@@ -328,6 +364,9 @@ class WebBot:
# Check HTTP Status
for url in url_dict:
if url.startswith("smtp"):
# XXX TODO implement smtp connection check
continue
for ip in url_dict[url]:
checkHttpStatus(
self._db,
......@@ -373,7 +412,10 @@ class WebBot:
"registrar": domain_change["registrar"],
"whois_server": domain_change["whois_server"],
"creation_date": rfc822(domain_change["creation_date"])
if (type(domain_change["creation_date"]) is datetime.datetime)
if (
type(domain_change["creation_date"])
is datetime.datetime
)
else None,
"updated_date": rfc822(domain_change["updated_date"])
if (domain_change["updated_date"] is datetime.datetime)
......@@ -417,46 +459,91 @@ class WebBot:
}
)
result_dict["missing_data"] = []
for resolver_ip in self.config["NAMESERVER"].split():
if resolver_ip not in checked_resolver_ip_dict:
result_dict["missing_data"].append(
{
"text": resolver_ip,
"date": result_dict["bot_status"][0]["date"],
}
)
checked_domain_dict = {}
# Report list of DNS query
query = reportDnsQuery(
self._db,
domain=domain_list,
resolver_ip=resolver_ip_list,
rdtype="A",
rdtype=["A", "MX"],
)
server_ip_dict = {}
result_dict["dns_query"] = []
for dns_change in query.dicts().iterator():
checked_domain_dict[dns_change["domain"]] = True
if dns_change["domain"] not in checked_domain_dict:
checked_domain_dict[dns_change["domain"]] = {}
checked_domain_dict[dns_change["domain"]][
dns_change["rdtype"]
] = dns_change["response"]
result_dict["dns_query"].append(
{
"domain": dns_change["domain"],
"rdtype": dns_change["rdtype"],
"resolver_ip": dns_change["resolver_ip"],
"date": rfc822(dns_change["status"]),
"response": dns_change["response"],
}
)
for server_ip in dns_change["response"].split(", "):
if not server_ip:
# drop empty response
continue
if server_ip not in server_ip_dict:
server_ip_dict[server_ip] = []
if dns_change["domain"] not in server_ip_dict[server_ip]:
server_ip_dict[server_ip].append(dns_change["domain"])
result_dict["missing_data"] = []
for resolver_ip in self.config["NAMESERVER"].split():
if resolver_ip not in checked_resolver_ip_dict:
result_dict["missing_data"].append(
{
"text": resolver_ip,
"date": result_dict["bot_status"][0]["date"],
}
)
for domain in domain_list:
if domain not in checked_domain_dict:
if domain in checked_domain_dict:
if "A" in checked_domain_dict[domain]:
for server_ip in checked_domain_dict[domain]["A"].split(
", "
):
if not server_ip:
# drop empty response
continue
if server_ip not in server_ip_dict:
server_ip_dict[server_ip] = []
if domain not in server_ip_dict[server_ip]:
server_ip_dict[server_ip].append(domain)
else:
result_dict["missing_data"].append(
{
"text": "(A) " + domain,
"date": result_dict["bot_status"][0]["date"],
}
)
if "MX" in checked_domain_dict[domain]:
if checked_domain_dict[domain]["MX"]:
for mx_domain in checked_domain_dict[domain][
"MX"
].split(", "):
if mx_domain not in checked_domain_dict:
result_dict["missing_data"].append(
{
"text": "(MX "
+ domain
+ ") "
+ mx_domain,
"date": result_dict["bot_status"][0][
"date"
],
}
)
else:
result_dict["missing_data"].append(
{
"text": "(MX) " + domain,
"date": result_dict["bot_status"][0]["date"],
}
)
else:
result_dict["missing_data"].append(
{
"text": domain,
......@@ -467,14 +554,14 @@ class WebBot:
# Report the list of CDN status
query = reportNetwork(
self._db,
port=["80", "443"],
port=["80", "443", "25"],
transport="TCP",
ip=[x for x in server_ip_dict.keys()],
)
url_dict = {}
result_dict["http_server"] = []
result_dict["tcp_server"] = []
for network_change in query.dicts().iterator():
result_dict["http_server"].append(
result_dict["tcp_server"].append(
{
"ip": network_change["ip"],
"state": network_change["state"],
......@@ -485,9 +572,9 @@ class WebBot:
)
if network_change["state"] == "open":
for hostname in server_ip_dict[network_change["ip"]]:
protocol = (
"http" if (network_change["port"] == 80) else "https"
)
protocol = {80: "http", 443: "https", 25: "smtp"}[
network_change["port"]
]
url = "%s://%s" % (protocol, hostname)
if url not in url_dict:
url_dict[url] = []
......@@ -553,7 +640,7 @@ class WebBot:
{
"text": "(%s ->) %s"
% (network_change["url"], redirect_url),
"date": result_dict["bot_status"][0]["date"],
"date": rfc822(network_change["status"]),
}
)
result_dict["http_query"].append(
......
......@@ -104,12 +104,16 @@ def buildResolver(resolver_ip, timeout):
def queryDNS(db, status_id, resolver_ip, domain_text, rdtype, timeout=TIMEOUT):
# only A (and AAAA) has address property
assert rdtype == "A"
assert rdtype in ["A", "MX"], rdtype
resolver = buildResolver(resolver_ip, timeout)
try:
answer_list = [
x.address
(
x.address
if (rdtype == "A")
else x.exchange.derelativize(domain_text).to_text()[:-1]
)
for x in resolver.query(
domain_text, rdtype, raise_on_no_answer=False
)
......@@ -121,6 +125,7 @@ def queryDNS(db, status_id, resolver_ip, domain_text, rdtype, timeout=TIMEOUT):
dns_resolver.NoNameservers,
):
answer_list = []
# how to differentiate no answer from empty answer
logDnsQuery(db, status_id, resolver_ip, domain_text, rdtype, answer_list)
return answer_list
......
......@@ -20,7 +20,7 @@
import unittest
from surykatka.bot import WebBot
import mock
from test_dns import MockAnswer
from test_dns import MockAnswerA, MockAnswerMX
from test_domain import MockAnswer as MockWhoisAnswer
import surykatka.dns
......@@ -61,8 +61,9 @@ def checkDnsChange(bot, result_list):
bot._db.DnsChange.select()
.order_by(bot._db.DnsChange.resolver_ip.asc())
.order_by(bot._db.DnsChange.domain.asc())
.order_by(bot._db.DnsChange.rdtype.asc())
)
db_result_list = [(x.resolver_ip, x.domain) for x in select_list]
db_result_list = [(x.resolver_ip, x.domain, x.rdtype) for x in select_list]
assert bot._db.DnsChange.select().count() == len(
result_list
), db_result_list
......@@ -94,7 +95,7 @@ class SurykatkaBotTestCase(unittest.TestCase):
) as mock_query:
mock_get_default_resolver.return_value = resolver
mock_query.return_value = [MockAnswer("1.2.3.4")]
mock_query.side_effect = [[MockAnswerA("1.2.3.4")]]
bot = WebBot(mapping={"SQLITE": ":memory:"})
bot.initDB()
bot.iterateLoop()
......@@ -106,7 +107,7 @@ class SurykatkaBotTestCase(unittest.TestCase):
checkNetworkChange(bot, [(resolver_ip, 53)])
checkDnsChange(bot, [(resolver_ip, "example.org")])
checkDnsChange(bot, [(resolver_ip, "example.org", "A")])
checkSslChange(bot, [])
......@@ -154,7 +155,11 @@ class SurykatkaBotTestCase(unittest.TestCase):
"country",
)
mock_get_default_resolver.return_value = resolver
mock_query.return_value = [MockAnswer("1.2.3.4")]
mock_query.side_effect = [
[MockAnswerA("1.2.3.4")],
[MockAnswerA("1.2.3.4")],
[MockAnswerMX("")],
]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
b"",
{
......@@ -173,16 +178,28 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 2
assert mock_socket.call_count == 3
assert mock_query.call_count == 3
assert mock_socket.call_count == 4
assert mock_create_default_context.call_count == 1
assert mock_request.call_count == 2
checkNetworkChange(
bot, [(resolver_ip, 53), ("1.2.3.4", 80), ("1.2.3.4", 443)]
bot,
[
("1.2.3.4", 25),
(resolver_ip, 53),
("1.2.3.4", 80),
("1.2.3.4", 443),
],
)
checkDnsChange(bot, [(resolver_ip, "example.org")])
checkDnsChange(
bot,
[
(resolver_ip, "example.org", "A"),
(resolver_ip, "example.org", "MX"),
],
)
checkDomainChange(bot, ["example.org"])
......@@ -240,7 +257,14 @@ class SurykatkaBotTestCase(unittest.TestCase):
"country",
)
mock_request.return_value.headers = {"Etag": "foobar"}
mock_query.return_value = [MockAnswer("1.2.3.4")]
mock_query.side_effect = [
[MockAnswerA("1.2.3.4")],
[MockAnswerA("1.2.3.4")],
[MockAnswerA("1.2.3.4")],
[MockAnswerA("1.2.3.4")],
[MockAnswerMX("")],
[MockAnswerMX("")],
]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
b"",
{
......@@ -254,14 +278,15 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 4
assert mock_socket.call_count == 3
assert mock_query.call_count == 6
assert mock_socket.call_count == 4
assert mock_create_default_context.call_count == 1
assert mock_request.call_count == 2
checkNetworkChange(
bot,
[
("1.2.3.4", 25),
(resolver_ip, 53),
(resolver_ip_2, 53),
("1.2.3.4", 80),
......@@ -272,7 +297,13 @@ class SurykatkaBotTestCase(unittest.TestCase):
checkDomainChange(bot, ["example.org"])
checkDnsChange(
bot, [(resolver_ip, "example.org"), (resolver_ip_2, "example.org")]
bot,
[
(resolver_ip, "example.org", "A"),
(resolver_ip_2, "example.org", "A"),
(resolver_ip, "example.org", "MX"),
(resolver_ip_2, "example.org", "MX"),
],
)
checkSslChange(bot, [("1.2.3.4", 443, "example.org")])
......@@ -330,7 +361,13 @@ class SurykatkaBotTestCase(unittest.TestCase):
"country",
)
mock_request.return_value.headers = {"Etag": "foobar"}
mock_query.return_value = [MockAnswer("1.2.3.4")]
mock_query.side_effect = [
[MockAnswerA("1.2.3.4")],
[MockAnswerA("1.2.3.4")],
[MockAnswerA("1.2.3.4")],
[MockAnswerMX("")],
[MockAnswerMX("")],
]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
b"",
{
......@@ -351,18 +388,32 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 3
assert mock_socket.call_count == 4
assert mock_query.call_count == 5
assert mock_socket.call_count == 5
assert mock_create_default_context.call_count == 2
assert mock_request.call_count == 4
checkNetworkChange(
bot, [(resolver_ip, 53), ("1.2.3.4", 80), ("1.2.3.4", 443)]
bot,
[
("1.2.3.4", 25),
(resolver_ip, 53),
("1.2.3.4", 80),
("1.2.3.4", 443),
],
)
checkDomainChange(bot, ["example.org"])
checkDnsChange(bot, [(resolver_ip, domain_1), (resolver_ip, domain_2)])
checkDnsChange(
bot,
[
(resolver_ip, domain_1, "A"),
(resolver_ip, domain_2, "A"),
(resolver_ip, domain_1, "MX"),
(resolver_ip, domain_2, "MX"),
],
)
checkSslChange(
bot, [("1.2.3.4", 443, domain_1), ("1.2.3.4", 443, domain_2)]
......@@ -422,9 +473,10 @@ class SurykatkaBotTestCase(unittest.TestCase):
"country",
)
mock_request.return_value.headers = {"Etag": "foobar"}
mock_query.return_value = [
MockAnswer("1.2.3.4"),
MockAnswer("1.2.3.5"),
mock_query.side_effect = [
[MockAnswerA("1.2.3.4"), MockAnswerA("1.2.3.5"),],
[MockAnswerA("1.2.3.4"), MockAnswerA("1.2.3.5"),],
[MockAnswerMX("")],
]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
b"",
......@@ -446,14 +498,16 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 2
assert mock_socket.call_count == 6
assert mock_query.call_count == 3
assert mock_socket.call_count == 8
assert mock_create_default_context.call_count == 2
assert mock_request.call_count == 4
checkNetworkChange(
bot,
[
("1.2.3.4", 25),
("1.2.3.5", 25),
(resolver_ip, 53),
("1.2.3.4", 80),
("1.2.3.5", 80),
......@@ -464,7 +518,9 @@ class SurykatkaBotTestCase(unittest.TestCase):
checkDomainChange(bot, ["example.org"])
checkDnsChange(bot, [(resolver_ip, domain)])
checkDnsChange(
bot, [(resolver_ip, domain, "A"), (resolver_ip, domain, "MX")]
)
checkSslChange(
bot,
......@@ -526,7 +582,13 @@ class SurykatkaBotTestCase(unittest.TestCase):
"country",
)
mock_request.return_value.headers = {"Etag": "foobar"}
mock_query.return_value = [MockAnswer("1.2.3.4")]
mock_query.side_effect = [
[MockAnswerA("1.2.3.4")],
[MockAnswerA("1.2.3.4")],
[MockAnswerA("1.2.3.4")],
[MockAnswerMX("")],
[MockAnswerMX("")],
]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
b"",
{
......@@ -547,18 +609,32 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 3
assert mock_socket.call_count == 4
assert mock_query.call_count == 5
assert mock_socket.call_count == 5
assert mock_create_default_context.call_count == 2
assert mock_request.call_count == 4
checkNetworkChange(
bot, [(resolver_ip, 53), ("1.2.3.4", 80), ("1.2.3.4", 443)]
bot,
[
("1.2.3.4", 25),
(resolver_ip, 53),
("1.2.3.4", 80),
("1.2.3.4", 443),
],
)
checkDomainChange(bot, ["example.org"])
checkDnsChange(bot, [(resolver_ip, domain), (resolver_ip, sub_domain)])
checkDnsChange(
bot,
[
(resolver_ip, domain, "A"),
(resolver_ip, sub_domain, "A"),
(resolver_ip, domain, "MX"),
(resolver_ip, sub_domain, "MX"),
],
)
checkSslChange(
bot, [("1.2.3.4", 443, domain), ("1.2.3.4", 443, sub_domain)]
......@@ -620,7 +696,11 @@ class SurykatkaBotTestCase(unittest.TestCase):
"country",
)
mock_request.return_value.headers = {"Etag": "foobar"}
mock_query.return_value = [MockAnswer("1.2.3.4")]
mock_query.side_effect = [
[MockAnswerA("1.2.3.4")],
[MockAnswerA("1.2.3.4")],
[MockAnswerMX("")],
]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
b"",
{
......@@ -634,19 +714,30 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 2
assert mock_socket.call_count == 3
assert mock_query.call_count == 3
assert mock_socket.call_count == 4
assert mock_create_default_context.call_count == 1
assert mock_request.call_count == 2
checkNetworkChange(
bot, [(resolver_ip, 53), ("1.2.3.4", 80), ("1.2.3.4", 443)]
bot,
[
("1.2.3.4", 25),
(resolver_ip, 53),
("1.2.3.4", 80),
("1.2.3.4", 443),
],
)
checkDomainChange(bot, ["foo.example.com"])
checkDnsChange(
bot, [(resolver_ip, "example.org"), (resolver_ip, sub_domain)]
bot,
[
(resolver_ip, "example.org", "A"),
(resolver_ip, sub_domain, "A"),
(resolver_ip, sub_domain, "MX"),
],
)
checkSslChange(bot, [("1.2.3.4", 443, sub_domain)])
......@@ -703,7 +794,11 @@ class SurykatkaBotTestCase(unittest.TestCase):
"country",
)
mock_request.return_value.headers = {"Etag": "foobar"}
mock_query.return_value = [MockAnswer("1.2.3.4")]
mock_query.side_effect = [
[MockAnswerA("1.2.3.4")],
[MockAnswerA("1.2.3.4")],
[MockAnswerMX("")],
]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
b"",
{
......@@ -716,18 +811,26 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 2
assert mock_socket.call_count == 3
assert mock_query.call_count == 3
assert mock_socket.call_count == 4
assert mock_create_default_context.call_count == 1
assert mock_request.call_count == 3
checkNetworkChange(
bot, [(resolver_ip, 53), ("1.2.3.4", 80), ("1.2.3.4", 443)]
bot,
[
("1.2.3.4", 25),
(resolver_ip, 53),
("1.2.3.4", 80),
("1.2.3.4", 443),
],
)
checkDomainChange(bot, ["example.org"])
checkDnsChange(bot, [(resolver_ip, domain)])
checkDnsChange(
bot, [(resolver_ip, domain, "A"), (resolver_ip, domain, "MX")]
)
checkSslChange(bot, [("1.2.3.4", 443, domain)])
......@@ -782,11 +885,11 @@ class SurykatkaBotTestCase(unittest.TestCase):
"country",
)
mock_request.return_value.headers = {"Etag": "foobar"}
mock_query.side_effect = [[MockAnswer("1.2.3.4")], []]
mock_query.side_effect = [[MockAnswerA("1.2.3.4")], [], []]
bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 2
assert mock_query.call_count == 3
assert mock_socket.call_count == 0
assert mock_request.call_count == 0
......@@ -795,7 +898,12 @@ class SurykatkaBotTestCase(unittest.TestCase):
checkDomainChange(bot, ["example2.org"])
checkDnsChange(
bot, [(resolver_ip, domain), (resolver_ip, "example2.org")]
bot,
[
(resolver_ip, domain, "A"),
(resolver_ip, "example2.org", "A"),
(resolver_ip, "example2.org", "MX"),
],
)
checkHttpCodeChange(bot, [])
......@@ -834,7 +942,7 @@ class SurykatkaBotStatusTestCase(unittest.TestCase):
"missing_data": [
{"text": resolver_ip, "date": result["bot_status"][0]["date"]}
],
"http_server": [],
"tcp_server": [],
"ssl_certificate": [],
"http_query": [],
}
......@@ -869,7 +977,7 @@ class SurykatkaBotStatusTestCase(unittest.TestCase):
assert len(result["dns_server"]) == 0
assert len(result["whois"]) == 0
assert len(result["dns_query"]) == 0
assert len(result["http_server"]) == 0
assert len(result["tcp_server"]) == 0
assert len(result["ssl_certificate"]) == 0
assert len(result["http_query"]) == 0
# +1 for example.org
......
......@@ -32,13 +32,19 @@ from surykatka.dns import (
)
from surykatka.status import logStatus
import mock
from dns import name as dns_name
class MockAnswer(object):
class MockAnswerA(object):
def __init__(self, address):
self.address = address
class MockAnswerMX(object):
def __init__(self, label):
self.exchange = dns_name.Name(dns_name.from_text(label))
class SurykatkaDNSTestCase(unittest.TestCase):
def setUp(self):
self.db = LogDB(":memory:")
......@@ -253,7 +259,7 @@ class SurykatkaDNSTestCase(unittest.TestCase):
################################################
# queryDNS
################################################
def test_queryDNS_default(self):
def test_queryDNS_A(self):
resolver_ip = "127.0.0.1"
domain = "example.org"
rdtype = "A"
......@@ -263,8 +269,8 @@ class SurykatkaDNSTestCase(unittest.TestCase):
"surykatka.dns.dns_resolver.Resolver.query"
) as mock_query:
mock_query.return_value = [
MockAnswer("4.3.2.1"),
MockAnswer("1.2.3.4"),
MockAnswerA("4.3.2.1"),
MockAnswerA("1.2.3.4"),
]
result = queryDNS(self.db, status_id, resolver_ip, domain, rdtype)
......@@ -281,6 +287,37 @@ class SurykatkaDNSTestCase(unittest.TestCase):
assert self.db.DnsChange.get().status_id == status_id
assert result == ["1.2.3.4", "4.3.2.1"]
def test_queryDNS_MX(self):
resolver_ip = "127.0.0.1"
domain = "example.org"
rdtype = "MX"
status_id = logStatus(self.db, "foo")
with mock.patch(
"surykatka.dns.dns_resolver.Resolver.query"
) as mock_query:
mock_query.return_value = [
MockAnswerMX("mail1.example.org"),
MockAnswerMX("mail2.example.org"),
]
result = queryDNS(self.db, status_id, resolver_ip, domain, rdtype)
assert mock_query.call_count == 1
mock_query.assert_called_with(
domain, rdtype, raise_on_no_answer=False
)
assert self.db.DnsChange.select().count() == 1
assert self.db.DnsChange.get().resolver_ip == resolver_ip
assert self.db.DnsChange.get().domain == domain
assert self.db.DnsChange.get().rdtype == rdtype
assert (
self.db.DnsChange.get().response
== "mail1.example.org, mail2.example.org"
)
assert self.db.DnsChange.get().status_id == status_id
assert result == ["mail1.example.org", "mail2.example.org"]
def test_queryDNS_rejectRdtype(self):
resolver_ip = "127.0.0.1"
domain = "example.org"
......@@ -423,8 +460,8 @@ class SurykatkaDNSTestCase(unittest.TestCase):
"surykatka.dns.dns_resolver.Resolver.query"
) as mock_query:
mock_query.return_value = [
MockAnswer("4.3.2.1"),
MockAnswer("1.2.3.4"),
MockAnswerA("4.3.2.1"),
MockAnswerA("1.2.3.4"),
]
result = getReachableResolverList(
self.db, status_id, [resolver_ip]
......@@ -502,8 +539,8 @@ class SurykatkaDNSTestCase(unittest.TestCase):
"surykatka.dns.dns_resolver.Resolver.query"
) as mock_query:
mock_query.return_value = [
MockAnswer("4.3.2.1"),
MockAnswer("1.2.3.4"),
MockAnswerA("4.3.2.1"),
MockAnswerA("1.2.3.4"),
]
result = getDomainIpDict(
self.db, status_id, resolver_ip_list, domain_list, rdtype
......@@ -531,8 +568,8 @@ class SurykatkaDNSTestCase(unittest.TestCase):
"surykatka.dns.dns_resolver.Resolver.query"
) as mock_query:
mock_query.side_effect = [
[MockAnswer("4.3.2.1"), MockAnswer("1.2.3.4")],
[MockAnswer("4.3.2.1"), MockAnswer("1.2.3.5")],
[MockAnswerA("4.3.2.1"), MockAnswerA("1.2.3.4")],
[MockAnswerA("4.3.2.1"), MockAnswerA("1.2.3.5")],
]
result = getDomainIpDict(
self.db, status_id, resolver_ip_list, domain_list, rdtype
......@@ -564,8 +601,8 @@ class SurykatkaDNSTestCase(unittest.TestCase):
"surykatka.dns.dns_resolver.Resolver.query"
) as mock_query:
mock_query.side_effect = [
[MockAnswer("4.3.2.1"), MockAnswer("1.2.3.4")],
[MockAnswer("4.3.2.1"), MockAnswer("1.2.3.5")],
[MockAnswerA("4.3.2.1"), MockAnswerA("1.2.3.4")],
[MockAnswerA("4.3.2.1"), MockAnswerA("1.2.3.5")],
]
result = getDomainIpDict(
self.db, status_id, resolver_ip_list, domain_list, rdtype
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment