direct_url.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. """ PEP 610 """
  2. import json
  3. import re
  4. import urllib.parse
  5. from typing import Any, Dict, Iterable, Optional, Type, TypeVar, Union
# Names exported by a star-import of this module.
__all__ = [
    "DirectUrl",
    "DirectUrlValidationError",
    "DirInfo",
    "ArchiveInfo",
    "VcsInfo",
]
# Type parameter tying the ``expected_type`` argument of the _get /
# _get_required helpers to their return type.
T = TypeVar("T")
# NOTE(review): presumably the file name under which the direct URL metadata
# is stored; it is not referenced anywhere else in this file -- confirm with
# callers.
DIRECT_URL_METADATA_NAME = "direct_url.json"
# Matches a user-info string made only of environment-variable placeholders:
# ``${NAME}`` optionally followed by ``:${NAME}``. Used when redacting URLs to
# decide that the user-info part may be kept.
ENV_VAR_RE = re.compile(r"^\$\{[A-Za-z0-9-_]+\}(:\$\{[A-Za-z0-9-_]+\})?$")
  16. class DirectUrlValidationError(Exception):
  17. pass
  18. def _get(
  19. d: Dict[str, Any], expected_type: Type[T], key: str, default: Optional[T] = None
  20. ) -> Optional[T]:
  21. """Get value from dictionary and verify expected type."""
  22. if key not in d:
  23. return default
  24. value = d[key]
  25. if not isinstance(value, expected_type):
  26. raise DirectUrlValidationError(
  27. "{!r} has unexpected type for {} (expected {})".format(
  28. value, key, expected_type
  29. )
  30. )
  31. return value
  32. def _get_required(
  33. d: Dict[str, Any], expected_type: Type[T], key: str, default: Optional[T] = None
  34. ) -> T:
  35. value = _get(d, expected_type, key, default)
  36. if value is None:
  37. raise DirectUrlValidationError(f"{key} must have a value")
  38. return value
  39. def _exactly_one_of(infos: Iterable[Optional["InfoType"]]) -> "InfoType":
  40. infos = [info for info in infos if info is not None]
  41. if not infos:
  42. raise DirectUrlValidationError(
  43. "missing one of archive_info, dir_info, vcs_info"
  44. )
  45. if len(infos) > 1:
  46. raise DirectUrlValidationError(
  47. "more than one of archive_info, dir_info, vcs_info"
  48. )
  49. assert infos[0] is not None
  50. return infos[0]
  51. def _filter_none(**kwargs: Any) -> Dict[str, Any]:
  52. """Make dict excluding None values."""
  53. return {k: v for k, v in kwargs.items() if v is not None}
  54. class VcsInfo:
  55. name = "vcs_info"
  56. def __init__(
  57. self,
  58. vcs: str,
  59. commit_id: str,
  60. requested_revision: Optional[str] = None,
  61. ) -> None:
  62. self.vcs = vcs
  63. self.requested_revision = requested_revision
  64. self.commit_id = commit_id
  65. @classmethod
  66. def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["VcsInfo"]:
  67. if d is None:
  68. return None
  69. return cls(
  70. vcs=_get_required(d, str, "vcs"),
  71. commit_id=_get_required(d, str, "commit_id"),
  72. requested_revision=_get(d, str, "requested_revision"),
  73. )
  74. def _to_dict(self) -> Dict[str, Any]:
  75. return _filter_none(
  76. vcs=self.vcs,
  77. requested_revision=self.requested_revision,
  78. commit_id=self.commit_id,
  79. )
  80. class ArchiveInfo:
  81. name = "archive_info"
  82. def __init__(
  83. self,
  84. hash: Optional[str] = None,
  85. hashes: Optional[Dict[str, str]] = None,
  86. ) -> None:
  87. # set hashes before hash, since the hash setter will further populate hashes
  88. self.hashes = hashes
  89. self.hash = hash
  90. @property
  91. def hash(self) -> Optional[str]:
  92. return self._hash
  93. @hash.setter
  94. def hash(self, value: Optional[str]) -> None:
  95. if value is not None:
  96. # Auto-populate the hashes key to upgrade to the new format automatically.
  97. # We don't back-populate the legacy hash key from hashes.
  98. try:
  99. hash_name, hash_value = value.split("=", 1)
  100. except ValueError:
  101. raise DirectUrlValidationError(
  102. f"invalid archive_info.hash format: {value!r}"
  103. )
  104. if self.hashes is None:
  105. self.hashes = {hash_name: hash_value}
  106. elif hash_name not in self.hashes:
  107. self.hashes = self.hashes.copy()
  108. self.hashes[hash_name] = hash_value
  109. self._hash = value
  110. @classmethod
  111. def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["ArchiveInfo"]:
  112. if d is None:
  113. return None
  114. return cls(hash=_get(d, str, "hash"), hashes=_get(d, dict, "hashes"))
  115. def _to_dict(self) -> Dict[str, Any]:
  116. return _filter_none(hash=self.hash, hashes=self.hashes)
  117. class DirInfo:
  118. name = "dir_info"
  119. def __init__(
  120. self,
  121. editable: bool = False,
  122. ) -> None:
  123. self.editable = editable
  124. @classmethod
  125. def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["DirInfo"]:
  126. if d is None:
  127. return None
  128. return cls(editable=_get_required(d, bool, "editable", default=False))
  129. def _to_dict(self) -> Dict[str, Any]:
  130. return _filter_none(editable=self.editable or None)
# Any one of the three info payload classes a DirectUrl can carry.
InfoType = Union[ArchiveInfo, DirInfo, VcsInfo]
  132. class DirectUrl:
  133. def __init__(
  134. self,
  135. url: str,
  136. info: InfoType,
  137. subdirectory: Optional[str] = None,
  138. ) -> None:
  139. self.url = url
  140. self.info = info
  141. self.subdirectory = subdirectory
  142. def _remove_auth_from_netloc(self, netloc: str) -> str:
  143. if "@" not in netloc:
  144. return netloc
  145. user_pass, netloc_no_user_pass = netloc.split("@", 1)
  146. if (
  147. isinstance(self.info, VcsInfo)
  148. and self.info.vcs == "git"
  149. and user_pass == "git"
  150. ):
  151. return netloc
  152. if ENV_VAR_RE.match(user_pass):
  153. return netloc
  154. return netloc_no_user_pass
  155. @property
  156. def redacted_url(self) -> str:
  157. """url with user:password part removed unless it is formed with
  158. environment variables as specified in PEP 610, or it is ``git``
  159. in the case of a git URL.
  160. """
  161. purl = urllib.parse.urlsplit(self.url)
  162. netloc = self._remove_auth_from_netloc(purl.netloc)
  163. surl = urllib.parse.urlunsplit(
  164. (purl.scheme, netloc, purl.path, purl.query, purl.fragment)
  165. )
  166. return surl
  167. def validate(self) -> None:
  168. self.from_dict(self.to_dict())
  169. @classmethod
  170. def from_dict(cls, d: Dict[str, Any]) -> "DirectUrl":
  171. return DirectUrl(
  172. url=_get_required(d, str, "url"),
  173. subdirectory=_get(d, str, "subdirectory"),
  174. info=_exactly_one_of(
  175. [
  176. ArchiveInfo._from_dict(_get(d, dict, "archive_info")),
  177. DirInfo._from_dict(_get(d, dict, "dir_info")),
  178. VcsInfo._from_dict(_get(d, dict, "vcs_info")),
  179. ]
  180. ),
  181. )
  182. def to_dict(self) -> Dict[str, Any]:
  183. res = _filter_none(
  184. url=self.redacted_url,
  185. subdirectory=self.subdirectory,
  186. )
  187. res[self.info.name] = self.info._to_dict()
  188. return res
  189. @classmethod
  190. def from_json(cls, s: str) -> "DirectUrl":
  191. return cls.from_dict(json.loads(s))
  192. def to_json(self) -> str:
  193. return json.dumps(self.to_dict(), sort_keys=True)
  194. def is_local_editable(self) -> bool:
  195. return isinstance(self.info, DirInfo) and self.info.editable