Commit bb6fb890 authored by Romain Courteaud's avatar Romain Courteaud

Check domain dns TXT entry

parent 77772997
......@@ -128,8 +128,9 @@ def filterWarningStatus(
for i in range(len(status_dict["dns_query"]) - 1, -1, -1):
state = status_dict["dns_query"][i]["response"]
if state == "":
if status_dict["dns_query"][i]["rdtype"] == "MX":
# No MX is allowed
if status_dict["dns_query"][i]["rdtype"] in ("MX", "TXT"):
# No MX/TXT is allowed
# XXX report empty SPF!
del status_dict["dns_query"][i]
else:
# Keep track of possible domain handling MX
......@@ -321,6 +322,10 @@ class WebBot:
getDomainIpDict(
self._db, status_id, resolver_ip_list, domain_list, "MX", timeout
)
# Check the mail configuration for every domain (MX and SPF)
getDomainIpDict(
self._db, status_id, resolver_ip_list, domain_list, "TXT", timeout
)
# Check TCP port for the list of IP found
# XXX For now, check http/https only
......@@ -475,7 +480,7 @@ class WebBot:
self._db,
domain=domain_list,
resolver_ip=resolver_ip_list,
rdtype=["A", "MX"],
rdtype=["A", "MX", "TXT"],
)
server_ip_dict = {}
result_dict["dns_query"] = []
......@@ -543,6 +548,14 @@ class WebBot:
}
)
if "TXT" not in checked_domain_dict[domain]:
result_dict["missing_data"].append(
{
"text": "(TXT) " + domain,
"date": result_dict["bot_status"][0]["date"],
}
)
else:
result_dict["missing_data"].append(
{
......
......@@ -104,7 +104,7 @@ def buildResolver(resolver_ip, timeout):
def queryDNS(db, status_id, resolver_ip, domain_text, rdtype, timeout=TIMEOUT):
# only A (and AAAA) has address property
assert rdtype in ["A", "MX"], rdtype
assert rdtype in ["A", "MX", "TXT"], rdtype
resolver = buildResolver(resolver_ip, timeout)
try:
......@@ -112,7 +112,11 @@ def queryDNS(db, status_id, resolver_ip, domain_text, rdtype, timeout=TIMEOUT):
(
x.address
if (rdtype == "A")
else x.exchange.derelativize(domain_text).to_text()[:-1]
else (
x.exchange.derelativize(domain_text).to_text()[:-1]
if (rdtype == "MX")
else x.to_text()
)
)
for x in resolver.query(
domain_text, rdtype, raise_on_no_answer=False
......
......@@ -20,7 +20,7 @@
import unittest
from surykatka.bot import WebBot
import mock
from test_dns import MockAnswerA, MockAnswerMX
from test_dns import MockAnswerA, MockAnswerMX, MockAnswerTXT
from test_domain import MockAnswer as MockWhoisAnswer
import surykatka.dns
......@@ -159,6 +159,7 @@ class SurykatkaBotTestCase(unittest.TestCase):
[MockAnswerA("1.2.3.4")],
[MockAnswerA("1.2.3.4")],
[MockAnswerMX("")],
[MockAnswerTXT("")],
]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
b"",
......@@ -178,7 +179,7 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 3
assert mock_query.call_count == 4
assert mock_socket.call_count == 4
assert mock_create_default_context.call_count == 1
assert mock_request.call_count == 2
......@@ -198,6 +199,7 @@ class SurykatkaBotTestCase(unittest.TestCase):
[
(resolver_ip, "example.org", "A"),
(resolver_ip, "example.org", "MX"),
(resolver_ip, "example.org", "TXT"),
],
)
......@@ -264,6 +266,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
[MockAnswerA("1.2.3.4")],
[MockAnswerMX("")],
[MockAnswerMX("")],
[MockAnswerTXT("")],
[MockAnswerTXT("")],
]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
b"",
......@@ -278,7 +282,7 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 6
assert mock_query.call_count == 8
assert mock_socket.call_count == 4
assert mock_create_default_context.call_count == 1
assert mock_request.call_count == 2
......@@ -303,6 +307,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
(resolver_ip_2, "example.org", "A"),
(resolver_ip, "example.org", "MX"),
(resolver_ip_2, "example.org", "MX"),
(resolver_ip, "example.org", "TXT"),
(resolver_ip_2, "example.org", "TXT"),
],
)
......@@ -367,6 +373,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
[MockAnswerA("1.2.3.4")],
[MockAnswerMX("")],
[MockAnswerMX("")],
[MockAnswerTXT("")],
[MockAnswerTXT("")],
]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
b"",
......@@ -388,7 +396,7 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 5
assert mock_query.call_count == 7
assert mock_socket.call_count == 5
assert mock_create_default_context.call_count == 2
assert mock_request.call_count == 4
......@@ -412,6 +420,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
(resolver_ip, domain_2, "A"),
(resolver_ip, domain_1, "MX"),
(resolver_ip, domain_2, "MX"),
(resolver_ip, domain_1, "TXT"),
(resolver_ip, domain_2, "TXT"),
],
)
......@@ -477,6 +487,7 @@ class SurykatkaBotTestCase(unittest.TestCase):
[MockAnswerA("1.2.3.4"), MockAnswerA("1.2.3.5"),],
[MockAnswerA("1.2.3.4"), MockAnswerA("1.2.3.5"),],
[MockAnswerMX("")],
[MockAnswerTXT("")],
]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
b"",
......@@ -498,7 +509,7 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 3
assert mock_query.call_count == 4
assert mock_socket.call_count == 8
assert mock_create_default_context.call_count == 2
assert mock_request.call_count == 4
......@@ -519,7 +530,12 @@ class SurykatkaBotTestCase(unittest.TestCase):
checkDomainChange(bot, ["example.org"])
checkDnsChange(
bot, [(resolver_ip, domain, "A"), (resolver_ip, domain, "MX")]
bot,
[
(resolver_ip, domain, "A"),
(resolver_ip, domain, "MX"),
(resolver_ip, domain, "TXT"),
],
)
checkSslChange(
......@@ -588,6 +604,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
[MockAnswerA("1.2.3.4")],
[MockAnswerMX("")],
[MockAnswerMX("")],
[MockAnswerTXT("")],
[MockAnswerTXT("")],
]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
b"",
......@@ -609,7 +627,7 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 5
assert mock_query.call_count == 7
assert mock_socket.call_count == 5
assert mock_create_default_context.call_count == 2
assert mock_request.call_count == 4
......@@ -633,6 +651,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
(resolver_ip, sub_domain, "A"),
(resolver_ip, domain, "MX"),
(resolver_ip, sub_domain, "MX"),
(resolver_ip, domain, "TXT"),
(resolver_ip, sub_domain, "TXT"),
],
)
......@@ -700,6 +720,7 @@ class SurykatkaBotTestCase(unittest.TestCase):
[MockAnswerA("1.2.3.4")],
[MockAnswerA("1.2.3.4")],
[MockAnswerMX("")],
[MockAnswerTXT("")],
]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
b"",
......@@ -714,7 +735,7 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 3
assert mock_query.call_count == 4
assert mock_socket.call_count == 4
assert mock_create_default_context.call_count == 1
assert mock_request.call_count == 2
......@@ -737,6 +758,7 @@ class SurykatkaBotTestCase(unittest.TestCase):
(resolver_ip, "example.org", "A"),
(resolver_ip, sub_domain, "A"),
(resolver_ip, sub_domain, "MX"),
(resolver_ip, sub_domain, "TXT"),
],
)
......@@ -798,6 +820,7 @@ class SurykatkaBotTestCase(unittest.TestCase):
[MockAnswerA("1.2.3.4")],
[MockAnswerA("1.2.3.4")],
[MockAnswerMX("")],
[MockAnswerTXT("")],
]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
b"",
......@@ -811,7 +834,7 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 3
assert mock_query.call_count == 4
assert mock_socket.call_count == 4
assert mock_create_default_context.call_count == 1
assert mock_request.call_count == 3
......@@ -829,7 +852,12 @@ class SurykatkaBotTestCase(unittest.TestCase):
checkDomainChange(bot, ["example.org"])
checkDnsChange(
bot, [(resolver_ip, domain, "A"), (resolver_ip, domain, "MX")]
bot,
[
(resolver_ip, domain, "A"),
(resolver_ip, domain, "MX"),
(resolver_ip, domain, "TXT"),
],
)
checkSslChange(bot, [("1.2.3.4", 443, domain)])
......@@ -885,11 +913,11 @@ class SurykatkaBotTestCase(unittest.TestCase):
"country",
)
mock_request.return_value.headers = {"Etag": "foobar"}
mock_query.side_effect = [[MockAnswerA("1.2.3.4")], [], []]
mock_query.side_effect = [[MockAnswerA("1.2.3.4")], [], [], []]
bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 3
assert mock_query.call_count == 4
assert mock_socket.call_count == 0
assert mock_request.call_count == 0
......@@ -903,6 +931,7 @@ class SurykatkaBotTestCase(unittest.TestCase):
(resolver_ip, domain, "A"),
(resolver_ip, "example2.org", "A"),
(resolver_ip, "example2.org", "MX"),
(resolver_ip, "example2.org", "TXT"),
],
)
......
......@@ -45,6 +45,14 @@ class MockAnswerMX(object):
self.exchange = dns_name.Name(dns_name.from_text(label))
class MockAnswerTXT(object):
def __init__(self, text):
self.text = text
def to_text(self):
return self.text
class SurykatkaDNSTestCase(unittest.TestCase):
def setUp(self):
self.db = LogDB(":memory:")
......@@ -318,6 +326,34 @@ class SurykatkaDNSTestCase(unittest.TestCase):
assert self.db.DnsChange.get().status_id == status_id
assert result == ["mail1.example.org", "mail2.example.org"]
def test_queryDNS_TXT(self):
resolver_ip = "127.0.0.1"
domain = "example.org"
rdtype = "TXT"
status_id = logStatus(self.db, "foo")
with mock.patch(
"surykatka.dns.dns_resolver.Resolver.query"
) as mock_query:
mock_query.return_value = [
MockAnswerTXT('"foo=bar"'),
MockAnswerTXT('"v=spf1 -all"'),
]
result = queryDNS(self.db, status_id, resolver_ip, domain, rdtype)
assert mock_query.call_count == 1
mock_query.assert_called_with(
domain, rdtype, raise_on_no_answer=False
)
assert self.db.DnsChange.select().count() == 1
assert self.db.DnsChange.get().resolver_ip == resolver_ip
assert self.db.DnsChange.get().domain == domain
assert self.db.DnsChange.get().rdtype == rdtype
assert self.db.DnsChange.get().response == '"foo=bar", "v=spf1 -all"'
assert self.db.DnsChange.get().status_id == status_id
assert result == ['"foo=bar"', '"v=spf1 -all"']
def test_queryDNS_rejectRdtype(self):
resolver_ip = "127.0.0.1"
domain = "example.org"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment