123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479 |
- import contextlib
- import io
- import os
- import shlex
- import shutil
- import sys
- import tempfile
- import typing as t
- from types import TracebackType
- from . import formatting
- from . import termui
- from . import utils
- from ._compat import _find_binary_reader
- if t.TYPE_CHECKING:
- from .core import BaseCommand
class EchoingStdin:
    """Binary input proxy that copies everything it reads to an output.

    Every read-style call is forwarded to the wrapped input stream and the
    bytes obtained are also written to the output stream, unless echoing
    is currently paused (``_paused`` is true). Any attribute not defined
    here is looked up on the wrapped input stream.
    """

    def __init__(self, input: t.BinaryIO, output: t.BinaryIO) -> None:
        self._input = input
        self._output = output
        # While true, ``_echo`` passes data through without writing it.
        self._paused = False

    def __getattr__(self, x: str) -> t.Any:
        # Only invoked when normal lookup fails: defer to the real stream.
        return getattr(self._input, x)

    def _echo(self, rv: bytes) -> bytes:
        """Write ``rv`` to the output unless paused, then hand it back."""
        if self._paused:
            return rv

        self._output.write(rv)
        return rv

    def read(self, n: int = -1) -> bytes:
        """Read up to ``n`` bytes from the input and echo them."""
        data = self._input.read(n)
        return self._echo(data)

    def read1(self, n: int = -1) -> bytes:
        """Call ``read1`` on the input and echo the bytes it returns."""
        data = self._input.read1(n)  # type: ignore
        return self._echo(data)

    def readline(self, n: int = -1) -> bytes:
        """Read one line from the input and echo it."""
        line = self._input.readline(n)
        return self._echo(line)

    def readlines(self) -> t.List[bytes]:
        """Read all remaining lines, echoing each one in order."""
        return list(map(self._echo, self._input.readlines()))

    def __iter__(self) -> t.Iterator[bytes]:
        """Iterate lazily over the input, echoing each item as it is consumed."""
        return map(self._echo, self._input)

    def __repr__(self) -> str:
        return repr(self._input)
@contextlib.contextmanager
def _pause_echo(stream: t.Optional["EchoingStdin"]) -> t.Iterator[None]:
    """Temporarily stop an echoing stdin from echoing what it reads.

    :param stream: the echoing stream to pause, or ``None`` to do nothing.

    While the context is active ``stream._paused`` is ``True``; it is set
    back to ``False`` on exit, including when the managed body raises, so
    a failing prompt cannot leave echoing switched off.
    """
    if stream is None:
        yield
        return

    stream._paused = True
    try:
        yield
    finally:
        # Always resume echoing, even if the wrapped code raised.
        stream._paused = False
class _NamedTextIOWrapper(io.TextIOWrapper):
    """A ``TextIOWrapper`` that reports a caller-chosen ``name`` and ``mode``."""

    def __init__(
        self, buffer: t.BinaryIO, name: str, mode: str, **kwargs: t.Any
    ) -> None:
        # Everything except ``name`` and ``mode`` is forwarded untouched to
        # ``io.TextIOWrapper``.
        super().__init__(buffer, **kwargs)
        self._name, self._mode = name, mode

    @property
    def name(self) -> str:
        """The name given at construction time."""
        return self._name

    @property
    def mode(self) -> str:
        """The mode string given at construction time."""
        return self._mode
def make_input_stream(
    input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]], charset: str
) -> t.BinaryIO:
    """Turn the given input into a binary stream.

    :param input: an object with a ``read`` attribute, a ``str``,
        ``bytes``, or ``None``.
    :param charset: the encoding used when ``input`` is a ``str``.
    :return: the binary reader found for a stream input, otherwise an
        ``io.BytesIO`` holding the (encoded) data; ``None`` gives an
        empty stream.
    :raises TypeError: if ``input`` has a ``read`` attribute but no
        binary reader could be found for it.
    """
    # Is already an input stream.
    if hasattr(input, "read"):
        reader = _find_binary_reader(t.cast(t.IO[t.Any], input))

        if reader is None:
            raise TypeError("Could not find binary reader for input stream.")

        return reader

    if isinstance(input, str):
        return io.BytesIO(input.encode(charset))

    return io.BytesIO(b"" if input is None else input)
class Result:
    """Holds the captured result of an invoked CLI script."""

    def __init__(
        self,
        runner: "CliRunner",
        stdout_bytes: bytes,
        stderr_bytes: t.Optional[bytes],
        return_value: t.Any,
        exit_code: int,
        exception: t.Optional[BaseException],
        exc_info: t.Optional[
            t.Tuple[t.Type[BaseException], BaseException, TracebackType]
        ] = None,
    ):
        #: The runner that created the result
        self.runner = runner
        #: The standard output as bytes.
        self.stdout_bytes = stdout_bytes
        #: The standard error as bytes, or None if not available
        self.stderr_bytes = stderr_bytes
        #: The value returned from the invoked command.
        #:
        #: .. versionadded:: 8.0
        self.return_value = return_value
        #: The exit code as integer.
        self.exit_code = exit_code
        #: The exception that happened if one did.
        self.exception = exception
        #: The traceback
        self.exc_info = exc_info

    def _decode(self, data: bytes) -> str:
        """Decode captured bytes with the runner's charset.

        Undecodable bytes are replaced rather than raising, and ``\\r\\n``
        line endings are normalized to ``\\n``.
        """
        return data.decode(self.runner.charset, "replace").replace("\r\n", "\n")

    @property
    def output(self) -> str:
        """The (standard) output as unicode string."""
        return self.stdout

    @property
    def stdout(self) -> str:
        """The standard output as unicode string."""
        return self._decode(self.stdout_bytes)

    @property
    def stderr(self) -> str:
        """The standard error as unicode string.

        :raises ValueError: if stderr was not captured separately
            (``stderr_bytes`` is ``None``).
        """
        if self.stderr_bytes is None:
            raise ValueError("stderr not separately captured")

        return self._decode(self.stderr_bytes)

    def __repr__(self) -> str:
        # Compare against None explicitly: an exception type may define
        # ``__bool__``/``__len__`` and be falsy, yet it still is a failure.
        exc_str = "okay" if self.exception is None else repr(self.exception)
        return f"<{type(self).__name__} {exc_str}>"
class CliRunner:
    """The CLI runner provides functionality to invoke a Click command line
    script for unittesting purposes in an isolated environment.  This only
    works in single-threaded systems without any concurrency as it changes the
    global interpreter state.

    :param charset: the character set for the input and output data.
    :param env: a dictionary with environment variables for overriding.
    :param echo_stdin: if this is set to `True`, then reading from stdin writes
                       to stdout.  This is useful for showing examples in
                       some circumstances.  Note that regular prompts
                       will automatically echo the input.
    :param mix_stderr: if this is set to `False`, then stdout and stderr are
                       preserved as independent streams.  This is useful for
                       Unix-philosophy apps that have predictable stdout and
                       noisy stderr, such that each may be measured
                       independently
    """

    def __init__(
        self,
        charset: str = "utf-8",
        env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,
        echo_stdin: bool = False,
        mix_stderr: bool = True,
    ) -> None:
        self.charset = charset
        self.env: t.Mapping[str, t.Optional[str]] = env or {}
        self.echo_stdin = echo_stdin
        self.mix_stderr = mix_stderr

    def get_default_prog_name(self, cli: "BaseCommand") -> str:
        """Given a command object it will return the default program name
        for it.  The default is the `name` attribute or ``"root"`` if not
        set.
        """
        return cli.name or "root"

    def make_env(
        self, overrides: t.Optional[t.Mapping[str, t.Optional[str]]] = None
    ) -> t.Mapping[str, t.Optional[str]]:
        """Returns the environment overrides for invoking a script.

        The result is a new dict built from the runner's ``env`` with
        ``overrides`` applied on top; neither input mapping is modified.
        """
        rv = dict(self.env)

        if overrides:
            rv.update(overrides)

        return rv

    @contextlib.contextmanager
    def isolation(
        self,
        input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]] = None,
        env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,
        color: bool = False,
    ) -> t.Iterator[t.Tuple[io.BytesIO, t.Optional[io.BytesIO]]]:
        """A context manager that sets up the isolation for invoking of a
        command line tool.  This sets up stdin with the given input data
        and `os.environ` with the overrides from the given dictionary.
        This also rebinds some internals in Click to be mocked (like the
        prompt functionality).

        This is automatically done in the :meth:`invoke` method.

        Yields a ``(stdout, stderr)`` tuple of byte buffers; ``stderr`` is
        ``None`` when the runner mixes stderr into stdout.

        :param input: the input stream to put into sys.stdin.
        :param env: the environment overrides as dictionary.
        :param color: whether the output should contain color codes. The
                      application can still override this explicitly.

        .. versionchanged:: 8.0
            ``stderr`` is opened with ``errors="backslashreplace"``
            instead of the default ``"strict"``.

        .. versionchanged:: 4.0
            Added the ``color`` parameter.
        """
        bytes_input = make_input_stream(input, self.charset)
        echo_input = None

        # Snapshot every piece of global state *before* changing any of it,
        # so the ``finally`` below can restore it even when setup itself
        # fails part-way (for example on an unknown charset).
        old_stdin = sys.stdin
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        old_forced_width = formatting.FORCED_WIDTH
        old_visible_prompt_func = termui.visible_prompt_func
        old_hidden_prompt_func = termui.hidden_prompt_func
        old__getchar_func = termui._getchar
        old_should_strip_ansi = utils.should_strip_ansi  # type: ignore
        old_env: t.Dict[str, t.Optional[str]] = {}

        env = self.make_env(env)
        bytes_output = io.BytesIO()
        bytes_error: t.Optional[io.BytesIO] = None
        default_color = color

        try:
            formatting.FORCED_WIDTH = 80

            if self.echo_stdin:
                bytes_input = echo_input = t.cast(
                    t.BinaryIO, EchoingStdin(bytes_input, bytes_output)
                )

            sys.stdin = text_input = _NamedTextIOWrapper(
                bytes_input, encoding=self.charset, name="<stdin>", mode="r"
            )

            if self.echo_stdin:
                # Force unbuffered reads, otherwise TextIOWrapper reads a
                # large chunk which is echoed early.
                text_input._CHUNK_SIZE = 1  # type: ignore

            sys.stdout = _NamedTextIOWrapper(
                bytes_output, encoding=self.charset, name="<stdout>", mode="w"
            )

            if self.mix_stderr:
                sys.stderr = sys.stdout
            else:
                bytes_error = io.BytesIO()
                sys.stderr = _NamedTextIOWrapper(
                    bytes_error,
                    encoding=self.charset,
                    name="<stderr>",
                    mode="w",
                    errors="backslashreplace",
                )

            # The prompt replacements write the prompt (and, for visible
            # input, the typed value) themselves, so stdin echoing is
            # paused while they read to avoid echoing the input twice.
            @_pause_echo(echo_input)  # type: ignore
            def visible_input(prompt: t.Optional[str] = None) -> str:
                sys.stdout.write(prompt or "")
                val = text_input.readline().rstrip("\r\n")
                sys.stdout.write(f"{val}\n")
                sys.stdout.flush()
                return val

            @_pause_echo(echo_input)  # type: ignore
            def hidden_input(prompt: t.Optional[str] = None) -> str:
                sys.stdout.write(f"{prompt or ''}\n")
                sys.stdout.flush()
                return text_input.readline().rstrip("\r\n")

            @_pause_echo(echo_input)  # type: ignore
            def _getchar(echo: bool) -> str:
                char = sys.stdin.read(1)

                if echo:
                    sys.stdout.write(char)

                sys.stdout.flush()
                return char

            def should_strip_ansi(
                stream: t.Optional[t.IO[t.Any]] = None,
                color: t.Optional[bool] = None,
            ) -> bool:
                # An explicit ``color`` wins; otherwise fall back to the
                # ``color`` flag this isolation was entered with.
                if color is None:
                    return not default_color
                return not color

            termui.visible_prompt_func = visible_input
            termui.hidden_prompt_func = hidden_input
            termui._getchar = _getchar
            utils.should_strip_ansi = should_strip_ansi  # type: ignore

            for key, value in env.items():
                # Remember the previous value (None means "was unset").
                old_env[key] = os.environ.get(key)

                if value is None:
                    os.environ.pop(key, None)
                else:
                    os.environ[key] = value

            yield (bytes_output, bytes_error)
        finally:
            for key, value in old_env.items():
                if value is None:
                    os.environ.pop(key, None)
                else:
                    os.environ[key] = value

            sys.stdout = old_stdout
            sys.stderr = old_stderr
            sys.stdin = old_stdin
            termui.visible_prompt_func = old_visible_prompt_func
            termui.hidden_prompt_func = old_hidden_prompt_func
            termui._getchar = old__getchar_func
            utils.should_strip_ansi = old_should_strip_ansi  # type: ignore
            formatting.FORCED_WIDTH = old_forced_width

    def invoke(
        self,
        cli: "BaseCommand",
        args: t.Optional[t.Union[str, t.Sequence[str]]] = None,
        input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]] = None,
        env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,
        catch_exceptions: bool = True,
        color: bool = False,
        **extra: t.Any,
    ) -> "Result":
        """Invokes a command in an isolated environment.  The arguments are
        forwarded directly to the command line script, the `extra` keyword
        arguments are passed to the :meth:`~clickpkg.Command.main` function of
        the command.

        This returns a :class:`Result` object.

        :param cli: the command to invoke
        :param args: the arguments to invoke. It may be given as an iterable
                     or a string. When given as string it will be interpreted
                     as a Unix shell command. More details at
                     :func:`shlex.split`.
        :param input: the input data for `sys.stdin`.
        :param env: the environment overrides.
        :param catch_exceptions: Whether to catch any other exceptions than
                                 ``SystemExit``.
        :param extra: the keyword arguments to pass to :meth:`main`.
        :param color: whether the output should contain color codes. The
                      application can still override this explicitly.

        .. versionchanged:: 8.0
            The result object has the ``return_value`` attribute with
            the value returned from the invoked command.

        .. versionchanged:: 4.0
            Added the ``color`` parameter.

        .. versionchanged:: 3.0
            Added the ``catch_exceptions`` parameter.

        .. versionchanged:: 3.0
            The result object has the ``exc_info`` attribute with the
            traceback if available.
        """
        exc_info = None

        with self.isolation(input=input, env=env, color=color) as outstreams:
            return_value = None
            exception: t.Optional[BaseException] = None
            exit_code = 0

            if isinstance(args, str):
                args = shlex.split(args)

            try:
                prog_name = extra.pop("prog_name")
            except KeyError:
                prog_name = self.get_default_prog_name(cli)

            try:
                return_value = cli.main(args=args or (), prog_name=prog_name, **extra)
            except SystemExit as e:
                exc_info = sys.exc_info()
                e_code = t.cast(t.Optional[t.Union[int, t.Any]], e.code)

                if e_code is None:
                    e_code = 0

                if e_code != 0:
                    exception = e

                if not isinstance(e_code, int):
                    # A non-integer exit "code" is a message: print it and
                    # report a generic failure status.
                    sys.stdout.write(str(e_code))
                    sys.stdout.write("\n")
                    e_code = 1

                exit_code = e_code
            except Exception as e:
                if not catch_exceptions:
                    raise

                exception = e
                exit_code = 1
                exc_info = sys.exc_info()
            finally:
                # The text wrappers may still hold unwritten data; push it
                # into the byte buffers before reading them.
                sys.stdout.flush()
                stdout = outstreams[0].getvalue()

                if self.mix_stderr:
                    stderr = None
                else:
                    # stderr has its own wrapper here, so it needs its own
                    # flush or buffered error output would be lost.
                    sys.stderr.flush()
                    stderr = outstreams[1].getvalue()  # type: ignore

        return Result(
            runner=self,
            stdout_bytes=stdout,
            stderr_bytes=stderr,
            return_value=return_value,
            exit_code=exit_code,
            exception=exception,
            exc_info=exc_info,  # type: ignore
        )

    @contextlib.contextmanager
    def isolated_filesystem(
        self, temp_dir: t.Optional[t.Union[str, "os.PathLike[str]"]] = None
    ) -> t.Iterator[str]:
        """A context manager that creates a temporary directory and
        changes the current working directory to it. This isolates tests
        that affect the contents of the CWD to prevent them from
        interfering with each other.

        Yields the path of the created directory. The previous working
        directory is restored on exit.

        :param temp_dir: Create the temporary directory under this
            directory. If given, the created directory is not removed
            when exiting.

        .. versionchanged:: 8.0
            Added the ``temp_dir`` parameter.
        """
        cwd = os.getcwd()
        dt = tempfile.mkdtemp(dir=temp_dir)
        os.chdir(dt)

        try:
            yield dt
        finally:
            os.chdir(cwd)

            if temp_dir is None:
                # Best-effort cleanup: a directory that cannot be removed
                # must not fail the test that used it.
                with contextlib.suppress(OSError):
                    shutil.rmtree(dt)
|