279 lines
9.7 KiB
Python
279 lines
9.7 KiB
Python
from __future__ import absolute_import
|
|
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
|
|
from pip._vendor.six.moves import shlex_quote
|
|
|
|
from pip._internal.cli.spinners import SpinnerInterface, open_spinner
|
|
from pip._internal.exceptions import InstallationError
|
|
from pip._internal.utils.compat import console_to_str, str_to_display
|
|
from pip._internal.utils.logging import subprocess_logger
|
|
from pip._internal.utils.misc import HiddenText, path_to_display
|
|
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
|
|
|
|
if MYPY_CHECK_RUNNING:
|
|
from typing import Any, Callable, Iterable, List, Mapping, Optional, Text, Union
|
|
|
|
CommandArgs = List[Union[str, HiddenText]]
|
|
|
|
|
|
LOG_DIVIDER = '----------------------------------------'
|
|
|
|
|
|
def make_command(*args):
|
|
# type: (Union[str, HiddenText, CommandArgs]) -> CommandArgs
|
|
"""
|
|
Create a CommandArgs object.
|
|
"""
|
|
command_args = [] # type: CommandArgs
|
|
for arg in args:
|
|
# Check for list instead of CommandArgs since CommandArgs is
|
|
# only known during type-checking.
|
|
if isinstance(arg, list):
|
|
command_args.extend(arg)
|
|
else:
|
|
# Otherwise, arg is str or HiddenText.
|
|
command_args.append(arg)
|
|
|
|
return command_args
|
|
|
|
|
|
def format_command_args(args):
|
|
# type: (Union[List[str], CommandArgs]) -> str
|
|
"""
|
|
Format command arguments for display.
|
|
"""
|
|
# For HiddenText arguments, display the redacted form by calling str().
|
|
# Also, we don't apply str() to arguments that aren't HiddenText since
|
|
# this can trigger a UnicodeDecodeError in Python 2 if the argument
|
|
# has type unicode and includes a non-ascii character. (The type
|
|
# checker doesn't ensure the annotations are correct in all cases.)
|
|
return ' '.join(
|
|
shlex_quote(str(arg)) if isinstance(arg, HiddenText)
|
|
else shlex_quote(arg) for arg in args
|
|
)
|
|
|
|
|
|
def reveal_command_args(args):
|
|
# type: (Union[List[str], CommandArgs]) -> List[str]
|
|
"""
|
|
Return the arguments in their raw, unredacted form.
|
|
"""
|
|
return [
|
|
arg.secret if isinstance(arg, HiddenText) else arg for arg in args
|
|
]
|
|
|
|
|
|
def make_subprocess_output_error(
|
|
cmd_args, # type: Union[List[str], CommandArgs]
|
|
cwd, # type: Optional[str]
|
|
lines, # type: List[Text]
|
|
exit_status, # type: int
|
|
):
|
|
# type: (...) -> Text
|
|
"""
|
|
Create and return the error message to use to log a subprocess error
|
|
with command output.
|
|
|
|
:param lines: A list of lines, each ending with a newline.
|
|
"""
|
|
command = format_command_args(cmd_args)
|
|
# Convert `command` and `cwd` to text (unicode in Python 2) so we can use
|
|
# them as arguments in the unicode format string below. This avoids
|
|
# "UnicodeDecodeError: 'ascii' codec can't decode byte ..." in Python 2
|
|
# if either contains a non-ascii character.
|
|
command_display = str_to_display(command, desc='command bytes')
|
|
cwd_display = path_to_display(cwd)
|
|
|
|
# We know the joined output value ends in a newline.
|
|
output = ''.join(lines)
|
|
msg = (
|
|
# Use a unicode string to avoid "UnicodeEncodeError: 'ascii'
|
|
# codec can't encode character ..." in Python 2 when a format
|
|
# argument (e.g. `output`) has a non-ascii character.
|
|
u'Command errored out with exit status {exit_status}:\n'
|
|
' command: {command_display}\n'
|
|
' cwd: {cwd_display}\n'
|
|
'Complete output ({line_count} lines):\n{output}{divider}'
|
|
).format(
|
|
exit_status=exit_status,
|
|
command_display=command_display,
|
|
cwd_display=cwd_display,
|
|
line_count=len(lines),
|
|
output=output,
|
|
divider=LOG_DIVIDER,
|
|
)
|
|
return msg
|
|
|
|
|
|
def call_subprocess(
|
|
cmd, # type: Union[List[str], CommandArgs]
|
|
show_stdout=False, # type: bool
|
|
cwd=None, # type: Optional[str]
|
|
on_returncode='raise', # type: str
|
|
extra_ok_returncodes=None, # type: Optional[Iterable[int]]
|
|
command_desc=None, # type: Optional[str]
|
|
extra_environ=None, # type: Optional[Mapping[str, Any]]
|
|
unset_environ=None, # type: Optional[Iterable[str]]
|
|
spinner=None, # type: Optional[SpinnerInterface]
|
|
log_failed_cmd=True # type: Optional[bool]
|
|
):
|
|
# type: (...) -> Text
|
|
"""
|
|
Args:
|
|
show_stdout: if true, use INFO to log the subprocess's stderr and
|
|
stdout streams. Otherwise, use DEBUG. Defaults to False.
|
|
extra_ok_returncodes: an iterable of integer return codes that are
|
|
acceptable, in addition to 0. Defaults to None, which means [].
|
|
unset_environ: an iterable of environment variable names to unset
|
|
prior to calling subprocess.Popen().
|
|
log_failed_cmd: if false, failed commands are not logged, only raised.
|
|
"""
|
|
if extra_ok_returncodes is None:
|
|
extra_ok_returncodes = []
|
|
if unset_environ is None:
|
|
unset_environ = []
|
|
# Most places in pip use show_stdout=False. What this means is--
|
|
#
|
|
# - We connect the child's output (combined stderr and stdout) to a
|
|
# single pipe, which we read.
|
|
# - We log this output to stderr at DEBUG level as it is received.
|
|
# - If DEBUG logging isn't enabled (e.g. if --verbose logging wasn't
|
|
# requested), then we show a spinner so the user can still see the
|
|
# subprocess is in progress.
|
|
# - If the subprocess exits with an error, we log the output to stderr
|
|
# at ERROR level if it hasn't already been displayed to the console
|
|
# (e.g. if --verbose logging wasn't enabled). This way we don't log
|
|
# the output to the console twice.
|
|
#
|
|
# If show_stdout=True, then the above is still done, but with DEBUG
|
|
# replaced by INFO.
|
|
if show_stdout:
|
|
# Then log the subprocess output at INFO level.
|
|
log_subprocess = subprocess_logger.info
|
|
used_level = logging.INFO
|
|
else:
|
|
# Then log the subprocess output using DEBUG. This also ensures
|
|
# it will be logged to the log file (aka user_log), if enabled.
|
|
log_subprocess = subprocess_logger.debug
|
|
used_level = logging.DEBUG
|
|
|
|
# Whether the subprocess will be visible in the console.
|
|
showing_subprocess = subprocess_logger.getEffectiveLevel() <= used_level
|
|
|
|
# Only use the spinner if we're not showing the subprocess output
|
|
# and we have a spinner.
|
|
use_spinner = not showing_subprocess and spinner is not None
|
|
|
|
if command_desc is None:
|
|
command_desc = format_command_args(cmd)
|
|
|
|
log_subprocess("Running command %s", command_desc)
|
|
env = os.environ.copy()
|
|
if extra_environ:
|
|
env.update(extra_environ)
|
|
for name in unset_environ:
|
|
env.pop(name, None)
|
|
try:
|
|
proc = subprocess.Popen(
|
|
# Convert HiddenText objects to the underlying str.
|
|
reveal_command_args(cmd),
|
|
stderr=subprocess.STDOUT, stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE, cwd=cwd, env=env,
|
|
)
|
|
assert proc.stdin
|
|
assert proc.stdout
|
|
proc.stdin.close()
|
|
except Exception as exc:
|
|
if log_failed_cmd:
|
|
subprocess_logger.critical(
|
|
"Error %s while executing command %s", exc, command_desc,
|
|
)
|
|
raise
|
|
all_output = []
|
|
while True:
|
|
# The "line" value is a unicode string in Python 2.
|
|
line = console_to_str(proc.stdout.readline())
|
|
if not line:
|
|
break
|
|
line = line.rstrip()
|
|
all_output.append(line + '\n')
|
|
|
|
# Show the line immediately.
|
|
log_subprocess(line)
|
|
# Update the spinner.
|
|
if use_spinner:
|
|
assert spinner
|
|
spinner.spin()
|
|
try:
|
|
proc.wait()
|
|
finally:
|
|
if proc.stdout:
|
|
proc.stdout.close()
|
|
proc_had_error = (
|
|
proc.returncode and proc.returncode not in extra_ok_returncodes
|
|
)
|
|
if use_spinner:
|
|
assert spinner
|
|
if proc_had_error:
|
|
spinner.finish("error")
|
|
else:
|
|
spinner.finish("done")
|
|
if proc_had_error:
|
|
if on_returncode == 'raise':
|
|
if not showing_subprocess and log_failed_cmd:
|
|
# Then the subprocess streams haven't been logged to the
|
|
# console yet.
|
|
msg = make_subprocess_output_error(
|
|
cmd_args=cmd,
|
|
cwd=cwd,
|
|
lines=all_output,
|
|
exit_status=proc.returncode,
|
|
)
|
|
subprocess_logger.error(msg)
|
|
exc_msg = (
|
|
'Command errored out with exit status {}: {} '
|
|
'Check the logs for full command output.'
|
|
).format(proc.returncode, command_desc)
|
|
raise InstallationError(exc_msg)
|
|
elif on_returncode == 'warn':
|
|
subprocess_logger.warning(
|
|
'Command "%s" had error code %s in %s',
|
|
command_desc,
|
|
proc.returncode,
|
|
cwd,
|
|
)
|
|
elif on_returncode == 'ignore':
|
|
pass
|
|
else:
|
|
raise ValueError('Invalid value: on_returncode={!r}'.format(
|
|
on_returncode))
|
|
return ''.join(all_output)
|
|
|
|
|
|
def runner_with_spinner_message(message):
|
|
# type: (str) -> Callable[..., None]
|
|
"""Provide a subprocess_runner that shows a spinner message.
|
|
|
|
Intended for use with for pep517's Pep517HookCaller. Thus, the runner has
|
|
an API that matches what's expected by Pep517HookCaller.subprocess_runner.
|
|
"""
|
|
|
|
def runner(
|
|
cmd, # type: List[str]
|
|
cwd=None, # type: Optional[str]
|
|
extra_environ=None # type: Optional[Mapping[str, Any]]
|
|
):
|
|
# type: (...) -> None
|
|
with open_spinner(message) as spinner:
|
|
call_subprocess(
|
|
cmd,
|
|
cwd=cwd,
|
|
extra_environ=extra_environ,
|
|
spinner=spinner,
|
|
)
|
|
|
|
return runner
|