Commit 072a67f9 authored by Romain Courteaud's avatar Romain Courteaud

MORE TESTS

parent 3aad5b21
import unittest import unittest
from urlchecker_db import LogDB from urlchecker_db import LogDB
import peewee import peewee
from urlchecker_dns import expandDomainList from urlchecker_dns import expandDomainList, logDnsQuery
from urlchecker_status import logStatus
class UrlCheckerDNSTestCase(unittest.TestCase): class UrlCheckerDNSTestCase(unittest.TestCase):
...@@ -16,6 +17,105 @@ class UrlCheckerDNSTestCase(unittest.TestCase): ...@@ -16,6 +17,105 @@ class UrlCheckerDNSTestCase(unittest.TestCase):
result = expandDomainList(["c", "b.a.a", "a.a.a", "a.a"]) result = expandDomainList(["c", "b.a.a", "a.a.a", "a.a"])
assert result == ["a.a", "a.a.a", "b.a.a", "c"] assert result == ["a.a", "a.a.a", "b.a.a", "c"]
################################################
# logDnsQuery
################################################
def test_logDnsQuery_insertFirst(self):
domain = "http://example.org"
resolver_ip = "127.0.0.1"
rdtype = 'foo'
answer_list = ['4.3.2.1', '1.2.3.4']
status_id = logStatus(self.db, "foo")
result = logDnsQuery(self.db, status_id, resolver_ip, domain, rdtype, answer_list)
assert self.db.DnsChange.select().count() == 1
assert self.db.DnsChange.get().resolver_ip == resolver_ip
assert self.db.DnsChange.get().domain == domain
assert self.db.DnsChange.get().rdtype == rdtype
assert self.db.DnsChange.get().response == '1.2.3.4, 4.3.2.1'
assert self.db.DnsChange.get().status_id == status_id
assert result == status_id
def test_logDnsQuery_insertOnlyOnePerStatusIdIPUrl(self):
domain = "http://example.org"
resolver_ip = "127.0.0.1"
rdtype = 'foo'
answer_list = ['4.3.2.1', '1.2.3.4']
status_id = logStatus(self.db, "foo")
result = logDnsQuery(self.db, status_id, resolver_ip, domain, rdtype, answer_list)
answer_list.extend('127.0.0.1')
try:
logDnsQuery(self.db, status_id, resolver_ip, domain, rdtype, answer_list)
except peewee.IntegrityError:
assert self.db.DnsChange.select().count() == 1
assert self.db.DnsChange.get().status_id == status_id
else:
raise NotImplementedError("Expected IntegrityError")
def test_logDnsQuery_skipIdenticalPreviousValues(self):
domain = "http://example.org"
resolver_ip = "127.0.0.1"
rdtype = 'foo'
answer_list = ['4.3.2.1', '1.2.3.4']
status_id = logStatus(self.db, "foo")
result = logDnsQuery(self.db, status_id, resolver_ip, domain, rdtype, answer_list)
status_id_2 = logStatus(self.db, "foo")
result_2 = logDnsQuery(self.db, status_id_2, resolver_ip, domain, rdtype, answer_list)
assert self.db.DnsChange.select().count() == 1
assert self.db.DnsChange.get().resolver_ip == resolver_ip
assert self.db.DnsChange.get().domain == domain
assert self.db.DnsChange.get().rdtype == rdtype
assert self.db.DnsChange.get().response == '1.2.3.4, 4.3.2.1'
assert self.db.DnsChange.get().status_id == status_id
assert result == status_id
assert result == result_2
def test_logDnsQuery_insertWhenDifferentAnswer(self):
domain = "http://example.org"
resolver_ip = "127.0.0.1"
rdtype = 'foo'
answer_list = ['4.3.2.1', '1.2.3.4']
status_id = logStatus(self.db, "foo")
result = logDnsQuery(self.db, status_id, resolver_ip, domain, rdtype, answer_list)
answer_list_2 = ['4.3.2.1', '1.2.3.4', '0.0.0.0']
status_id_2 = logStatus(self.db, "foo")
result_2 = logDnsQuery(self.db, status_id_2, resolver_ip, domain, rdtype, answer_list_2)
assert result_2 != result
assert self.db.DnsChange.select().count() == 2
assert self.db.DnsChange.get(self.db.DnsChange.status == status_id).resolver_ip == resolver_ip
assert self.db.DnsChange.get(self.db.DnsChange.status == status_id).domain == domain
assert self.db.DnsChange.get(self.db.DnsChange.status == status_id).rdtype == rdtype
assert self.db.DnsChange.get(self.db.DnsChange.status == status_id).response == '1.2.3.4, 4.3.2.1'
assert self.db.DnsChange.get(self.db.DnsChange.status == status_id).status_id == status_id
assert self.db.DnsChange.get(self.db.DnsChange.status == status_id_2).resolver_ip == resolver_ip
assert self.db.DnsChange.get(self.db.DnsChange.status == status_id_2).domain == domain
assert self.db.DnsChange.get(self.db.DnsChange.status == status_id_2).rdtype == rdtype
assert self.db.DnsChange.get(self.db.DnsChange.status == status_id_2).response == '0.0.0.0, 1.2.3.4, 4.3.2.1'
assert self.db.DnsChange.get(self.db.DnsChange.status == status_id_2).status_id == status_id_2
def test_logDnsQuery_insertDifferentUrl(self):
domain = "http://example.org"
domain_2 = domain + '.'
resolver_ip = "127.0.0.1"
resolver_ip_2 = resolver_ip + '1'
rdtype = 'foo'
rdtype_2 = rdtype + 'bar'
answer_list = ['4.3.2.1', '1.2.3.4']
status_id = logStatus(self.db, "foo")
result = logDnsQuery(self.db, status_id, resolver_ip, domain, rdtype, answer_list)
logDnsQuery(self.db, status_id, resolver_ip_2, domain, rdtype, answer_list)
logDnsQuery(self.db, status_id, resolver_ip, domain_2, rdtype, answer_list)
logDnsQuery(self.db, status_id, resolver_ip, domain, rdtype_2, answer_list)
logDnsQuery(self.db, status_id, resolver_ip_2, domain_2, rdtype, answer_list)
logDnsQuery(self.db, status_id, resolver_ip_2, domain, rdtype_2, answer_list)
logDnsQuery(self.db, status_id, resolver_ip, domain_2, rdtype_2, answer_list)
logDnsQuery(self.db, status_id, resolver_ip_2, domain_2, rdtype_2, answer_list)
assert self.db.DnsChange.select().count() == 8
def suite(): def suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
......
...@@ -55,11 +55,16 @@ class LogDB: ...@@ -55,11 +55,16 @@ class LogDB:
class DnsChange(BaseModel): class DnsChange(BaseModel):
status = peewee.ForeignKeyField(Status) status = peewee.ForeignKeyField(Status)
resolver_ip = peewee.TextField(index=True) resolver_ip = peewee.TextField()
domain = peewee.TextField(index=True) domain = peewee.TextField()
rdtype = peewee.TextField() rdtype = peewee.TextField()
response = peewee.TextField() response = peewee.TextField()
class Meta:
primary_key = peewee.CompositeKey(
"status", "resolver_ip", "domain", "rdtype"
)
class HttpCodeChange(BaseModel): class HttpCodeChange(BaseModel):
status = peewee.ForeignKeyField(Status) status = peewee.ForeignKeyField(Status)
ip = peewee.TextField() ip = peewee.TextField()
......
...@@ -7,25 +7,7 @@ URL_TO_CHECK = "example.org" ...@@ -7,25 +7,7 @@ URL_TO_CHECK = "example.org"
TIMEOUT = 2 TIMEOUT = 2
def logDnsQuery(db, status_id, resolver_ip, resolver, domain_text, rdtype): def logDnsQuery(db, status_id, resolver_ip, domain_text, rdtype, answer_list):
# only A (and AAAA) has address property
assert rdtype == "A"
try:
answer_list = [
x.address
for x in resolver.query(
domain_text, rdtype, raise_on_no_answer=False
)
]
except (
dns.resolver.NXDOMAIN,
dns.resolver.NoAnswer,
dns.exception.Timeout,
dns.resolver.NoNameservers,
):
answer_list = []
answer_list.sort() answer_list.sort()
response = ", ".join(answer_list) response = ", ".join(answer_list)
...@@ -54,6 +36,30 @@ def logDnsQuery(db, status_id, resolver_ip, resolver, domain_text, rdtype): ...@@ -54,6 +36,30 @@ def logDnsQuery(db, status_id, resolver_ip, resolver, domain_text, rdtype):
status=status_id, status=status_id,
) )
return previous_entry.status_id
def queryDNS(db, status_id, resolver_ip, resolver, domain_text, rdtype):
# only A (and AAAA) has address property
assert rdtype == "A"
try:
answer_list = [
x.address
for x in resolver.query(
domain_text, rdtype, raise_on_no_answer=False
)
]
except (
dns.resolver.NXDOMAIN,
dns.resolver.NoAnswer,
dns.exception.Timeout,
dns.resolver.NoNameservers,
):
answer_list = []
logDNSQuery(db, status_id, resolver_ip, domain_text, rdtype, answer_list)
return answer_list return answer_list
...@@ -76,7 +82,7 @@ def getResolverDict(db, status_id, resolver_ip_list): ...@@ -76,7 +82,7 @@ def getResolverDict(db, status_id, resolver_ip_list):
resolver_tuple_list = [x for x in resolver_dict.items()] resolver_tuple_list = [x for x in resolver_dict.items()]
for ip, resolver in resolver_tuple_list: for ip, resolver in resolver_tuple_list:
resolver_state = "open" resolver_state = "open"
answer_list = logDnsQuery( answer_list = queryDNS(
db, status_id, ip, resolver, URL_TO_CHECK, "A" db, status_id, ip, resolver, URL_TO_CHECK, "A"
) )
...@@ -106,7 +112,7 @@ def getServerIpDict(db, status_id, resolver_dict, domain_list, rdtype): ...@@ -106,7 +112,7 @@ def getServerIpDict(db, status_id, resolver_dict, domain_list, rdtype):
server_ip_dict = {} server_ip_dict = {}
for domain_text in domain_list: for domain_text in domain_list:
for resolver_ip, resolver in resolver_dict.items(): for resolver_ip, resolver in resolver_dict.items():
answer_list = logDnsQuery( answer_list = queryDNS(
db, status_id, resolver_ip, resolver, domain_text, rdtype db, status_id, resolver_ip, resolver, domain_text, rdtype
) )
for address in answer_list: for address in answer_list:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment