294 lines
9.3 KiB
Python
294 lines
9.3 KiB
Python
|
"""Stuff that differs in different Python versions and platform
|
||
|
distributions."""
|
||
|
|
||
|
# The following comment should be removed at some point in the future.
|
||
|
# mypy: disallow-untyped-defs=False
|
||
|
|
||
|
from __future__ import absolute_import, division
|
||
|
|
||
|
import codecs
|
||
|
import functools
|
||
|
import locale
|
||
|
import logging
|
||
|
import os
|
||
|
import shutil
|
||
|
import sys
|
||
|
|
||
|
from pip._vendor.six import PY2, text_type
|
||
|
|
||
|
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
|
||
|
|
||
|
if MYPY_CHECK_RUNNING:
|
||
|
from typing import Callable, Optional, Protocol, Text, Tuple, TypeVar, Union
|
||
|
|
||
|
# Used in the @lru_cache polyfill.
|
||
|
F = TypeVar('F')
|
||
|
|
||
|
class LruCache(Protocol):
|
||
|
def __call__(self, maxsize=None):
|
||
|
# type: (Optional[int]) -> Callable[[F], F]
|
||
|
raise NotImplementedError
|
||
|
|
||
|
try:
|
||
|
import ipaddress
|
||
|
except ImportError:
|
||
|
try:
|
||
|
from pip._vendor import ipaddress # type: ignore
|
||
|
except ImportError:
|
||
|
import ipaddr as ipaddress # type: ignore
|
||
|
ipaddress.ip_address = ipaddress.IPAddress # type: ignore
|
||
|
ipaddress.ip_network = ipaddress.IPNetwork # type: ignore
|
||
|
|
||
|
|
||
|
__all__ = [
|
||
|
"ipaddress", "uses_pycache", "console_to_str",
|
||
|
"get_path_uid", "stdlib_pkgs", "WINDOWS", "samefile", "get_terminal_size",
|
||
|
]
|
||
|
|
||
|
|
||
|
logger = logging.getLogger(__name__)
|
||
|
|
||
|
if PY2:
|
||
|
import imp
|
||
|
|
||
|
try:
|
||
|
cache_from_source = imp.cache_from_source # type: ignore
|
||
|
except AttributeError:
|
||
|
# does not use __pycache__
|
||
|
cache_from_source = None
|
||
|
|
||
|
uses_pycache = cache_from_source is not None
|
||
|
else:
|
||
|
uses_pycache = True
|
||
|
from importlib.util import cache_from_source
|
||
|
|
||
|
|
||
|
if PY2:
|
||
|
# In Python 2.7, backslashreplace exists
|
||
|
# but does not support use for decoding.
|
||
|
# We implement our own replace handler for this
|
||
|
# situation, so that we can consistently use
|
||
|
# backslash replacement for all versions.
|
||
|
def backslashreplace_decode_fn(err):
|
||
|
raw_bytes = (err.object[i] for i in range(err.start, err.end))
|
||
|
# Python 2 gave us characters - convert to numeric bytes
|
||
|
raw_bytes = (ord(b) for b in raw_bytes)
|
||
|
return u"".join(map(u"\\x{:x}".format, raw_bytes)), err.end
|
||
|
codecs.register_error(
|
||
|
"backslashreplace_decode",
|
||
|
backslashreplace_decode_fn,
|
||
|
)
|
||
|
backslashreplace_decode = "backslashreplace_decode"
|
||
|
else:
|
||
|
backslashreplace_decode = "backslashreplace"
|
||
|
|
||
|
|
||
|
def has_tls():
|
||
|
# type: () -> bool
|
||
|
try:
|
||
|
import _ssl # noqa: F401 # ignore unused
|
||
|
return True
|
||
|
except ImportError:
|
||
|
pass
|
||
|
|
||
|
from pip._vendor.urllib3.util import IS_PYOPENSSL
|
||
|
return IS_PYOPENSSL
|
||
|
|
||
|
|
||
|
def str_to_display(data, desc=None):
|
||
|
# type: (Union[bytes, Text], Optional[str]) -> Text
|
||
|
"""
|
||
|
For display or logging purposes, convert a bytes object (or text) to
|
||
|
text (e.g. unicode in Python 2) safe for output.
|
||
|
|
||
|
:param desc: An optional phrase describing the input data, for use in
|
||
|
the log message if a warning is logged. Defaults to "Bytes object".
|
||
|
|
||
|
This function should never error out and so can take a best effort
|
||
|
approach. It is okay to be lossy if needed since the return value is
|
||
|
just for display.
|
||
|
|
||
|
We assume the data is in the locale preferred encoding. If it won't
|
||
|
decode properly, we warn the user but decode as best we can.
|
||
|
|
||
|
We also ensure that the output can be safely written to standard output
|
||
|
without encoding errors.
|
||
|
"""
|
||
|
if isinstance(data, text_type):
|
||
|
return data
|
||
|
|
||
|
# Otherwise, data is a bytes object (str in Python 2).
|
||
|
# First, get the encoding we assume. This is the preferred
|
||
|
# encoding for the locale, unless that is not found, or
|
||
|
# it is ASCII, in which case assume UTF-8
|
||
|
encoding = locale.getpreferredencoding()
|
||
|
if (not encoding) or codecs.lookup(encoding).name == "ascii":
|
||
|
encoding = "utf-8"
|
||
|
|
||
|
# Now try to decode the data - if we fail, warn the user and
|
||
|
# decode with replacement.
|
||
|
try:
|
||
|
decoded_data = data.decode(encoding)
|
||
|
except UnicodeDecodeError:
|
||
|
logger.warning(
|
||
|
'%s does not appear to be encoded as %s',
|
||
|
desc or 'Bytes object',
|
||
|
encoding,
|
||
|
)
|
||
|
decoded_data = data.decode(encoding, errors=backslashreplace_decode)
|
||
|
|
||
|
# Make sure we can print the output, by encoding it to the output
|
||
|
# encoding with replacement of unencodable characters, and then
|
||
|
# decoding again.
|
||
|
# We use stderr's encoding because it's less likely to be
|
||
|
# redirected and if we don't find an encoding we skip this
|
||
|
# step (on the assumption that output is wrapped by something
|
||
|
# that won't fail).
|
||
|
# The double getattr is to deal with the possibility that we're
|
||
|
# being called in a situation where sys.__stderr__ doesn't exist,
|
||
|
# or doesn't have an encoding attribute. Neither of these cases
|
||
|
# should occur in normal pip use, but there's no harm in checking
|
||
|
# in case people use pip in (unsupported) unusual situations.
|
||
|
output_encoding = getattr(getattr(sys, "__stderr__", None),
|
||
|
"encoding", None)
|
||
|
|
||
|
if output_encoding:
|
||
|
output_encoded = decoded_data.encode(
|
||
|
output_encoding,
|
||
|
errors="backslashreplace"
|
||
|
)
|
||
|
decoded_data = output_encoded.decode(output_encoding)
|
||
|
|
||
|
return decoded_data
|
||
|
|
||
|
|
||
|
def console_to_str(data):
|
||
|
# type: (bytes) -> Text
|
||
|
"""Return a string, safe for output, of subprocess output.
|
||
|
"""
|
||
|
return str_to_display(data, desc='Subprocess output')
|
||
|
|
||
|
|
||
|
def get_path_uid(path):
|
||
|
# type: (str) -> int
|
||
|
"""
|
||
|
Return path's uid.
|
||
|
|
||
|
Does not follow symlinks:
|
||
|
https://github.com/pypa/pip/pull/935#discussion_r5307003
|
||
|
|
||
|
Placed this function in compat due to differences on AIX and
|
||
|
Jython, that should eventually go away.
|
||
|
|
||
|
:raises OSError: When path is a symlink or can't be read.
|
||
|
"""
|
||
|
if hasattr(os, 'O_NOFOLLOW'):
|
||
|
fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
|
||
|
file_uid = os.fstat(fd).st_uid
|
||
|
os.close(fd)
|
||
|
else: # AIX and Jython
|
||
|
# WARNING: time of check vulnerability, but best we can do w/o NOFOLLOW
|
||
|
if not os.path.islink(path):
|
||
|
# older versions of Jython don't have `os.fstat`
|
||
|
file_uid = os.stat(path).st_uid
|
||
|
else:
|
||
|
# raise OSError for parity with os.O_NOFOLLOW above
|
||
|
raise OSError(
|
||
|
"{} is a symlink; Will not return uid for symlinks".format(
|
||
|
path)
|
||
|
)
|
||
|
return file_uid
|
||
|
|
||
|
|
||
|
def expanduser(path):
|
||
|
# type: (str) -> str
|
||
|
"""
|
||
|
Expand ~ and ~user constructions.
|
||
|
|
||
|
Includes a workaround for https://bugs.python.org/issue14768
|
||
|
"""
|
||
|
expanded = os.path.expanduser(path)
|
||
|
if path.startswith('~/') and expanded.startswith('//'):
|
||
|
expanded = expanded[1:]
|
||
|
return expanded
|
||
|
|
||
|
|
||
|
# packages in the stdlib that may have installation metadata, but should not be
|
||
|
# considered 'installed'. this theoretically could be determined based on
|
||
|
# dist.location (py27:`sysconfig.get_paths()['stdlib']`,
|
||
|
# py26:sysconfig.get_config_vars('LIBDEST')), but fear platform variation may
|
||
|
# make this ineffective, so hard-coding
|
||
|
stdlib_pkgs = {"python", "wsgiref", "argparse"}
|
||
|
|
||
|
|
||
|
# windows detection, covers cpython and ironpython
|
||
|
WINDOWS = (sys.platform.startswith("win") or
|
||
|
(sys.platform == 'cli' and os.name == 'nt'))
|
||
|
|
||
|
|
||
|
def samefile(file1, file2):
|
||
|
# type: (str, str) -> bool
|
||
|
"""Provide an alternative for os.path.samefile on Windows/Python2"""
|
||
|
if hasattr(os.path, 'samefile'):
|
||
|
return os.path.samefile(file1, file2)
|
||
|
else:
|
||
|
path1 = os.path.normcase(os.path.abspath(file1))
|
||
|
path2 = os.path.normcase(os.path.abspath(file2))
|
||
|
return path1 == path2
|
||
|
|
||
|
|
||
|
if hasattr(shutil, 'get_terminal_size'):
|
||
|
def get_terminal_size():
|
||
|
# type: () -> Tuple[int, int]
|
||
|
"""
|
||
|
Returns a tuple (x, y) representing the width(x) and the height(y)
|
||
|
in characters of the terminal window.
|
||
|
"""
|
||
|
return tuple(shutil.get_terminal_size()) # type: ignore
|
||
|
else:
|
||
|
def get_terminal_size():
|
||
|
# type: () -> Tuple[int, int]
|
||
|
"""
|
||
|
Returns a tuple (x, y) representing the width(x) and the height(y)
|
||
|
in characters of the terminal window.
|
||
|
"""
|
||
|
def ioctl_GWINSZ(fd):
|
||
|
try:
|
||
|
import fcntl
|
||
|
import struct
|
||
|
import termios
|
||
|
cr = struct.unpack_from(
|
||
|
'hh',
|
||
|
fcntl.ioctl(fd, termios.TIOCGWINSZ, '12345678')
|
||
|
)
|
||
|
except Exception:
|
||
|
return None
|
||
|
if cr == (0, 0):
|
||
|
return None
|
||
|
return cr
|
||
|
cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
|
||
|
if not cr:
|
||
|
if sys.platform != "win32":
|
||
|
try:
|
||
|
fd = os.open(os.ctermid(), os.O_RDONLY)
|
||
|
cr = ioctl_GWINSZ(fd)
|
||
|
os.close(fd)
|
||
|
except Exception:
|
||
|
pass
|
||
|
if not cr:
|
||
|
cr = (os.environ.get('LINES', 25), os.environ.get('COLUMNS', 80))
|
||
|
return int(cr[1]), int(cr[0])
|
||
|
|
||
|
|
||
|
# Fallback to noop_lru_cache in Python 2
|
||
|
# TODO: this can be removed when python 2 support is dropped!
|
||
|
def noop_lru_cache(maxsize=None):
|
||
|
# type: (Optional[int]) -> Callable[[F], F]
|
||
|
def _wrapper(f):
|
||
|
# type: (F) -> F
|
||
|
return f
|
||
|
return _wrapper
|
||
|
|
||
|
|
||
|
lru_cache = getattr(functools, "lru_cache", noop_lru_cache) # type: LruCache
|