Source code for pex.environment

# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from __future__ import absolute_import, print_function

import itertools
import os
import site
import sys
import uuid

from pkg_resources import (
    DistributionNotFound,
    Environment,
    Requirement,
    WorkingSet,
    find_distributions
)

from .common import die, open_zip, rename_if_empty, safe_mkdir, safe_rmtree
from .interpreter import PythonInterpreter
from .package import distribution_compatible
from .pex_builder import PEXBuilder
from .pex_info import PexInfo
from .platforms import Platform
from .tracer import TRACER
from .util import CacheHelper, DistributionHelper


[docs]class PEXEnvironment(Environment): @classmethod def force_local(cls, pex, pex_info): if pex_info.code_hash is None: # Do not support force_local if code_hash is not set. (It should always be set.) return pex explode_dir = os.path.join(pex_info.zip_unsafe_cache, pex_info.code_hash) TRACER.log('PEX is not zip safe, exploding to %s' % explode_dir) if not os.path.exists(explode_dir): explode_tmp = explode_dir + '.' + uuid.uuid4().hex with TRACER.timed('Unzipping %s' % pex): try: safe_mkdir(explode_tmp) with open_zip(pex) as pex_zip: pex_files = (x for x in pex_zip.namelist() if not x.startswith(PEXBuilder.BOOTSTRAP_DIR) and not x.startswith(PexInfo.INTERNAL_CACHE)) pex_zip.extractall(explode_tmp, pex_files) except: # noqa: T803 safe_rmtree(explode_tmp) raise TRACER.log('Renaming %s to %s' % (explode_tmp, explode_dir)) rename_if_empty(explode_tmp, explode_dir) return explode_dir @classmethod def update_module_paths(cls, new_code_path): # Force subsequent imports to come from the .pex directory rather than the .pex file. TRACER.log('Adding to the head of sys.path: %s' % new_code_path) sys.path.insert(0, new_code_path) for name, module in sys.modules.items(): if hasattr(module, '__path__'): module_dir = os.path.join(new_code_path, *name.split(".")) TRACER.log('Adding to the head of %s.__path__: %s' % (module.__name__, module_dir)) try: module.__path__.insert(0, module_dir) except AttributeError: # TODO: This is a temporary bandaid for an unhandled AttributeError which results # in a startup crash. See https://github.com/pantsbuild/pex/issues/598 for more info. TRACER.log( 'Failed to insert %s: %s.__path__ of type %s does not support insertion!' % ( module_dir, module.__name__, type(module.__path__) ) ) @classmethod def write_zipped_internal_cache(cls, pex, pex_info): prefix_length = len(pex_info.internal_cache) + 1 existing_cached_distributions = [] newly_cached_distributions = [] zip_safe_distributions = [] with open_zip(pex) as zf: # Distribution names are the first element after ".deps/" and before the next "/" distribution_names = set(filter(None, (filename[prefix_length:].split('/')[0] for filename in zf.namelist() if filename.startswith(pex_info.internal_cache)))) # Create Distribution objects from these, and possibly write to disk if necessary. for distribution_name in distribution_names: internal_dist_path = '/'.join([pex_info.internal_cache, distribution_name]) # First check if this is already cached dist_digest = pex_info.distributions.get(distribution_name) or CacheHelper.zip_hash( zf, internal_dist_path) cached_location = os.path.join(pex_info.install_cache, '%s.%s' % ( distribution_name, dist_digest)) if os.path.exists(cached_location): dist = DistributionHelper.distribution_from_path(cached_location) if dist is not None: existing_cached_distributions.append(dist) continue else: dist = DistributionHelper.distribution_from_path(os.path.join(pex, internal_dist_path)) if dist is not None: if DistributionHelper.zipsafe(dist) and not pex_info.always_write_cache: zip_safe_distributions.append(dist) continue with TRACER.timed('Caching %s' % dist): newly_cached_distributions.append( CacheHelper.cache_distribution(zf, internal_dist_path, cached_location)) return existing_cached_distributions, newly_cached_distributions, zip_safe_distributions
[docs] @classmethod def load_internal_cache(cls, pex, pex_info): """Possibly cache out the internal cache.""" internal_cache = os.path.join(pex, pex_info.internal_cache) with TRACER.timed('Searching dependency cache: %s' % internal_cache, V=2): if os.path.isdir(pex): for dist in find_distributions(internal_cache): yield dist else: for dist in itertools.chain(*cls.write_zipped_internal_cache(pex, pex_info)): yield dist
def __init__(self, pex, pex_info, interpreter=None, **kw): self._internal_cache = os.path.join(pex, pex_info.internal_cache) self._pex = pex self._pex_info = pex_info self._activated = False self._working_set = None self._interpreter = interpreter or PythonInterpreter.get() self._inherit_path = pex_info.inherit_path self._supported_tags = [] platform = Platform.current() platform_name = platform.platform super(PEXEnvironment, self).__init__( search_path=[] if pex_info.inherit_path == 'false' else sys.path, # NB: Our pkg_resources.Environment base-class wants the platform name string and not the # pex.platform.Platform object. platform=platform_name, **kw ) self._target_interpreter_env = self._interpreter.identity.pkg_resources_env(platform_name) self._supported_tags.extend(platform.supported_tags(self._interpreter)) TRACER.log( 'E: tags for %r x %r -> %s' % (self.platform, self._interpreter, self._supported_tags), V=9 ) def update_candidate_distributions(self, distribution_iter): for dist in distribution_iter: if self.can_add(dist): with TRACER.timed('Adding %s' % dist, V=2): self.add(dist)
[docs] def can_add(self, dist): return distribution_compatible(dist, self._supported_tags)
def activate(self): if not self._activated: with TRACER.timed('Activating PEX virtual environment from %s' % self._pex): self._working_set = self._activate() self._activated = True return self._working_set def _resolve(self, working_set, reqs): reqs = reqs[:] unresolved_reqs = set() resolveds = set() environment = self._target_interpreter_env.copy() environment['extra'] = list(set(itertools.chain(*(req.extras for req in reqs)))) # Resolve them one at a time so that we can figure out which ones we need to elide should # there be an interpreter incompatibility. for req in reqs: if req.marker and not req.marker.evaluate(environment=environment): TRACER.log('Skipping activation of `%s` due to environment marker de-selection' % req) continue with TRACER.timed('Resolving %s' % req, V=2): try: resolveds.update(working_set.resolve([req], env=self)) except DistributionNotFound as e: TRACER.log('Failed to resolve a requirement: %s' % e) unresolved_reqs.add(e.args[0].project_name) # Older versions of pkg_resources just call `DistributionNotFound(req)` instead of the # modern `DistributionNotFound(req, requirers)` and so we may not have the 2nd requirers # slot at all. if len(e.args) >= 2 and e.args[1]: unresolved_reqs.update(e.args[1]) unresolved_reqs = set([req.lower() for req in unresolved_reqs]) if unresolved_reqs: TRACER.log('Unresolved requirements:') for req in unresolved_reqs: TRACER.log(' - %s' % req) TRACER.log('Distributions contained within this pex:') if not self._pex_info.distributions: TRACER.log(' None') else: for dist in self._pex_info.distributions: TRACER.log(' - %s' % dist) if not self._pex_info.ignore_errors: die( 'Failed to execute PEX file, missing %s compatible dependencies for:\n%s' % ( Platform.current(), '\n'.join(str(r) for r in unresolved_reqs) ) ) return resolveds def _activate(self): self.update_candidate_distributions(self.load_internal_cache(self._pex, self._pex_info)) if not self._pex_info.zip_safe and os.path.isfile(self._pex): self.update_module_paths(self.force_local(self._pex, self._pex_info)) all_reqs = [Requirement.parse(req) for req in self._pex_info.requirements] working_set = WorkingSet([]) resolved = self._resolve(working_set, all_reqs) for dist in resolved: with TRACER.timed('Activating %s' % dist, V=2): working_set.add(dist) if os.path.isdir(dist.location): with TRACER.timed('Adding sitedir', V=2): if dist.location not in sys.path and self._inherit_path == "fallback": # Prepend location to sys.path. # This ensures that bundled versions of libraries will be used before system-installed # versions, in case something is installed in both, helping to favor hermeticity in # the case of non-hermetic PEX files (i.e. those with inherit_path=True). # # If the path is not already in sys.path, site.addsitedir will append (not prepend) # the path to sys.path. But if the path is already in sys.path, site.addsitedir will # leave sys.path unmodified, but will do everything else it would do. This is not part # of its advertised contract (which is very vague), but has been verified to be the # case by inspecting its source for both cpython 2.7 and cpython 3.7. sys.path.insert(0, dist.location) site.addsitedir(dist.location) dist.activate() return working_set