Commit 375138c7 authored by Paul Ganssle's avatar Paul Ganssle Committed by GitHub

Merge pull request #1576 from pganssle/fix_upload_metadata

Fix upload metadata
parents 38f1e490 2b5b9133
Started monkey-patching ``get_metadata_version`` and ``read_pkg_file`` onto ``distutils.DistributionMetadata`` to retain the correct version on the ``PKG-INFO`` file in the (deprecated) ``upload`` command.
import io
import os
import hashlib
import getpass
import platform
from base64 import standard_b64encode
from distutils import log
from distutils.command import upload as orig
from distutils.spawn import spawn
from distutils.errors import DistutilsError
from six.moves.urllib.request import urlopen, Request
from six.moves.urllib.error import HTTPError
from six.moves.urllib.parse import urlparse
class upload(orig.upload):
"""
Override default upload behavior to obtain password
in a variety of different ways.
"""
def run(self):
try:
orig.upload.run(self)
......@@ -33,6 +45,137 @@ class upload(orig.upload):
self._prompt_for_password()
)
def upload_file(self, command, pyversion, filename):
# Makes sure the repository URL is compliant
schema, netloc, url, params, query, fragments = \
urlparse(self.repository)
if params or query or fragments:
raise AssertionError("Incompatible url %s" % self.repository)
if schema not in ('http', 'https'):
raise AssertionError("unsupported schema " + schema)
# Sign if requested
if self.sign:
gpg_args = ["gpg", "--detach-sign", "-a", filename]
if self.identity:
gpg_args[2:2] = ["--local-user", self.identity]
spawn(gpg_args,
dry_run=self.dry_run)
# Fill in the data - send all the meta-data in case we need to
# register a new release
with open(filename, 'rb') as f:
content = f.read()
meta = self.distribution.metadata
data = {
# action
':action': 'file_upload',
'protocol_version': '1',
# identify release
'name': meta.get_name(),
'version': meta.get_version(),
# file content
'content': (os.path.basename(filename),content),
'filetype': command,
'pyversion': pyversion,
'md5_digest': hashlib.md5(content).hexdigest(),
# additional meta-data
'metadata_version': str(meta.get_metadata_version()),
'summary': meta.get_description(),
'home_page': meta.get_url(),
'author': meta.get_contact(),
'author_email': meta.get_contact_email(),
'license': meta.get_licence(),
'description': meta.get_long_description(),
'keywords': meta.get_keywords(),
'platform': meta.get_platforms(),
'classifiers': meta.get_classifiers(),
'download_url': meta.get_download_url(),
# PEP 314
'provides': meta.get_provides(),
'requires': meta.get_requires(),
'obsoletes': meta.get_obsoletes(),
}
data['comment'] = ''
if self.sign:
data['gpg_signature'] = (os.path.basename(filename) + ".asc",
open(filename+".asc", "rb").read())
# set up the authentication
user_pass = (self.username + ":" + self.password).encode('ascii')
# The exact encoding of the authentication string is debated.
# Anyway PyPI only accepts ascii for both username or password.
auth = "Basic " + standard_b64encode(user_pass).decode('ascii')
# Build up the MIME payload for the POST data
boundary = '--------------GHSKFJDLGDS7543FJKLFHRE75642756743254'
sep_boundary = b'\r\n--' + boundary.encode('ascii')
end_boundary = sep_boundary + b'--\r\n'
body = io.BytesIO()
for key, value in data.items():
title = '\r\nContent-Disposition: form-data; name="%s"' % key
# handle multiple entries for the same name
if not isinstance(value, list):
value = [value]
for value in value:
if type(value) is tuple:
title += '; filename="%s"' % value[0]
value = value[1]
else:
value = str(value).encode('utf-8')
body.write(sep_boundary)
body.write(title.encode('utf-8'))
body.write(b"\r\n\r\n")
body.write(value)
body.write(end_boundary)
body = body.getvalue()
msg = "Submitting %s to %s" % (filename, self.repository)
self.announce(msg, log.INFO)
# build the Request
headers = {
'Content-type': 'multipart/form-data; boundary=%s' % boundary,
'Content-length': str(len(body)),
'Authorization': auth,
}
request = Request(self.repository, data=body,
headers=headers)
# send the data
try:
result = urlopen(request)
status = result.getcode()
reason = result.msg
except HTTPError as e:
status = e.code
reason = e.msg
except OSError as e:
self.announce(str(e), log.ERROR)
raise
if status == 200:
self.announce('Server response (%s): %s' % (status, reason),
log.INFO)
if self.show_response:
text = getattr(self, '_read_pypi_response',
lambda x: None)(result)
if text is not None:
msg = '\n'.join(('-' * 75, text, '-' * 75))
self.announce(msg, log.INFO)
else:
msg = 'Upload failed (%s): %s' % (status, reason)
self.announce(msg, log.ERROR)
raise DistutilsError(msg)
def _load_password_from_keyring(self):
"""
Attempt to load password from keyring. Suppress Exceptions.
......
......@@ -10,7 +10,11 @@ import distutils.core
import distutils.cmd
import distutils.dist
import itertools
from collections import defaultdict
from email import message_from_file
from distutils.errors import (
DistutilsOptionError, DistutilsPlatformError, DistutilsSetupError,
)
......@@ -39,35 +43,103 @@ def _get_unpatched(cls):
return get_unpatched(cls)
def get_metadata_version(dist_md):
if dist_md.long_description_content_type or dist_md.provides_extras:
return StrictVersion('2.1')
elif (dist_md.maintainer is not None or
dist_md.maintainer_email is not None or
getattr(dist_md, 'python_requires', None) is not None):
return StrictVersion('1.2')
elif (dist_md.provides or dist_md.requires or dist_md.obsoletes or
dist_md.classifiers or dist_md.download_url):
return StrictVersion('1.1')
def get_metadata_version(self):
mv = getattr(self, 'metadata_version', None)
if mv is None:
if self.long_description_content_type or self.provides_extras:
mv = StrictVersion('2.1')
elif (self.maintainer is not None or
self.maintainer_email is not None or
getattr(self, 'python_requires', None) is not None):
mv = StrictVersion('1.2')
elif (self.provides or self.requires or self.obsoletes or
self.classifiers or self.download_url):
mv = StrictVersion('1.1')
else:
mv = StrictVersion('1.0')
self.metadata_version = mv
return mv
def read_pkg_file(self, file):
"""Reads the metadata values from a file object."""
msg = message_from_file(file)
def _read_field(name):
value = msg[name]
if value == 'UNKNOWN':
return None
return value
def _read_list(name):
values = msg.get_all(name, None)
if values == []:
return None
return values
self.metadata_version = StrictVersion(msg['metadata-version'])
self.name = _read_field('name')
self.version = _read_field('version')
self.description = _read_field('summary')
# we are filling author only.
self.author = _read_field('author')
self.maintainer = None
self.author_email = _read_field('author-email')
self.maintainer_email = None
self.url = _read_field('home-page')
self.license = _read_field('license')
if 'download-url' in msg:
self.download_url = _read_field('download-url')
else:
self.download_url = None
self.long_description = _read_field('description')
self.description = _read_field('summary')
return StrictVersion('1.0')
if 'keywords' in msg:
self.keywords = _read_field('keywords').split(',')
self.platforms = _read_list('platform')
self.classifiers = _read_list('classifier')
# PEP 314 - these fields only exist in 1.1
if self.metadata_version == StrictVersion('1.1'):
self.requires = _read_list('requires')
self.provides = _read_list('provides')
self.obsoletes = _read_list('obsoletes')
else:
self.requires = None
self.provides = None
self.obsoletes = None
# Based on Python 3.5 version
def write_pkg_file(self, file):
"""Write the PKG-INFO format data to a file object.
"""
version = get_metadata_version(self)
version = self.get_metadata_version()
if six.PY2:
def write_field(key, value):
file.write("%s: %s\n" % (key, self._encode_field(value)))
else:
def write_field(key, value):
file.write("%s: %s\n" % (key, value))
file.write('Metadata-Version: %s\n' % version)
file.write('Name: %s\n' % self.get_name())
file.write('Version: %s\n' % self.get_version())
file.write('Summary: %s\n' % self.get_description())
file.write('Home-page: %s\n' % self.get_url())
write_field('Metadata-Version', str(version))
write_field('Name', self.get_name())
write_field('Version', self.get_version())
write_field('Summary', self.get_description())
write_field('Home-page', self.get_url())
if version < StrictVersion('1.2'):
file.write('Author: %s\n' % self.get_contact())
file.write('Author-email: %s\n' % self.get_contact_email())
write_field('Author:', self.get_contact())
write_field('Author-email:', self.get_contact_email())
else:
optional_fields = (
('Author', 'author'),
......@@ -78,28 +150,26 @@ def write_pkg_file(self, file):
for field, attr in optional_fields:
attr_val = getattr(self, attr)
if six.PY2:
attr_val = self._encode_field(attr_val)
if attr_val is not None:
file.write('%s: %s\n' % (field, attr_val))
write_field(field, attr_val)
file.write('License: %s\n' % self.get_license())
write_field('License', self.get_license())
if self.download_url:
file.write('Download-URL: %s\n' % self.download_url)
write_field('Download-URL', self.download_url)
for project_url in self.project_urls.items():
file.write('Project-URL: %s, %s\n' % project_url)
write_field('Project-URL', '%s, %s' % project_url)
long_desc = rfc822_escape(self.get_long_description())
file.write('Description: %s\n' % long_desc)
write_field('Description', long_desc)
keywords = ','.join(self.get_keywords())
if keywords:
file.write('Keywords: %s\n' % keywords)
write_field('Keywords', keywords)
if version >= StrictVersion('1.2'):
for platform in self.get_platforms():
file.write('Platform: %s\n' % platform)
write_field('Platform', platform)
else:
self._write_list(file, 'Platform', self.get_platforms())
......@@ -112,17 +182,17 @@ def write_pkg_file(self, file):
# Setuptools specific for PEP 345
if hasattr(self, 'python_requires'):
file.write('Requires-Python: %s\n' % self.python_requires)
write_field('Requires-Python', self.python_requires)
# PEP 566
if self.long_description_content_type:
file.write(
'Description-Content-Type: %s\n' %
write_field(
'Description-Content-Type',
self.long_description_content_type
)
if self.provides_extras:
for extra in self.provides_extras:
file.write('Provides-Extra: %s\n' % extra)
write_field('Provides-Extra', extra)
sequence = tuple, list
......
......@@ -84,7 +84,7 @@ def patch_all():
warehouse = 'https://upload.pypi.org/legacy/'
distutils.config.PyPIRCCommand.DEFAULT_REPOSITORY = warehouse
_patch_distribution_metadata_write_pkg_file()
_patch_distribution_metadata()
# Install Distribution throughout the distutils
for module in distutils.dist, distutils.core, distutils.cmd:
......@@ -101,11 +101,11 @@ def patch_all():
patch_for_msvc_specialized_compiler()
def _patch_distribution_metadata_write_pkg_file():
"""Patch write_pkg_file to also write Requires-Python/Requires-External"""
distutils.dist.DistributionMetadata.write_pkg_file = (
setuptools.dist.write_pkg_file
)
def _patch_distribution_metadata():
"""Patch write_pkg_file and read_pkg_file for higher metadata standards"""
for attr in ('write_pkg_file', 'read_pkg_file', 'get_metadata_version'):
new_val = getattr(setuptools.dist, attr)
setattr(distutils.dist.DistributionMetadata, attr, new_val)
def patch_func(replacement, target_mod, func_name):
......
......@@ -12,6 +12,7 @@ from .textwrap import DALS
from .test_easy_install import make_nspkg_sdist
import pytest
import six
def test_dist_fetch_build_egg(tmpdir):
......@@ -59,6 +60,104 @@ def test_dist_fetch_build_egg(tmpdir):
def test_dist__get_unpatched_deprecated():
pytest.warns(DistDeprecationWarning, _get_unpatched, [""])
def __read_test_cases():
# Metadata version 1.0
base_attrs = {
"name": "package",
"version": "0.0.1",
"author": "Foo Bar",
"author_email": "foo@bar.net",
"long_description": "Long\ndescription",
"description": "Short description",
"keywords": ["one", "two"]
}
def merge_dicts(d1, d2):
d1 = d1.copy()
d1.update(d2)
return d1
test_cases = [
('Metadata version 1.0', base_attrs.copy()),
('Metadata version 1.1: Provides', merge_dicts(base_attrs, {
'provides': ['package']
})),
('Metadata version 1.1: Obsoletes', merge_dicts(base_attrs, {
'obsoletes': ['foo']
})),
('Metadata version 1.1: Classifiers', merge_dicts(base_attrs, {
'classifiers': [
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.7',
'License :: OSI Approved :: MIT License'
]})),
('Metadata version 1.1: Download URL', merge_dicts(base_attrs, {
'download_url': 'https://example.com'
})),
('Metadata Version 1.2: Requires-Python', merge_dicts(base_attrs, {
'python_requires': '>=3.7'
})),
pytest.param('Metadata Version 1.2: Project-Url',
merge_dicts(base_attrs, {
'project_urls': {
'Foo': 'https://example.bar'
}
}), marks=pytest.mark.xfail(
reason="Issue #1578: project_urls not read"
)),
('Metadata Version 2.1: Long Description Content Type',
merge_dicts(base_attrs, {
'long_description_content_type': 'text/x-rst; charset=UTF-8'
})),
pytest.param('Metadata Version 2.1: Provides Extra',
merge_dicts(base_attrs, {
'provides_extras': ['foo', 'bar']
}), marks=pytest.mark.xfail(reason="provides_extras not read")),
]
return test_cases
@pytest.mark.parametrize('name,attrs', __read_test_cases())
def test_read_metadata(name, attrs):
dist = Distribution(attrs)
metadata_out = dist.metadata
dist_class = metadata_out.__class__
# Write to PKG_INFO and then load into a new metadata object
if six.PY2:
PKG_INFO = io.BytesIO()
else:
PKG_INFO = io.StringIO()
metadata_out.write_pkg_file(PKG_INFO)
PKG_INFO.seek(0)
metadata_in = dist_class()
metadata_in.read_pkg_file(PKG_INFO)
tested_attrs = [
('name', dist_class.get_name),
('version', dist_class.get_version),
('metadata_version', dist_class.get_metadata_version),
('provides', dist_class.get_provides),
('description', dist_class.get_description),
('download_url', dist_class.get_download_url),
('keywords', dist_class.get_keywords),
('platforms', dist_class.get_platforms),
('obsoletes', dist_class.get_obsoletes),
('requires', dist_class.get_requires),
('classifiers', dist_class.get_classifiers),
('project_urls', lambda s: getattr(s, 'project_urls', {})),
('provides_extras', lambda s: getattr(s, 'provides_extras', set())),
]
for attr, getter in tested_attrs:
assert getter(metadata_in) == getter(metadata_out)
def __maintainer_test_cases():
attrs = {"name": "package",
"version": "1.0",
......
import mock
import os
import re
from distutils import log
from distutils.errors import DistutilsError
from distutils.version import StrictVersion
import pytest
import six
from setuptools.command.upload import upload
from setuptools.dist import Distribution
def _parse_upload_body(body):
boundary = u'\r\n----------------GHSKFJDLGDS7543FJKLFHRE75642756743254'
entries = []
name_re = re.compile(u'^Content-Disposition: form-data; name="([^\"]+)"')
for entry in body.split(boundary):
pair = entry.split(u'\r\n\r\n')
if not len(pair) == 2:
continue
key, value = map(six.text_type.strip, pair)
m = name_re.match(key)
if m is not None:
key = m.group(1)
entries.append((key, value))
return entries
@pytest.fixture
def patched_upload(tmpdir):
class Fix:
def __init__(self, cmd, urlopen):
self.cmd = cmd
self.urlopen = urlopen
def __iter__(self):
return iter((self.cmd, self.urlopen))
def get_uploaded_metadata(self):
request = self.urlopen.call_args_list[0][0][0]
body = request.data.decode('utf-8')
entries = dict(_parse_upload_body(body))
return entries
class ResponseMock(mock.Mock):
def getheader(self, name, default=None):
"""Mocked getheader method for response object"""
return {
'content-type': 'text/plain; charset=utf-8',
}.get(name.lower(), default)
with mock.patch('setuptools.command.upload.urlopen') as urlopen:
urlopen.return_value = ResponseMock()
urlopen.return_value.getcode.return_value = 200
urlopen.return_value.read.return_value = b''
content = os.path.join(str(tmpdir), "content_data")
with open(content, 'w') as f:
f.write("Some content")
dist = Distribution()
dist.dist_files = [('sdist', '3.7.0', content)]
cmd = upload(dist)
cmd.announce = mock.Mock()
cmd.username = 'user'
cmd.password = 'hunter2'
yield Fix(cmd, urlopen)
class TestUploadTest:
def test_upload_metadata(self, patched_upload):
cmd, patch = patched_upload
# Set the metadata version to 2.1
cmd.distribution.metadata.metadata_version = '2.1'
# Run the command
cmd.ensure_finalized()
cmd.run()
# Make sure we did the upload
patch.assert_called_once()
# Make sure the metadata version is correct in the headers
entries = patched_upload.get_uploaded_metadata()
assert entries['metadata_version'] == '2.1'
def test_warns_deprecation(self):
dist = Distribution()
dist.dist_files = [(mock.Mock(), mock.Mock(), mock.Mock())]
......@@ -41,3 +129,86 @@ class TestUploadTest:
"upload instead (https://pypi.org/p/twine/)",
log.WARN
)
@pytest.mark.parametrize('url', [
'https://example.com/a;parameter', # Has parameters
'https://example.com/a?query', # Has query
'https://example.com/a#fragment', # Has fragment
'ftp://example.com', # Invalid scheme
])
def test_upload_file_invalid_url(self, url, patched_upload):
patched_upload.urlopen.side_effect = Exception("Should not be reached")
cmd = patched_upload.cmd
cmd.repository = url
cmd.ensure_finalized()
with pytest.raises(AssertionError):
cmd.run()
def test_upload_file_http_error(self, patched_upload):
patched_upload.urlopen.side_effect = six.moves.urllib.error.HTTPError(
'https://example.com',
404,
'File not found',
None,
None
)
cmd = patched_upload.cmd
cmd.ensure_finalized()
with pytest.raises(DistutilsError):
cmd.run()
cmd.announce.assert_any_call(
'Upload failed (404): File not found',
log.ERROR)
def test_upload_file_os_error(self, patched_upload):
patched_upload.urlopen.side_effect = OSError("Invalid")
cmd = patched_upload.cmd
cmd.ensure_finalized()
with pytest.raises(OSError):
cmd.run()
cmd.announce.assert_any_call('Invalid', log.ERROR)
@mock.patch('setuptools.command.upload.spawn')
def test_upload_file_gpg(self, spawn, patched_upload):
cmd, urlopen = patched_upload
cmd.sign = True
cmd.identity = "Alice"
cmd.dry_run = True
content_fname = cmd.distribution.dist_files[0][2]
signed_file = content_fname + '.asc'
with open(signed_file, 'wb') as f:
f.write("signed-data".encode('utf-8'))
cmd.ensure_finalized()
cmd.run()
# Make sure that GPG was called
spawn.assert_called_once_with([
"gpg", "--detach-sign", "--local-user", "Alice", "-a",
content_fname
], dry_run=True)
# Read the 'signed' data that was transmitted
entries = patched_upload.get_uploaded_metadata()
assert entries['gpg_signature'] == 'signed-data'
def test_show_response_no_error(self, patched_upload):
# This test is just that show_response doesn't throw an error
# It is not really important what the printed response looks like
# in a deprecated command, but we don't want to introduce new
# errors when importing this function from distutils
patched_upload.cmd.show_response = True
patched_upload.cmd.ensure_finalized()
patched_upload.cmd.run()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment