Commit cd52bc3c authored by Łukasz Nowak's avatar Łukasz Nowak

recurlests: Initial version

parents
# Copyright (C) 2023 Nexedi SA
# Lukasz Nowak <luke@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
from setuptools import setup, find_packages
# with io.open("src/surykatka/bot.py", "rt", encoding="utf8") as f:
# version = re.search(r'__version__ = "(.*?)"', f.read()).group(1)
setup(
name='recurlests',
version='0.0.0',
author='Lukasz Nowak',
author_email='luke@nexedi.com',
description="Recurlests - requests like curl wrapper",
long_description=__doc__,
classifiers=[
'Environment :: Console',
'Environment :: Web Environment',
'Intended Audience :: System Administrators',
'Intended Audience :: Information Technology',
'License :: OSI Approved :: GNU General Public License v3 or '
'later (GPLv3+)',
],
keywords='requests url curl',
url='https://lab.nexedi.com/luke/recurlests',
license='GPLv3+ with wide exception for FOSS',
install_requires=[
'requests', # for the great CaseInsensitiveDict
],
zip_safe=True,
python_requires=">=3.5",
packages=find_packages("src"),
package_dir={"": "src"},
entry_points={
'console_scripts': [
'recurl = recurlests.cli:recurl',
]
},
)
from . import recurlests
def recurl():
mimikra = recurlests.Recurlests()
mimikra.get('https://www.nexedi.com/')
import json
import tempfile
import subprocess
from requests.structures import CaseInsensitiveDict
import gzip
import os
class RecurlestsResponse(object):
# properties:
# content Content of the response, in bytes.
# cookies A CookieJar of Cookies the server sent back.
# history A list of Response objects from the history of the Request. Any
# redirect responses will end up here. The list is sorted from the
# oldest to the most recent request.
# is_permanent_redirect True if this Response one of the permanent versions
# of redirect.
# is_redirect True if this Response is a well-formed HTTP redirect that
# could have been processed automatically (by
# Session.resolve_redirects).
# links Returns the parsed header links of the response, if any.
# next Returns a PreparedRequest for the next request in a redirect chain,
# if there is one.
# reason Textual reason of responded HTTP Status, e.g. “Not Found” or “OK”.
def json(self, *args, **kwargs):
return json.loads(self.text, *args, **kwargs)
class Recurlests(object):
"""curl command wrapper, mimicing requests"""
def __init__(self, curl='curl'):
self.curl = curl
def get(self, url, **kwargs):
return self.request('GET', url, **kwargs)
def put(self, url, **kwargs):
return self.request('PUT', url, **kwargs)
def request(
self,
method,
url,
http3=True,
http3_only=False,
resolve_all=None,
verify=True,
allow_redirects=True,
headers=None,
auth=None,
timeout=None,
source_ip=None,
data=None
):
try:
alt_svc = tempfile.NamedTemporaryFile(delete=False).name
request_header_file = tempfile.NamedTemporaryFile(delete=False).name
response_header_file = tempfile.NamedTemporaryFile(delete=False).name
response_file = tempfile.NamedTemporaryFile(delete=False).name
hsts_file = tempfile.NamedTemporaryFile(delete=False).name
command_list = [
self.curl,
'--disable',
'--globoff',
'--path-as-is',
'--no-progress-meter',
'--dump-header', response_header_file,
'--hsts', hsts_file,
'--output', response_file,
'--alt-svc', alt_svc,
'--request', method,
'--write-out', '%{json}'
]
if data is not None:
command_list.extend(['--data-binary', data])
if allow_redirects:
command_list.append('--location')
command_list.extend(['--max-redirs', '100'])
if source_ip is not None:
command_list.extend(['--interface', source_ip])
if not verify:
command_list.append('--insecure')
elif isinstance(verify, str):
command_list.extend(['--cacert', verify])
if headers is None:
headers = {}
with open(request_header_file, 'w') as fh:
for header, value in headers.items():
fh.write('%s: %s' % (header, value))
command_list.extend(['--header', '@%s' % (request_header_file,)])
if auth is not None:
command_list.extend(['--user', '%s:%s' % auth])
if resolve_all is not None:
for port, ip in resolve_all.items():
command_list.extend(['--resolve', '*:%s:%s' % (port, ip)])
if http3_only:
command_list.append('--http3-only')
if timeout is None:
# forcibly set timeout to some nice value to detect 2nd case of HTTP3
# unavailable
timeout = 2
elif http3 and url.startswith('https://'):
command_list.append('--http3')
if timeout is not None:
if isinstance(timeout, int) or isinstance(timeout, float):
command_list.extend(['--max-time', str(timeout)])
else:
command_list.exetend([
'--connect-timeout', str(timeout[0]),
'--max-time', str(timeout[1])])
command_list.append(url)
prc = subprocess.Popen(
command_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
response = RecurlestsResponse()
response.command_output, response.command_error = [
q.decode() for q in prc.communicate()]
response.command_returncode = prc.returncode
try:
response.write_out_json = json.loads(response.command_output)
except Exception:
raise
with open(response_header_file) as fh:
response.header_text = fh.read()
response.headers = CaseInsensitiveDict()
for line in response.header_text.splitlines()[1:]:
if line.strip():
header, value = line.split(':', 1)
value = value.strip()
response.headers.setdefault(header, [])
response.headers[header].append(value)
for header in response.headers.keys():
response.headers[header] = ', '.join(response.headers[header])
if response.headers.get('content-encoding') == 'gzip':
with gzip.GzipFile(response_file) as fh:
response.text = fh.read().decode()
else:
with open(response_file) as fh:
response.text = fh.read()
response.certificate_list = []
response.certificate = None
if 'certs' in response.write_out_json:
in_cert = False
cert_list = []
for line in response.write_out_json['certs'].splitlines():
if line == '-----END CERTIFICATE-----':
in_cert = False
cert_list.append(line)
cert_list.append('') # add the newline as it is expected
response.certificate_list.append('\n'.join(cert_list).encode())
cert_list = []
elif line == '-----BEGIN CERTIFICATE-----':
in_cert = True
if in_cert:
cert_list.append(line)
if len(response.certificate_list):
response.certificate = response.certificate_list[0]
response.effective_http_version = response.write_out_json['http_version']
response.status_code = int(response.write_out_json['http_code'])
if response.status_code < 400 and response.status_code != 0:
response.ok = True
else:
response.ok = False
response.content_type = response.write_out_json['content_type']
response.method = response.write_out_json['method']
response.response_code = response.write_out_json['response_code']
response.speed_download = response.write_out_json['speed_download']
response.speed_upload = response.write_out_json['speed_upload']
response.time_appconnect = response.write_out_json['time_appconnect']
response.time_connect = response.write_out_json['time_connect']
response.time_namelookup = response.write_out_json['time_namelookup']
response.time_pretransfer = response.write_out_json['time_pretransfer']
response.time_redirect = response.write_out_json['time_redirect']
response.time_starttransfer = response.write_out_json[
'time_starttransfer']
response.time_total = response.write_out_json['time_total']
response.elapsed = float(response.time_total)
response.url = response.write_out_json['url']
response.url_effective = response.write_out_json['url_effective']
response.urlnum = response.write_out_json['urlnum']
response.curl_version = response.write_out_json['curl_version']
response.command_list = command_list
return response
finally:
os.unlink(alt_svc)
os.unlink(response_header_file)
os.unlink(request_header_file)
os.unlink(response_file)
os.unlink(hsts_file)
# kwargs:
# params - Dictionary, list of tuples or bytes to send in the query
# string for the Request.
# data - Dictionary, list of tuples, bytes, or file-like object to send
# in the body of the Request.
# json - A JSON serializable Python object to send in the body of the
# Request.
# cookies - Dict or CookieJar object to send with the Request.
# files - Dictionary of 'name': file-like-objects (or {'name':
# proxies - Dictionary mapping protocol to the URL of the proxy.
# stream - if False, the response content will be immediately downloaded.
# cert - if String, path to ssl client cert file (.pem). If Tuple,
# ('cert', 'key') pair.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment