Commit a25dd45a authored by Romain Courteaud's avatar Romain Courteaud

Add PUBLIC_SUFFIX parameter

Allow to skip some top domains like .co.uk
parent fe0adc9f
...@@ -76,7 +76,10 @@ class WebBot: ...@@ -76,7 +76,10 @@ class WebBot:
domain_list = list(set(domain_list)) domain_list = list(set(domain_list))
# Expand with all parent domains # Expand with all parent domains
return expandDomainList(domain_list) return expandDomainList(
domain_list,
public_suffix_list=self.config["PUBLIC_SUFFIX"].split(),
)
def iterateLoop(self): def iterateLoop(self):
status_id = logStatus(self._db, "loop") status_id = logStatus(self._db, "loop")
......
...@@ -48,6 +48,8 @@ def createConfiguration( ...@@ -48,6 +48,8 @@ def createConfiguration(
get_default_resolver().nameservers get_default_resolver().nameservers
) )
if "PUBLIC_SUFFIX" not in config[CONFIG_SECTION]:
config[CONFIG_SECTION]["PUBLIC_SUFFIX"] = ""
if "DOMAIN" not in config[CONFIG_SECTION]: if "DOMAIN" not in config[CONFIG_SECTION]:
config[CONFIG_SECTION]["DOMAIN"] = "" config[CONFIG_SECTION]["DOMAIN"] = ""
if "URL" not in config[CONFIG_SECTION]: if "URL" not in config[CONFIG_SECTION]:
......
...@@ -133,12 +133,17 @@ def getReachableResolverList(db, status_id, resolver_ip_list, timeout=TIMEOUT): ...@@ -133,12 +133,17 @@ def getReachableResolverList(db, status_id, resolver_ip_list, timeout=TIMEOUT):
return result_ip_list return result_ip_list
def expandDomainList(domain_list): def expandDomainList(domain_list, public_suffix_list=None):
for domain_text in domain_list: for domain_text in domain_list:
dns_name = dns.name.from_text(domain_text) dns_name = dns.name.from_text(domain_text)
if (len(dns_name.labels) - 1) > 2: if (len(dns_name.labels) - 1) > 2:
domain_list.append(dns_name.parent().to_text(omit_final_dot=True)) # https://publicsuffix.org/list/public_suffix_list.dat
parent_domain_text = dns_name.parent().to_text(omit_final_dot=True)
if (public_suffix_list is None) or (
parent_domain_text not in public_suffix_list
):
domain_list.append(parent_domain_text)
domain_list = list(set(domain_list)) domain_list = list(set(domain_list))
domain_list.sort() domain_list.sort()
......
...@@ -56,13 +56,16 @@ def checkSslChange(bot, result_list): ...@@ -56,13 +56,16 @@ def checkSslChange(bot, result_list):
def checkDnsChange(bot, result_list): def checkDnsChange(bot, result_list):
assert bot._db.DnsChange.select().count() == len(result_list)
select_list = ( select_list = (
bot._db.DnsChange.select() bot._db.DnsChange.select()
.order_by(bot._db.DnsChange.resolver_ip.asc()) .order_by(bot._db.DnsChange.resolver_ip.asc())
.order_by(bot._db.DnsChange.domain.asc()) .order_by(bot._db.DnsChange.domain.asc())
) )
assert [(x.resolver_ip, x.domain) for x in select_list] == result_list db_result_list = [(x.resolver_ip, x.domain) for x in select_list]
assert bot._db.DnsChange.select().count() == len(
result_list
), db_result_list
assert db_result_list == result_list
class SurykatkaBotTestCase(unittest.TestCase): class SurykatkaBotTestCase(unittest.TestCase):
...@@ -442,6 +445,68 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -442,6 +445,68 @@ class SurykatkaBotTestCase(unittest.TestCase):
], ],
) )
def test_oneNameserverOneSubDomainOneIpOnePublicSuffix(self):
resolver_ip = "127.0.0.1"
domain = "example.com"
sub_domain = "foo.%s" % domain
bot = WebBot(
mapping={
"SQLITE": ":memory:",
"DOMAIN": sub_domain,
"NAMESERVER": resolver_ip,
"PUBLIC_SUFFIX": domain,
}
)
bot.initDB()
with mock.patch(
"surykatka.dns.dns.resolver.Resolver.query"
) as mock_query, mock.patch(
"surykatka.network.socket.socket"
) as mock_socket, mock.patch(
"surykatka.ssl.ssl.create_default_context"
) as mock_create_default_context, mock.patch(
"surykatka.http.request"
) as mock_request:
mock_request.return_value.headers = {"Etag": "foobar"}
mock_query.return_value = [MockAnswer("1.2.3.4")]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
b"",
{
"notBefore": "Jan 27 04:33:22 2020 GMT",
"notAfter": "Jan 27 04:33:22 2020 GMT",
"subject": [[("commonName", "foo")]],
"issuer": [[("commonName", "bar")]],
},
]
bot.iterateLoop()
assert mock_query.call_count == 2
assert mock_socket.call_count == 3
assert mock_create_default_context.call_count == 1
assert mock_request.call_count == 2
checkNetworkChange(
bot, [(resolver_ip, 53), ("1.2.3.4", 80), ("1.2.3.4", 443)]
)
checkDnsChange(
bot, [(resolver_ip, "example.org"), (resolver_ip, sub_domain)]
)
checkSslChange(bot, [("1.2.3.4", 443, sub_domain)])
checkHttpCodeChange(
bot,
[
("1.2.3.4", "http://%s" % sub_domain),
("1.2.3.4", "https://%s" % sub_domain),
],
)
def test_oneNameserverOneUrlOneIp(self): def test_oneNameserverOneUrlOneIp(self):
resolver_ip = "127.0.0.1" resolver_ip = "127.0.0.1"
domain = "example.org" domain = "example.org"
......
...@@ -50,6 +50,20 @@ class SurykatkaDNSTestCase(unittest.TestCase): ...@@ -50,6 +50,20 @@ class SurykatkaDNSTestCase(unittest.TestCase):
result = expandDomainList(["c", "b.a.a", "a.a.a", "a.a"]) result = expandDomainList(["c", "b.a.a", "a.a.a", "a.a"])
assert result == ["a.a", "a.a.a", "b.a.a", "c"] assert result == ["a.a", "a.a.a", "b.a.a", "c"]
def test_expandDomainList_skipPublicSuffix(self):
result = expandDomainList(["doo.foo.bar.co.uk"])
assert result == [
"bar.co.uk",
"co.uk",
"doo.foo.bar.co.uk",
"foo.bar.co.uk",
]
result = expandDomainList(
["doo.foo.bar.co.uk"], public_suffix_list=["bar.co.uk"]
)
assert result == ["doo.foo.bar.co.uk", "foo.bar.co.uk"]
################################################ ################################################
# logDnsQuery # logDnsQuery
################################################ ################################################
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment