Commit e3422797 authored by Romain Courteaud's avatar Romain Courteaud

Whois: handle connection error.

parent 9274f2be
...@@ -125,31 +125,36 @@ def queryWhois(db, status_id, domain_text): ...@@ -125,31 +125,36 @@ def queryWhois(db, status_id, domain_text):
# Error trying to connect to socket: closing socket # Error trying to connect to socket: closing socket
_stdout = sys.stdout _stdout = sys.stdout
sys.stdout = open(os.devnull, "w") sys.stdout = open(os.devnull, "w")
whois_dict = whois.whois(domain_text) try:
sys.stdout = _stdout whois_dict = whois.whois(domain_text)
except ConnectionResetError:
arg_list = [] arg_list = [""] * 16
for arg in [ whois_dict = {}
whois_dict.registrar, else:
whois_dict.whois_server, arg_list = []
whois_dict.creation_date, for arg in [
whois_dict.updated_date, whois_dict.registrar,
whois_dict.expiration_date, whois_dict.whois_server,
whois_dict.name_servers, whois_dict.creation_date,
whois_dict.status, whois_dict.updated_date,
whois_dict.emails, whois_dict.expiration_date,
whois_dict.dnssec, whois_dict.name_servers,
whois_dict.name, whois_dict.status,
whois_dict.org, whois_dict.emails,
whois_dict.address, whois_dict.dnssec,
whois_dict.city, whois_dict.name,
whois_dict.state, whois_dict.org,
whois_dict.zipcode, whois_dict.address,
whois_dict.country, whois_dict.city,
]: whois_dict.state,
if type(arg) == list: whois_dict.zipcode,
arg = arg[0] whois_dict.country,
arg_list.append(arg) ]:
if type(arg) == list:
arg = arg[0]
arg_list.append(arg)
finally:
sys.stdout = _stdout
logWhoisQuery(db, status_id, domain_text, *arg_list) logWhoisQuery(db, status_id, domain_text, *arg_list)
return whois_dict return whois_dict
...@@ -847,6 +847,43 @@ class SurykatkaDomainTestCase(unittest.TestCase): ...@@ -847,6 +847,43 @@ class SurykatkaDomainTestCase(unittest.TestCase):
assert self.db.DomainChange.get().status_id == status_id assert self.db.DomainChange.get().status_id == status_id
assert result == mock_answer assert result == mock_answer
def test_queryWhois_ConnectionResetError(self):
domain = "http://example.org"
status_id = logStatus(self.db, "foo")
with mock.patch("surykatka.domain.whois.whois") as mock_whois:
def sideEffect(*args, **kw):
raise ConnectionResetError()
mock_whois.side_effect = sideEffect
result = queryWhois(self.db, status_id, domain)
assert mock_whois.call_count == 1
mock_whois.assert_called_with(domain)
assert self.db.DomainChange.select().count() == 1
assert self.db.DomainChange.get().domain == domain
assert self.db.DomainChange.get().registrar == ""
assert self.db.DomainChange.get().whois_server == ""
assert self.db.DomainChange.get().creation_date == ""
assert self.db.DomainChange.get().updated_date == ""
assert self.db.DomainChange.get().expiration_date == ""
assert self.db.DomainChange.get().name_servers == ""
assert self.db.DomainChange.get().whois_status == ""
assert self.db.DomainChange.get().emails == ""
assert self.db.DomainChange.get().dnssec == ""
assert self.db.DomainChange.get().name == ""
assert self.db.DomainChange.get().org == ""
assert self.db.DomainChange.get().address == ""
assert self.db.DomainChange.get().city == ""
assert self.db.DomainChange.get().state == ""
assert self.db.DomainChange.get().zipcode == ""
assert self.db.DomainChange.get().country == ""
assert self.db.DomainChange.get().status_id == status_id
assert result == {}
################################################ ################################################
# packDns # packDns
################################################ ################################################
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment