123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324 |
- import logging
- import os
- import re
- from typing import List, Optional, Tuple
- from pip._internal.utils.misc import (
- HiddenText,
- display_path,
- is_console_interactive,
- is_installable_dir,
- split_auth_from_netloc,
- )
- from pip._internal.utils.subprocess import CommandArgs, make_command
- from pip._internal.vcs.versioncontrol import (
- AuthInfo,
- RemoteNotFoundError,
- RevOptions,
- VersionControl,
- vcs,
- )
- logger = logging.getLogger(__name__)
- _svn_xml_url_re = re.compile('url="([^"]+)"')
- _svn_rev_re = re.compile(r'committed-rev="(\d+)"')
- _svn_info_xml_rev_re = re.compile(r'\s*revision="(\d+)"')
- _svn_info_xml_url_re = re.compile(r"<url>(.*)</url>")
- class Subversion(VersionControl):
- name = "svn"
- dirname = ".svn"
- repo_name = "checkout"
- schemes = ("svn+ssh", "svn+http", "svn+https", "svn+svn", "svn+file")
- @classmethod
- def should_add_vcs_url_prefix(cls, remote_url: str) -> bool:
- return True
- @staticmethod
- def get_base_rev_args(rev: str) -> List[str]:
- return ["-r", rev]
- @classmethod
- def get_revision(cls, location: str) -> str:
- """
- Return the maximum revision for all files under a given location
- """
- # Note: taken from setuptools.command.egg_info
- revision = 0
- for base, dirs, _ in os.walk(location):
- if cls.dirname not in dirs:
- dirs[:] = []
- continue # no sense walking uncontrolled subdirs
- dirs.remove(cls.dirname)
- entries_fn = os.path.join(base, cls.dirname, "entries")
- if not os.path.exists(entries_fn):
- # FIXME: should we warn?
- continue
- dirurl, localrev = cls._get_svn_url_rev(base)
- if base == location:
- assert dirurl is not None
- base = dirurl + "/" # save the root url
- elif not dirurl or not dirurl.startswith(base):
- dirs[:] = []
- continue # not part of the same svn tree, skip it
- revision = max(revision, localrev)
- return str(revision)
- @classmethod
- def get_netloc_and_auth(
- cls, netloc: str, scheme: str
- ) -> Tuple[str, Tuple[Optional[str], Optional[str]]]:
- """
- This override allows the auth information to be passed to svn via the
- --username and --password options instead of via the URL.
- """
- if scheme == "ssh":
- # The --username and --password options can't be used for
- # svn+ssh URLs, so keep the auth information in the URL.
- return super().get_netloc_and_auth(netloc, scheme)
- return split_auth_from_netloc(netloc)
- @classmethod
- def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
- # hotfix the URL scheme after removing svn+ from svn+ssh:// re-add it
- url, rev, user_pass = super().get_url_rev_and_auth(url)
- if url.startswith("ssh://"):
- url = "svn+" + url
- return url, rev, user_pass
- @staticmethod
- def make_rev_args(
- username: Optional[str], password: Optional[HiddenText]
- ) -> CommandArgs:
- extra_args: CommandArgs = []
- if username:
- extra_args += ["--username", username]
- if password:
- extra_args += ["--password", password]
- return extra_args
- @classmethod
- def get_remote_url(cls, location: str) -> str:
- # In cases where the source is in a subdirectory, we have to look up in
- # the location until we find a valid project root.
- orig_location = location
- while not is_installable_dir(location):
- last_location = location
- location = os.path.dirname(location)
- if location == last_location:
- # We've traversed up to the root of the filesystem without
- # finding a Python project.
- logger.warning(
- "Could not find Python project for directory %s (tried all "
- "parent directories)",
- orig_location,
- )
- raise RemoteNotFoundError
- url, _rev = cls._get_svn_url_rev(location)
- if url is None:
- raise RemoteNotFoundError
- return url
- @classmethod
- def _get_svn_url_rev(cls, location: str) -> Tuple[Optional[str], int]:
- from pip._internal.exceptions import InstallationError
- entries_path = os.path.join(location, cls.dirname, "entries")
- if os.path.exists(entries_path):
- with open(entries_path) as f:
- data = f.read()
- else: # subversion >= 1.7 does not have the 'entries' file
- data = ""
- url = None
- if data.startswith("8") or data.startswith("9") or data.startswith("10"):
- entries = list(map(str.splitlines, data.split("\n\x0c\n")))
- del entries[0][0] # get rid of the '8'
- url = entries[0][3]
- revs = [int(d[9]) for d in entries if len(d) > 9 and d[9]] + [0]
- elif data.startswith("<?xml"):
- match = _svn_xml_url_re.search(data)
- if not match:
- raise ValueError(f"Badly formatted data: {data!r}")
- url = match.group(1) # get repository URL
- revs = [int(m.group(1)) for m in _svn_rev_re.finditer(data)] + [0]
- else:
- try:
- # subversion >= 1.7
- # Note that using get_remote_call_options is not necessary here
- # because `svn info` is being run against a local directory.
- # We don't need to worry about making sure interactive mode
- # is being used to prompt for passwords, because passwords
- # are only potentially needed for remote server requests.
- xml = cls.run_command(
- ["info", "--xml", location],
- show_stdout=False,
- stdout_only=True,
- )
- match = _svn_info_xml_url_re.search(xml)
- assert match is not None
- url = match.group(1)
- revs = [int(m.group(1)) for m in _svn_info_xml_rev_re.finditer(xml)]
- except InstallationError:
- url, revs = None, []
- if revs:
- rev = max(revs)
- else:
- rev = 0
- return url, rev
- @classmethod
- def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
- """Always assume the versions don't match"""
- return False
- def __init__(self, use_interactive: Optional[bool] = None) -> None:
- if use_interactive is None:
- use_interactive = is_console_interactive()
- self.use_interactive = use_interactive
- # This member is used to cache the fetched version of the current
- # ``svn`` client.
- # Special value definitions:
- # None: Not evaluated yet.
- # Empty tuple: Could not parse version.
- self._vcs_version: Optional[Tuple[int, ...]] = None
- super().__init__()
- def call_vcs_version(self) -> Tuple[int, ...]:
- """Query the version of the currently installed Subversion client.
- :return: A tuple containing the parts of the version information or
- ``()`` if the version returned from ``svn`` could not be parsed.
- :raises: BadCommand: If ``svn`` is not installed.
- """
- # Example versions:
- # svn, version 1.10.3 (r1842928)
- # compiled Feb 25 2019, 14:20:39 on x86_64-apple-darwin17.0.0
- # svn, version 1.7.14 (r1542130)
- # compiled Mar 28 2018, 08:49:13 on x86_64-pc-linux-gnu
- # svn, version 1.12.0-SlikSvn (SlikSvn/1.12.0)
- # compiled May 28 2019, 13:44:56 on x86_64-microsoft-windows6.2
- version_prefix = "svn, version "
- version = self.run_command(["--version"], show_stdout=False, stdout_only=True)
- if not version.startswith(version_prefix):
- return ()
- version = version[len(version_prefix) :].split()[0]
- version_list = version.partition("-")[0].split(".")
- try:
- parsed_version = tuple(map(int, version_list))
- except ValueError:
- return ()
- return parsed_version
- def get_vcs_version(self) -> Tuple[int, ...]:
- """Return the version of the currently installed Subversion client.
- If the version of the Subversion client has already been queried,
- a cached value will be used.
- :return: A tuple containing the parts of the version information or
- ``()`` if the version returned from ``svn`` could not be parsed.
- :raises: BadCommand: If ``svn`` is not installed.
- """
- if self._vcs_version is not None:
- # Use cached version, if available.
- # If parsing the version failed previously (empty tuple),
- # do not attempt to parse it again.
- return self._vcs_version
- vcs_version = self.call_vcs_version()
- self._vcs_version = vcs_version
- return vcs_version
- def get_remote_call_options(self) -> CommandArgs:
- """Return options to be used on calls to Subversion that contact the server.
- These options are applicable for the following ``svn`` subcommands used
- in this class.
- - checkout
- - switch
- - update
- :return: A list of command line arguments to pass to ``svn``.
- """
- if not self.use_interactive:
- # --non-interactive switch is available since Subversion 0.14.4.
- # Subversion < 1.8 runs in interactive mode by default.
- return ["--non-interactive"]
- svn_version = self.get_vcs_version()
- # By default, Subversion >= 1.8 runs in non-interactive mode if
- # stdin is not a TTY. Since that is how pip invokes SVN, in
- # call_subprocess(), pip must pass --force-interactive to ensure
- # the user can be prompted for a password, if required.
- # SVN added the --force-interactive option in SVN 1.8. Since
- # e.g. RHEL/CentOS 7, which is supported until 2024, ships with
- # SVN 1.7, pip should continue to support SVN 1.7. Therefore, pip
- # can't safely add the option if the SVN version is < 1.8 (or unknown).
- if svn_version >= (1, 8):
- return ["--force-interactive"]
- return []
- def fetch_new(
- self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
- ) -> None:
- rev_display = rev_options.to_display()
- logger.info(
- "Checking out %s%s to %s",
- url,
- rev_display,
- display_path(dest),
- )
- if verbosity <= 0:
- flag = "--quiet"
- else:
- flag = ""
- cmd_args = make_command(
- "checkout",
- flag,
- self.get_remote_call_options(),
- rev_options.to_args(),
- url,
- dest,
- )
- self.run_command(cmd_args)
- def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
- cmd_args = make_command(
- "switch",
- self.get_remote_call_options(),
- rev_options.to_args(),
- url,
- dest,
- )
- self.run_command(cmd_args)
- def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
- cmd_args = make_command(
- "update",
- self.get_remote_call_options(),
- rev_options.to_args(),
- dest,
- )
- self.run_command(cmd_args)
- vcs.register(Subversion)
|