123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- import configparser
- import logging
- import os
- from typing import List, Optional, Tuple
- from pip._internal.exceptions import BadCommand, InstallationError
- from pip._internal.utils.misc import HiddenText, display_path
- from pip._internal.utils.subprocess import make_command
- from pip._internal.utils.urls import path_to_url
- from pip._internal.vcs.versioncontrol import (
- RevOptions,
- VersionControl,
- find_path_to_project_root_from_repo_root,
- vcs,
- )
# Module-level logger, named after this module via __name__.
- logger = logging.getLogger(__name__)
class Mercurial(VersionControl):
    """Mercurial (``hg``) backend built on ``VersionControl``.

    Each operation passes ``hg`` sub-command arguments to the inherited
    ``run_command``.
    """

    name = "hg"
    dirname = ".hg"
    repo_name = "clone"
    schemes = (
        "hg+file",
        "hg+http",
        "hg+https",
        "hg+ssh",
        "hg+static-http",
    )

    @staticmethod
    def get_base_rev_args(rev: str) -> List[str]:
        """Return the ``hg`` arguments that select revision *rev*.

        The revision is bound to ``--rev=`` inside a single argument rather
        than passed bare, so a value that starts with ``-`` cannot be parsed
        by ``hg`` as a separate command-line option.
        """
        return [f"--rev={rev}"]

    def fetch_new(
        self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
    ) -> None:
        """Clone *url* into *dest* and update to the requested revision.

        *verbosity* selects the output flags given to both ``hg`` commands:
        ``<= 0`` is quiet, ``1`` adds no flag, ``2`` is verbose and anything
        higher is verbose plus debug.
        """
        rev_display = rev_options.to_display()
        logger.info(
            "Cloning hg %s%s to %s",
            url,
            rev_display,
            display_path(dest),
        )
        if verbosity <= 0:
            flags: Tuple[str, ...] = ("--quiet",)
        elif verbosity == 1:
            flags = ()
        elif verbosity == 2:
            flags = ("--verbose",)
        else:
            flags = ("--verbose", "--debug")
        # Clone without populating the working directory, then update to the
        # requested revision as a separate step.
        self.run_command(make_command("clone", "--noupdate", *flags, url, dest))
        self.run_command(
            make_command("update", *flags, rev_options.to_args()),
            cwd=dest,
        )

    def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
        """Point the clone at *dest* to *url*, then update its revision.

        ``paths.default`` in ``<dest>/.hg/hgrc`` is rewritten to the
        unredacted URL. If the file cannot be written, or has no ``[paths]``
        section, a warning is logged and no update is attempted.
        """
        repo_config = os.path.join(dest, self.dirname, "hgrc")
        config = configparser.RawConfigParser()
        try:
            # read() skips files it cannot open; set() then raises
            # NoSectionError when no [paths] section was loaded.
            config.read(repo_config)
            config.set("paths", "default", url.secret)
            with open(repo_config, "w") as config_file:
                config.write(config_file)
        except (OSError, configparser.NoSectionError) as exc:
            logger.warning("Could not switch Mercurial repository to %s: %s", url, exc)
        else:
            # Only update once the new default path has been written.
            cmd_args = make_command("update", "-q", rev_options.to_args())
            self.run_command(cmd_args, cwd=dest)

    def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
        """Pull into the clone at *dest* and update to the requested revision.

        *url* is not used; ``hg pull`` is run without a source argument.
        """
        self.run_command(["pull", "-q"], cwd=dest)
        cmd_args = make_command("update", "-q", rev_options.to_args())
        self.run_command(cmd_args, cwd=dest)

    @classmethod
    def get_remote_url(cls, location: str) -> str:
        """Return the ``paths.default`` value configured for *location*.

        When ``_is_local_repository`` reports the value as local, it is
        converted with ``path_to_url`` before being returned.
        """
        url = cls.run_command(
            ["showconfig", "paths.default"],
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        ).strip()
        if cls._is_local_repository(url):
            url = path_to_url(url)
        return url

    @classmethod
    def get_revision(cls, location: str) -> str:
        """
        Return the repository-local changeset revision number, as the
        string printed by ``hg parents --template={rev}``.
        """
        current_revision = cls.run_command(
            ["parents", "--template={rev}"],
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        ).strip()
        return current_revision

    @classmethod
    def get_requirement_revision(cls, location: str) -> str:
        """
        Return the changeset identification hash, as a 40-character
        hexadecimal string
        """
        current_rev_hash = cls.run_command(
            ["parents", "--template={node}"],
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        ).strip()
        return current_rev_hash

    @classmethod
    def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
        """Always assume the versions don't match"""
        return False

    @classmethod
    def get_subdirectory(cls, location: str) -> Optional[str]:
        """
        Return the path to Python project root, relative to the repo root.
        Return None if the project root is in the repo root.
        """
        # find the repo root
        repo_root = cls.run_command(
            ["root"], show_stdout=False, stdout_only=True, cwd=location
        ).strip()
        # Resolve a relative answer against the directory hg was run in.
        if not os.path.isabs(repo_root):
            repo_root = os.path.abspath(os.path.join(location, repo_root))
        return find_path_to_project_root_from_repo_root(location, repo_root)

    @classmethod
    def get_repository_root(cls, location: str) -> Optional[str]:
        """Return the repository root containing *location*, or None.

        The parent class lookup is tried first. If it finds nothing,
        ``hg root`` is run in *location*. None is returned when that command
        raises ``BadCommand`` (logged at debug level as hg being unavailable)
        or ``InstallationError``.
        """
        loc = super().get_repository_root(location)
        if loc:
            return loc
        try:
            r = cls.run_command(
                ["root"],
                cwd=location,
                show_stdout=False,
                stdout_only=True,
                on_returncode="raise",
                log_failed_cmd=False,
            )
        except BadCommand:
            logger.debug(
                "could not determine if %s is under hg control "
                "because hg is not available",
                location,
            )
            return None
        except InstallationError:
            return None
        # Drop only the trailing line ending before normalising the path.
        return os.path.normpath(r.rstrip("\r\n"))
# Register the Mercurial class with the `vcs` object imported from
# pip._internal.vcs.versioncontrol (presumably so the "hg" backend can be
# looked up later; confirm against that object's register()).
- vcs.register(Mercurial)
|