_format.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523
  1. from __future__ import annotations
  2. from contextlib import suppress
  3. from datetime import date, datetime
  4. from uuid import UUID
  5. import ipaddress
  6. import re
  7. import typing
  8. import warnings
  9. from jsonschema.exceptions import FormatError
# Signature shared by every format-checking function: it receives the
# instance being validated and returns whether that instance conforms.
_FormatCheckCallable = typing.Callable[[object], bool]
#: A format checker callable.
_F = typing.TypeVar("_F", bound=_FormatCheckCallable)
# Accepted shape of the ``raises`` argument used when registering a checker:
# either one exception class or a tuple of exception classes. The value is
# used directly as the target of an ``except`` clause in
# `FormatChecker.check`.
_RaisesType = typing.Union[
    typing.Type[Exception], typing.Tuple[typing.Type[Exception], ...],
]
# Strict ``YYYY-MM-DD`` shape. ``re.ASCII`` restricts ``\d`` to the
# characters 0-9. `is_date` applies this pattern before handing the
# string to `date.fromisoformat`.
_RE_DATE = re.compile(r"^\d{4}-\d{2}-\d{2}$", re.ASCII)
  17. class FormatChecker:
  18. """
  19. A ``format`` property checker.
  20. JSON Schema does not mandate that the ``format`` property actually do any
  21. validation. If validation is desired however, instances of this class can
  22. be hooked into validators to enable format validation.
  23. `FormatChecker` objects always return ``True`` when asked about
  24. formats that they do not know how to validate.
  25. To add a check for a custom format use the `FormatChecker.checks`
  26. decorator.
  27. Arguments:
  28. formats:
  29. The known formats to validate. This argument can be used to
  30. limit which formats will be used during validation.
  31. """
  32. checkers: dict[
  33. str,
  34. tuple[_FormatCheckCallable, _RaisesType],
  35. ] = {} # noqa: RUF012
  36. def __init__(self, formats: typing.Iterable[str] | None = None):
  37. if formats is None:
  38. formats = self.checkers.keys()
  39. self.checkers = {k: self.checkers[k] for k in formats}
  40. def __repr__(self):
  41. return f"<FormatChecker checkers={sorted(self.checkers)}>"
  42. def checks(
  43. self, format: str, raises: _RaisesType = (),
  44. ) -> typing.Callable[[_F], _F]:
  45. """
  46. Register a decorated function as validating a new format.
  47. Arguments:
  48. format:
  49. The format that the decorated function will check.
  50. raises:
  51. The exception(s) raised by the decorated function when an
  52. invalid instance is found.
  53. The exception object will be accessible as the
  54. `jsonschema.exceptions.ValidationError.cause` attribute of the
  55. resulting validation error.
  56. """
  57. def _checks(func: _F) -> _F:
  58. self.checkers[format] = (func, raises)
  59. return func
  60. return _checks
  61. @classmethod
  62. def cls_checks(
  63. cls, format: str, raises: _RaisesType = (),
  64. ) -> typing.Callable[[_F], _F]:
  65. warnings.warn(
  66. (
  67. "FormatChecker.cls_checks is deprecated. Call "
  68. "FormatChecker.checks on a specific FormatChecker instance "
  69. "instead."
  70. ),
  71. DeprecationWarning,
  72. stacklevel=2,
  73. )
  74. return cls._cls_checks(format=format, raises=raises)
  75. @classmethod
  76. def _cls_checks(
  77. cls, format: str, raises: _RaisesType = (),
  78. ) -> typing.Callable[[_F], _F]:
  79. def _checks(func: _F) -> _F:
  80. cls.checkers[format] = (func, raises)
  81. return func
  82. return _checks
  83. def check(self, instance: object, format: str) -> None:
  84. """
  85. Check whether the instance conforms to the given format.
  86. Arguments:
  87. instance (*any primitive type*, i.e. str, number, bool):
  88. The instance to check
  89. format:
  90. The format that instance should conform to
  91. Raises:
  92. FormatError:
  93. if the instance does not conform to ``format``
  94. """
  95. if format not in self.checkers:
  96. return
  97. func, raises = self.checkers[format]
  98. result, cause = None, None
  99. try:
  100. result = func(instance)
  101. except raises as e:
  102. cause = e
  103. if not result:
  104. raise FormatError(f"{instance!r} is not a {format!r}", cause=cause)
  105. def conforms(self, instance: object, format: str) -> bool:
  106. """
  107. Check whether the instance conforms to the given format.
  108. Arguments:
  109. instance (*any primitive type*, i.e. str, number, bool):
  110. The instance to check
  111. format:
  112. The format that instance should conform to
  113. Returns:
  114. bool: whether it conformed
  115. """
  116. try:
  117. self.check(instance, format)
  118. except FormatError:
  119. return False
  120. else:
  121. return True
  122. draft3_format_checker = FormatChecker()
  123. draft4_format_checker = FormatChecker()
  124. draft6_format_checker = FormatChecker()
  125. draft7_format_checker = FormatChecker()
  126. draft201909_format_checker = FormatChecker()
  127. draft202012_format_checker = FormatChecker()
  128. _draft_checkers: dict[str, FormatChecker] = dict(
  129. draft3=draft3_format_checker,
  130. draft4=draft4_format_checker,
  131. draft6=draft6_format_checker,
  132. draft7=draft7_format_checker,
  133. draft201909=draft201909_format_checker,
  134. draft202012=draft202012_format_checker,
  135. )
  136. def _checks_drafts(
  137. name=None,
  138. draft3=None,
  139. draft4=None,
  140. draft6=None,
  141. draft7=None,
  142. draft201909=None,
  143. draft202012=None,
  144. raises=(),
  145. ) -> typing.Callable[[_F], _F]:
  146. draft3 = draft3 or name
  147. draft4 = draft4 or name
  148. draft6 = draft6 or name
  149. draft7 = draft7 or name
  150. draft201909 = draft201909 or name
  151. draft202012 = draft202012 or name
  152. def wrap(func: _F) -> _F:
  153. if draft3:
  154. func = _draft_checkers["draft3"].checks(draft3, raises)(func)
  155. if draft4:
  156. func = _draft_checkers["draft4"].checks(draft4, raises)(func)
  157. if draft6:
  158. func = _draft_checkers["draft6"].checks(draft6, raises)(func)
  159. if draft7:
  160. func = _draft_checkers["draft7"].checks(draft7, raises)(func)
  161. if draft201909:
  162. func = _draft_checkers["draft201909"].checks(draft201909, raises)(
  163. func,
  164. )
  165. if draft202012:
  166. func = _draft_checkers["draft202012"].checks(draft202012, raises)(
  167. func,
  168. )
  169. # Oy. This is bad global state, but relied upon for now, until
  170. # deprecation. See #519 and test_format_checkers_come_with_defaults
  171. FormatChecker._cls_checks(
  172. draft202012 or draft201909 or draft7 or draft6 or draft4 or draft3,
  173. raises,
  174. )(func)
  175. return func
  176. return wrap
  177. @_checks_drafts(name="idn-email")
  178. @_checks_drafts(name="email")
  179. def is_email(instance: object) -> bool:
  180. if not isinstance(instance, str):
  181. return True
  182. return "@" in instance
  183. @_checks_drafts(
  184. draft3="ip-address",
  185. draft4="ipv4",
  186. draft6="ipv4",
  187. draft7="ipv4",
  188. draft201909="ipv4",
  189. draft202012="ipv4",
  190. raises=ipaddress.AddressValueError,
  191. )
  192. def is_ipv4(instance: object) -> bool:
  193. if not isinstance(instance, str):
  194. return True
  195. return bool(ipaddress.IPv4Address(instance))
  196. @_checks_drafts(name="ipv6", raises=ipaddress.AddressValueError)
  197. def is_ipv6(instance: object) -> bool:
  198. if not isinstance(instance, str):
  199. return True
  200. address = ipaddress.IPv6Address(instance)
  201. return not getattr(address, "scope_id", "")
  202. with suppress(ImportError):
  203. from fqdn import FQDN
  204. @_checks_drafts(
  205. draft3="host-name",
  206. draft4="hostname",
  207. draft6="hostname",
  208. draft7="hostname",
  209. draft201909="hostname",
  210. draft202012="hostname",
  211. )
  212. def is_host_name(instance: object) -> bool:
  213. if not isinstance(instance, str):
  214. return True
  215. return FQDN(instance, min_labels=1).is_valid
  216. with suppress(ImportError):
  217. # The built-in `idna` codec only implements RFC 3890, so we go elsewhere.
  218. import idna
  219. @_checks_drafts(
  220. draft7="idn-hostname",
  221. draft201909="idn-hostname",
  222. draft202012="idn-hostname",
  223. raises=(idna.IDNAError, UnicodeError),
  224. )
  225. def is_idn_host_name(instance: object) -> bool:
  226. if not isinstance(instance, str):
  227. return True
  228. idna.encode(instance)
  229. return True
  230. try:
  231. import rfc3987
  232. except ImportError:
  233. with suppress(ImportError):
  234. from rfc3986_validator import validate_rfc3986
  235. @_checks_drafts(name="uri")
  236. def is_uri(instance: object) -> bool:
  237. if not isinstance(instance, str):
  238. return True
  239. return validate_rfc3986(instance, rule="URI")
  240. @_checks_drafts(
  241. draft6="uri-reference",
  242. draft7="uri-reference",
  243. draft201909="uri-reference",
  244. draft202012="uri-reference",
  245. raises=ValueError,
  246. )
  247. def is_uri_reference(instance: object) -> bool:
  248. if not isinstance(instance, str):
  249. return True
  250. return validate_rfc3986(instance, rule="URI_reference")
  251. else:
  252. @_checks_drafts(
  253. draft7="iri",
  254. draft201909="iri",
  255. draft202012="iri",
  256. raises=ValueError,
  257. )
  258. def is_iri(instance: object) -> bool:
  259. if not isinstance(instance, str):
  260. return True
  261. return rfc3987.parse(instance, rule="IRI")
  262. @_checks_drafts(
  263. draft7="iri-reference",
  264. draft201909="iri-reference",
  265. draft202012="iri-reference",
  266. raises=ValueError,
  267. )
  268. def is_iri_reference(instance: object) -> bool:
  269. if not isinstance(instance, str):
  270. return True
  271. return rfc3987.parse(instance, rule="IRI_reference")
  272. @_checks_drafts(name="uri", raises=ValueError)
  273. def is_uri(instance: object) -> bool:
  274. if not isinstance(instance, str):
  275. return True
  276. return rfc3987.parse(instance, rule="URI")
  277. @_checks_drafts(
  278. draft6="uri-reference",
  279. draft7="uri-reference",
  280. draft201909="uri-reference",
  281. draft202012="uri-reference",
  282. raises=ValueError,
  283. )
  284. def is_uri_reference(instance: object) -> bool:
  285. if not isinstance(instance, str):
  286. return True
  287. return rfc3987.parse(instance, rule="URI_reference")
  288. with suppress(ImportError):
  289. from rfc3339_validator import validate_rfc3339
  290. @_checks_drafts(name="date-time")
  291. def is_datetime(instance: object) -> bool:
  292. if not isinstance(instance, str):
  293. return True
  294. return validate_rfc3339(instance.upper())
  295. @_checks_drafts(
  296. draft7="time",
  297. draft201909="time",
  298. draft202012="time",
  299. )
  300. def is_time(instance: object) -> bool:
  301. if not isinstance(instance, str):
  302. return True
  303. return is_datetime("1970-01-01T" + instance)
  304. @_checks_drafts(name="regex", raises=re.error)
  305. def is_regex(instance: object) -> bool:
  306. if not isinstance(instance, str):
  307. return True
  308. return bool(re.compile(instance))
  309. @_checks_drafts(
  310. draft3="date",
  311. draft7="date",
  312. draft201909="date",
  313. draft202012="date",
  314. raises=ValueError,
  315. )
  316. def is_date(instance: object) -> bool:
  317. if not isinstance(instance, str):
  318. return True
  319. return bool(_RE_DATE.fullmatch(instance) and date.fromisoformat(instance))
  320. @_checks_drafts(draft3="time", raises=ValueError)
  321. def is_draft3_time(instance: object) -> bool:
  322. if not isinstance(instance, str):
  323. return True
  324. return bool(datetime.strptime(instance, "%H:%M:%S")) # noqa: DTZ007
  325. with suppress(ImportError):
  326. from webcolors import CSS21_NAMES_TO_HEX
  327. import webcolors
  328. def is_css_color_code(instance: object) -> bool:
  329. return webcolors.normalize_hex(instance)
  330. @_checks_drafts(draft3="color", raises=(ValueError, TypeError))
  331. def is_css21_color(instance: object) -> bool:
  332. if (
  333. not isinstance(instance, str)
  334. or instance.lower() in CSS21_NAMES_TO_HEX
  335. ):
  336. return True
  337. return is_css_color_code(instance)
  338. with suppress(ImportError):
  339. import jsonpointer
  340. @_checks_drafts(
  341. draft6="json-pointer",
  342. draft7="json-pointer",
  343. draft201909="json-pointer",
  344. draft202012="json-pointer",
  345. raises=jsonpointer.JsonPointerException,
  346. )
  347. def is_json_pointer(instance: object) -> bool:
  348. if not isinstance(instance, str):
  349. return True
  350. return bool(jsonpointer.JsonPointer(instance))
  351. # TODO: I don't want to maintain this, so it
  352. # needs to go either into jsonpointer (pending
  353. # https://github.com/stefankoegl/python-json-pointer/issues/34) or
  354. # into a new external library.
  355. @_checks_drafts(
  356. draft7="relative-json-pointer",
  357. draft201909="relative-json-pointer",
  358. draft202012="relative-json-pointer",
  359. raises=jsonpointer.JsonPointerException,
  360. )
  361. def is_relative_json_pointer(instance: object) -> bool:
  362. # Definition taken from:
  363. # https://tools.ietf.org/html/draft-handrews-relative-json-pointer-01#section-3
  364. if not isinstance(instance, str):
  365. return True
  366. if not instance:
  367. return False
  368. non_negative_integer, rest = [], ""
  369. for i, character in enumerate(instance):
  370. if character.isdigit():
  371. # digits with a leading "0" are not allowed
  372. if i > 0 and int(instance[i - 1]) == 0:
  373. return False
  374. non_negative_integer.append(character)
  375. continue
  376. if not non_negative_integer:
  377. return False
  378. rest = instance[i:]
  379. break
  380. return (rest == "#") or bool(jsonpointer.JsonPointer(rest))
  381. with suppress(ImportError):
  382. import uri_template
  383. @_checks_drafts(
  384. draft6="uri-template",
  385. draft7="uri-template",
  386. draft201909="uri-template",
  387. draft202012="uri-template",
  388. )
  389. def is_uri_template(instance: object) -> bool:
  390. if not isinstance(instance, str):
  391. return True
  392. return uri_template.validate(instance)
  393. with suppress(ImportError):
  394. import isoduration
  395. @_checks_drafts(
  396. draft201909="duration",
  397. draft202012="duration",
  398. raises=isoduration.DurationParsingException,
  399. )
  400. def is_duration(instance: object) -> bool:
  401. if not isinstance(instance, str):
  402. return True
  403. isoduration.parse_duration(instance)
  404. # FIXME: See bolsote/isoduration#25 and bolsote/isoduration#21
  405. return instance.endswith(tuple("DMYWHMS"))
  406. @_checks_drafts(
  407. draft201909="uuid",
  408. draft202012="uuid",
  409. raises=ValueError,
  410. )
  411. def is_uuid(instance: object) -> bool:
  412. if not isinstance(instance, str):
  413. return True
  414. UUID(instance)
  415. return all(instance[position] == "-" for position in (8, 13, 18, 23))