"""PEP 376 implementation."""

import os
import re
import csv
import sys
import zipimport
from io import StringIO
from hashlib import md5
from packaging import logger
from packaging.errors import PackagingError
from packaging.version import suggest_normalized_version, VersionPredicate
from packaging.metadata import Metadata


# Public API of this module.
__all__ = [
    'Distribution', 'EggInfoDistribution', 'distinfo_dirname',
    'get_distributions', 'get_distribution', 'get_file_users',
    'provides_distribution', 'obsoletes_distribution',
    'enable_cache', 'disable_cache', 'clear_cache',
    # XXX these functions' names look like get_file_users but are not related
    'get_file_path', 'get_file']


# TODO update docs

# The only file names Distribution.get_distinfo_file() agrees to open inside
# a ``.dist-info`` directory; anything else raises PackagingError.
DIST_FILES = ('INSTALLER', 'METADATA', 'RECORD', 'REQUESTED', 'RESOURCES')

# Cache
_cache_name = {}  # maps names to lists of Distribution instances
_cache_name_egg = {}  # maps names to lists of EggInfoDistribution instances
_cache_path = {}  # maps paths to Distribution instances
_cache_path_egg = {}  # maps paths to EggInfoDistribution instances
_cache_generated = False  # indicates if .dist-info distributions are cached
_cache_generated_egg = False  # indicates if .egg(-info) dists are cached too
_cache_enabled = True  # toggled by enable_cache() / disable_cache()


def enable_cache():
    """Switch the internal cache on.

    Only the on/off flag is touched: whatever the cache currently holds is
    kept as is.  Use :func:`clear_cache` to empty it.
    """
    global _cache_enabled
    _cache_enabled = True


def disable_cache():
    """Switch the internal cache off.

    Only the on/off flag is touched: whatever the cache currently holds is
    kept as is.  Use :func:`clear_cache` to empty it.
    """
    global _cache_enabled
    _cache_enabled = False


def clear_cache():
    """Clear the internal cache.

    Empties the four name/path mappings in place and resets both
    "cache generated" flags, so the next cached lookup rescans the paths.
    The enabled/disabled state of the cache is left untouched.
    """
    global _cache_generated, _cache_generated_egg

    # The dicts are cleared in place (not rebound) so that every reference
    # to them keeps pointing at the same, now empty, mapping.
    _cache_name.clear()
    _cache_name_egg.clear()
    _cache_path.clear()
    _cache_path_egg.clear()
    _cache_generated = False
    _cache_generated_egg = False


def _yield_distributions(include_dist, include_egg, paths):
    """
    Yield .dist-info and .egg(-info) distributions, based on the arguments

    Each entry of *paths* is resolved with ``os.path.realpath``; entries that
    are not directories are skipped.  Only the direct children of each
    directory are examined.

    :parameter include_dist: yield .dist-info distributions
    :parameter include_egg: yield .egg(-info) distributions
    :parameter paths: iterable of directories to scan
    :returns: iterator of :class:`Distribution` and
              :class:`EggInfoDistribution` instances
    """
    for path in paths:
        realpath = os.path.realpath(path)
        if not os.path.isdir(realpath):
            continue
        for entry in os.listdir(realpath):
            dist_path = os.path.join(realpath, entry)
            if include_dist and entry.endswith('.dist-info'):
                yield Distribution(dist_path)
            elif include_egg and entry.endswith(('.egg-info', '.egg')):
                yield EggInfoDistribution(dist_path)


def _generate_cache(use_egg_info, paths):
    """Fill the module-level caches by scanning *paths*, if not done yet.

    Nothing happens when the egg cache has already been generated, or when
    the .dist-info cache has been generated and eggs are not requested.
    Otherwise the missing kinds of distributions are scanned and indexed by
    path and by name, and the matching "generated" flags are set.

    Note that once a flag is set, later calls do not rescan even if *paths*
    differs; call :func:`clear_cache` to force a new scan.

    :parameter use_egg_info: also cache .egg and .egg-info distributions
    :parameter paths: iterable of directories to scan
    """
    global _cache_generated, _cache_generated_egg

    if _cache_generated_egg or (_cache_generated and not use_egg_info):
        return

    # .dist-info distributions are only scanned once; eggs are scanned
    # whenever they are requested and not cached yet.
    gen_dist = not _cache_generated
    gen_egg = use_egg_info

    for dist in _yield_distributions(gen_dist, gen_egg, paths):
        if isinstance(dist, Distribution):
            _cache_path[dist.path] = dist
            _cache_name.setdefault(dist.name, []).append(dist)
        else:
            _cache_path_egg[dist.path] = dist
            _cache_name_egg.setdefault(dist.name, []).append(dist)

    if gen_dist:
        _cache_generated = True
    if gen_egg:
        _cache_generated_egg = True


class Distribution:
    """Created with the *path* of the ``.dist-info`` directory provided to the
    constructor. It reads the metadata contained in ``METADATA`` when it is
    instantiated."""

    name = ''
    """The name of the distribution."""

    version = ''
    """The version of the distribution."""

    metadata = None
    """A :class:`packaging.metadata.Metadata` instance loaded with
    the distribution's ``METADATA`` file."""

    requested = False
    """A boolean that indicates whether the ``REQUESTED`` metadata file is
    present (in other words, whether the package was installed by user
    request or it was installed as a dependency)."""

    def __init__(self, path):
        """Load the distribution stored in the ``.dist-info`` directory *path*.

        When the cache is enabled and already holds a distribution for
        *path*, its metadata object is reused instead of parsing ``METADATA``
        again; otherwise the new instance registers itself in the cache.
        """
        if _cache_enabled and path in _cache_path:
            self.metadata = _cache_path[path].metadata
        else:
            metadata_path = os.path.join(path, 'METADATA')
            self.metadata = Metadata(path=metadata_path)

        self.name = self.metadata['Name']
        self.version = self.metadata['Version']
        self.path = path
        # Honour the documented meaning of ``requested``: it reflects the
        # presence of the REQUESTED marker file.
        self.requested = os.path.isfile(os.path.join(path, 'REQUESTED'))

        if _cache_enabled and path not in _cache_path:
            _cache_path[path] = self

    def __repr__(self):
        return '<Distribution %r %s at %r>' % (
            self.name, self.version, self.path)

    def _get_records(self, local=False):
        """Return the ``RECORD`` rows as a list of ``(path, md5, size)``.

        Rows with fewer than three fields are padded with ``None``; blank
        lines are ignored.  If *local* is true, ``'/'`` is replaced by
        ``os.sep`` in each path and the result is joined to ``sys.prefix``.
        """
        results = []
        with self.get_distinfo_file('RECORD') as record:
            record_reader = csv.reader(record, delimiter=',',
                                       lineterminator='\n')
            for row in record_reader:
                if not row:
                    # a blank line carries no path; skipping it keeps the
                    # callers from choking on a None path
                    continue
                missing = [None] * (3 - len(row))
                path, checksum, size = row + missing
                if local:
                    path = path.replace('/', os.sep)
                    path = os.path.join(sys.prefix, path)
                results.append((path, checksum, size))
        return results

    def get_resource_path(self, relative_path):
        """Return the destination recorded in ``RESOURCES`` for a resource.

        :parameter relative_path: the relative path of the resource, as
                                  written in the first column of
                                  ``RESOURCES``
        :raises KeyError: if no row matches *relative_path*
        """
        with self.get_distinfo_file('RESOURCES') as resources_file:
            resources_reader = csv.reader(resources_file, delimiter=',',
                                          lineterminator='\n')
            for row in resources_reader:
                if not row:
                    # ignore blank lines
                    continue
                relative, destination = row
                if relative == relative_path:
                    return destination
        raise KeyError(
            'no resource file with relative path %r is installed' %
            relative_path)

    def list_installed_files(self, local=False):
        """
        Iterates over the ``RECORD`` entries and returns a tuple
        ``(path, md5, size)`` for each line. If *local* is ``True``,
        the returned path is transformed into a local absolute path.
        Otherwise the raw value from RECORD is returned.

        A local absolute path is an absolute path in which occurrences of
        ``'/'`` have been replaced by the system separator given by ``os.sep``.

        :parameter local: flag to say if the path should be returned as a local
                          absolute path

        :type local: boolean
        :returns: iterator of (path, md5, size)
        """
        for result in self._get_records(local):
            yield result

    def uses(self, path):
        """
        Returns ``True`` if path is listed in ``RECORD``. *path* can be a local
        absolute path or a relative ``'/'``-separated path.

        :rtype: boolean
        """
        for p, checksum, size in self._get_records():
            local_absolute = os.path.join(sys.prefix, p)
            if path == p or path == local_absolute:
                return True
        return False

    def get_distinfo_file(self, path, binary=False):
        """
        Returns a file located under the ``.dist-info`` directory. Returns a
        ``file`` instance for the file pointed by *path*.

        :parameter path: a ``'/'``-separated path relative to the
                         ``.dist-info`` directory or an absolute path;
                         If *path* is an absolute path and doesn't start
                         with the ``.dist-info`` directory path,
                         a :class:`PackagingError` is raised
        :type path: string
        :parameter binary: If *binary* is ``True``, opens the file in read-only
                           binary mode (``rb``), otherwise opens it in
                           read-only mode (``r``).
        :rtype: file object
        """
        open_flags = 'r'
        if binary:
            open_flags += 'b'

        # Check if it is an absolute path  # XXX use relpath, add tests
        if path.find(os.sep) >= 0:
            # it's an absolute path?
            # Only the last two components are compared: the directory name
            # must be this distribution's .dist-info directory name.
            distinfo_dirname, path = path.split(os.sep)[-2:]
            if distinfo_dirname != self.path.split(os.sep)[-1]:
                raise PackagingError(
                    'dist-info file %r does not belong to the %r %s '
                    'distribution' % (path, self.name, self.version))

        # The file must be relative
        if path not in DIST_FILES:
            raise PackagingError('invalid path for a dist-info file: %r' %
                                 path)

        path = os.path.join(self.path, path)
        return open(path, open_flags)

    def list_distinfo_files(self, local=False):
        """
        Iterates over the ``RECORD`` entries and returns paths for each line if
        the path is pointing to a file located in the ``.dist-info`` directory
        or one of its subdirectories.

        :parameter local: If *local* is ``True``, each returned path is
                          transformed into a local absolute path. Otherwise the
                          raw value from ``RECORD`` is returned.
        :type local: boolean
        :returns: iterator of paths
        """
        base = self.path
        separators = ('/', os.sep)
        for path, checksum, size in self._get_records(local):
            if not path.startswith(base):
                continue
            # A bare prefix test would also accept a sibling such as
            # ``foo-1.0.dist-info-extra/...``; require the match to end on a
            # path separator (or to be the whole path).
            remainder = path[len(base):]
            if (not remainder or base.endswith(separators) or
                    remainder.startswith(separators)):
                yield path

    def __eq__(self, other):
        return isinstance(other, Distribution) and self.path == other.path

    # See http://docs.python.org/reference/datamodel#object.__hash__
    __hash__ = object.__hash__


class EggInfoDistribution:
    """Created with the *path* of the ``.egg-info`` directory or file provided
    to the constructor. It reads the metadata contained in the file itself, or
    if the given path happens to be a directory, the metadata is read from the
    file ``PKG-INFO`` under that directory."""

    name = ''
    """The name of the distribution."""

    version = ''
    """The version of the distribution."""

    metadata = None
    """A :class:`packaging.metadata.Metadata` instance loaded with
    the distribution's ``PKG-INFO`` data."""

    _REQUIREMENT = re.compile(
        r'(?P<name>[-A-Za-z0-9_.]+)\s*'
        r'(?P<first>(?:<|<=|!=|==|>=|>)[-A-Za-z0-9_.]+)?\s*'
        r'(?P<rest>(?:\s*,\s*(?:<|<=|!=|==|>=|>)[-A-Za-z0-9_.]+)*)\s*'
        r'(?P<extras>\[.*\])?')

    def __init__(self, path):
        """Load the distribution found at *path*.

        *path* must end with ``.egg`` (directory or zip file) or
        ``.egg-info`` (directory or single file).  Requirements listed in a
        ``requires.txt`` file, when present, are added to the
        ``Requires-Dist`` metadata field.

        :raises ValueError: if *path* has neither suffix
        """
        self.path = path
        if _cache_enabled and path in _cache_path_egg:
            self.metadata = _cache_path_egg[path].metadata
            self.name = self.metadata['Name']
            self.version = self.metadata['Version']
            return

        requires = self._load_metadata(path)
        self.name = self.metadata['Name']
        self.version = self.metadata['Version']

        if requires is not None:
            if self.metadata['Metadata-Version'] == '1.1':
                # we can't have 1.1 metadata *and* Setuptools requires
                for field in ('Obsoletes', 'Requires', 'Provides'):
                    del self.metadata[field]

            reqs = self._parse_requires(requires)
            if reqs:
                self.metadata['Requires-Dist'] += reqs

        if _cache_enabled:
            _cache_path_egg[self.path] = self

    @staticmethod
    def _read_optional_text(path):
        """Return the text content of *path*, or None if it can't be read."""
        try:
            with open(path, 'r') as fp:
                return fp.read()
        except IOError:
            return None

    def _load_metadata(self, path):
        """Set ``self.metadata`` from *path*; return requires.txt text or None.
        """
        if path.endswith('.egg'):
            if os.path.isdir(path):
                egg_info = os.path.join(path, 'EGG-INFO')
                self.metadata = Metadata(
                    path=os.path.join(egg_info, 'PKG-INFO'))
                return self._read_optional_text(
                    os.path.join(egg_info, 'requires.txt'))

            # FIXME handle the case where zipfile is not available
            zipf = zipimport.zipimporter(path)
            fileobj = StringIO(
                zipf.get_data('EGG-INFO/PKG-INFO').decode('utf8'))
            self.metadata = Metadata(fileobj=fileobj)
            try:
                # get_data() returns bytes; the parser works on text
                return zipf.get_data('EGG-INFO/requires.txt').decode('utf8')
            except IOError:
                return None

        if path.endswith('.egg-info'):
            requires = None
            meta_path = path
            if os.path.isdir(path):
                # requires.txt lives next to PKG-INFO, inside the directory
                requires = self._read_optional_text(
                    os.path.join(path, 'requires.txt'))
                meta_path = os.path.join(path, 'PKG-INFO')
            self.metadata = Metadata(path=meta_path)
            return requires

        raise ValueError('path must end with .egg-info or .egg, got %r' %
                         path)

    def _parse_requires(self, requires):
        """Convert requires.txt text into a list of Requires-Dist strings.

        Blank lines and ``#`` comments are skipped.  Parsing stops at the
        first ``[section]`` header or at the first line that does not look
        like a requirement.
        """
        reqs = []
        for line in requires.splitlines():
            line = line.strip()
            # skip blank lines/comments
            if not line or line.startswith('#'):
                continue
            if line.startswith('['):
                logger.warning(
                    'extensions in requires.txt are not supported '
                    '(used by %r %s)', self.name, self.version)
                break

            match = self._REQUIREMENT.match(line)
            if not match:
                # this happens when we encounter extras; since they
                # are written at the end of the file we just exit
                break

            if match.group('extras'):
                logger.warning(
                    'extra requirements are not supported '
                    '(used by %r %s)', self.name, self.version)

            name = match.group('name')
            version = match.group('first')
            if version:
                if match.group('rest'):
                    version += match.group('rest')
                version = version.replace(' ', '')  # trim spaces
                reqs.append('%s (%s)' % (name, version))
            else:
                reqs.append(name)
        return reqs

    def __repr__(self):
        return '<EggInfoDistribution %r %s at %r>' % (
            self.name, self.version, self.path)

    def list_installed_files(self, local=False):
        """Return a list of ``(path, md5, size)`` for the egg's own files.

        If ``self.path`` is a file, the list has that single entry;
        otherwise the directory tree under it is walked.

        :parameter local: if true, ``'/'`` in the path is replaced by
                          ``os.sep`` before the lookup
        :type local: boolean
        """

        def _md5(path):
            with open(path, 'rb') as f:
                content = f.read()
            return md5(content).hexdigest()

        def _size(path):
            return os.stat(path).st_size

        path = self.path
        if local:
            path = path.replace('/', os.sep)

        # XXX What about scripts and data files ?
        if os.path.isfile(path):
            return [(path, _md5(path), _size(path))]

        files = []
        for root, dirnames, filenames in os.walk(path):
            for item in filenames:
                item = os.path.join(root, item)
                files.append((item, _md5(item), _size(item)))
        return files

    def uses(self, path):
        """Always return False: egg distributions record no file list."""
        return False

    def __eq__(self, other):
        return (isinstance(other, EggInfoDistribution) and
                self.path == other.path)

    # See http://docs.python.org/reference/datamodel#object.__hash__
    __hash__ = object.__hash__


def distinfo_dirname(name, version):
    """
    Build the name of the ``.dist-info`` directory for a distribution.

    Every ``'-'`` in *name* is replaced with ``'_'``.  *version* is passed
    through ``suggest_normalized_version``; when that yields ``None`` the
    version is used exactly as given.  The two parts are joined with a
    single ``'-'`` and suffixed with ``'.dist-info'``.

    :parameter name: the distribution name
    :type name: string
    :parameter version: the distribution version
    :type version: string
    :returns: directory name
    :rtype: string"""
    escaped_name = name.replace('-', '_')
    # This is a lookup helper, so a version that cannot be normalized is
    # still used verbatim rather than rejected.
    chosen_version = suggest_normalized_version(version)
    if chosen_version is None:
        chosen_version = version
    return escaped_name + '-' + chosen_version + '.dist-info'


def get_distributions(use_egg_info=False, paths=None):
    """
    Provides an iterator that looks for ``.dist-info`` directories in
    *paths* and returns :class:`Distribution` instances for each one of
    them. If the parameter *use_egg_info* is ``True``, then the ``.egg-info``
    files and directories are iterated as well.

    When the cache is enabled the results come from the cache, which is
    generated on first use.

    :parameter use_egg_info: also yield ``.egg`` and ``.egg-info``
                             distributions
    :parameter paths: directories to scan; defaults to ``sys.path``
    :rtype: iterator of :class:`Distribution` and :class:`EggInfoDistribution`
            instances
    """
    if paths is None:
        paths = sys.path

    if not _cache_enabled:
        for dist in _yield_distributions(True, use_egg_info, paths):
            yield dist
        return

    _generate_cache(use_egg_info, paths)

    # Iterate over snapshots: this is a generator, and a consumer that
    # creates a new distribution between two items would otherwise resize
    # the cache dict while it is being iterated.
    for dist in list(_cache_path.values()):
        yield dist

    if use_egg_info:
        for dist in list(_cache_path_egg.values()):
            yield dist


def get_distribution(name, use_egg_info=False, paths=None):
    """
    Scans all elements in *paths* and looks for all directories
    ending with ``.dist-info``. Returns a :class:`Distribution`
    corresponding to the ``.dist-info`` directory that contains the
    ``METADATA`` that matches *name* for the *name* metadata field.
    If no distribution exists with the given *name* and the parameter
    *use_egg_info* is set to ``True``, then all files and directories ending
    with ``.egg-info`` are scanned. A :class:`EggInfoDistribution` instance is
    returned if one is found that has metadata that matches *name* for the
    *name* metadata field.

    This function only returns the first result found, as no more than one
    value is expected. If the directory is not found, ``None`` is returned.

    :parameter paths: directories to scan; defaults to ``sys.path``
    :rtype: :class:`Distribution` or :class:`EggInfoDistribution` or None
    """
    if paths is None:
        paths = sys.path

    if not _cache_enabled:
        for dist in _yield_distributions(True, use_egg_info, paths):
            if dist.name == name:
                return dist
        return None

    _generate_cache(use_egg_info, paths)

    # .dist-info distributions take precedence over eggs of the same name
    if name in _cache_name:
        return _cache_name[name][0]
    if use_egg_info and name in _cache_name_egg:
        return _cache_name_egg[name][0]
    return None


def obsoletes_distribution(name, version=None, use_egg_info=False):
    """
    Iterates over all distributions to find which distributions obsolete
    *name*.

    If a *version* is provided, it will be used to filter the results.
    If the argument *use_egg_info* is set to ``True``, then ``.egg-info``
    distributions will be considered as well.

    :type name: string
    :type version: string
    :parameter name:
    """
    for dist in get_distributions(use_egg_info):
        obsoleted = (dist.metadata['Obsoletes-Dist'] +
                     dist.metadata['Obsoletes'])
        for obs in obsoleted:
            o_components = obs.split(' ', 1)
            if len(o_components) == 1 or version is None:
                if name == o_components[0]:
                    yield dist
                    break
            else:
                try:
                    predicate = VersionPredicate(obs)
                except ValueError:
                    raise PackagingError(
                        'distribution %r has ill-formed obsoletes field: '
                        '%r' % (dist.name, obs))
                if name == o_components[0] and predicate.match(version):
                    yield dist
                    break


def provides_distribution(name, version=None, use_egg_info=False):
    """
    Yield every installed distribution that provides *name*.

    Both the ``Provides-Dist`` and ``Provides`` metadata fields are
    examined.  An entry without a version part, or any entry when *version*
    is ``None``, matches on the name alone.  Otherwise the entry must look
    like ``name (version)`` and its version has to satisfy the predicate
    built from *name* and *version*.  A distribution is yielded at most
    once.  If the argument *use_egg_info* is set to ``True``, ``.egg-info``
    distributions are considered as well.

    :parameter version: a version specifier that indicates the version
                        required, conforming to the format in ``PEP-345``
    :raises PackagingError: if *name*/*version* do not form a valid
                            predicate, or if a versioned entry is not
                            wrapped in parentheses

    :type name: string
    :type version: string
    """
    if version is None:
        predicate = None
    else:
        try:
            predicate = VersionPredicate(name + ' (' + version + ')')
        except ValueError:
            raise PackagingError('invalid name or version: %r, %r' %
                                 (name, version))

    for dist in get_distributions(use_egg_info):
        entries = dist.metadata['Provides-Dist'] + dist.metadata['Provides']

        for entry in entries:
            parts = entry.rsplit(' ', 1)
            if predicate is not None and len(parts) != 1:
                provided_name, provided_version = parts
                well_formed = (len(provided_version) >= 2 and
                               provided_version[0] == '(' and
                               provided_version[-1] == ')')
                if not well_formed:
                    raise PackagingError(
                        'distribution %r has invalid Provides field: %r' %
                        (dist.name, entry))
                # drop the surrounding parentheses before matching
                if (provided_name != name or
                        not predicate.match(provided_version[1:-1])):
                    continue
            elif name != parts[0]:
                continue
            yield dist
            break


def get_file_users(path):
    """
    Yield each distribution returned by :func:`get_distributions` whose
    ``uses`` method reports that it uses *path*.

    :parameter path: can be a local absolute path or a relative
                     ``'/'``-separated path.
    :type path: string
    :rtype: iterator of :class:`Distribution` instances
    """
    for candidate in get_distributions():
        if not candidate.uses(path):
            continue
        yield candidate


def get_file_path(distribution_name, relative_path):
    """Return the path to a resource file.

    :parameter distribution_name: name of the installed distribution that
                                  owns the resource
    :parameter relative_path: relative path of the resource, looked up with
                              the distribution's ``get_resource_path``
    :raises LookupError: if no distribution named *distribution_name* is
                         found
    """
    dist = get_distribution(distribution_name)
    if dist is not None:
        return dist.get_resource_path(relative_path)
    raise LookupError('no distribution named %r found' % distribution_name)


def get_file(distribution_name, relative_path, *args, **kwargs):
    """Open and return a resource file.

    The path is resolved with :func:`get_file_path`; any extra positional
    and keyword arguments are forwarded to ``open``.
    """
    resource_path = get_file_path(distribution_name, relative_path)
    return open(resource_path, *args, **kwargs)