"""
Referencing implementations for JSON Schema specs (historic & current).
"""
  4. from __future__ import annotations
  5. from collections.abc import Sequence, Set
  6. from typing import Any, Iterable, Union
  7. from referencing import Anchor, Registry, Resource, Specification, exceptions
  8. from referencing._attrs import frozen
  9. from referencing._core import (
  10. _UNSET, # type: ignore[reportPrivateUsage]
  11. Resolved as _Resolved,
  12. Resolver as _Resolver,
  13. _Unset, # type: ignore[reportPrivateUsage]
  14. )
  15. from referencing.typing import URI, Anchor as AnchorType, Mapping
  16. #: A JSON Schema which is a JSON object
  17. ObjectSchema = Mapping[str, Any]
  18. #: A JSON Schema of any kind
  19. Schema = Union[bool, ObjectSchema]
  20. #: A Resource whose contents are JSON Schemas
  21. SchemaResource = Resource[Schema]
  22. #: A JSON Schema Registry
  23. SchemaRegistry = Registry[Schema]
  24. #: The empty JSON Schema Registry
  25. EMPTY_REGISTRY: SchemaRegistry = Registry()
  26. @frozen
  27. class UnknownDialect(Exception):
  28. """
  29. A dialect identifier was found for a dialect unknown by this library.
  30. If it's a custom ("unofficial") dialect, be sure you've registered it.
  31. """
  32. uri: URI
  33. def _dollar_id(contents: Schema) -> URI | None:
  34. if isinstance(contents, bool):
  35. return
  36. return contents.get("$id")
  37. def _legacy_dollar_id(contents: Schema) -> URI | None:
  38. if isinstance(contents, bool) or "$ref" in contents:
  39. return
  40. id = contents.get("$id")
  41. if id is not None and not id.startswith("#"):
  42. return id
  43. def _legacy_id(contents: ObjectSchema) -> URI | None:
  44. if "$ref" in contents:
  45. return
  46. id = contents.get("id")
  47. if id is not None and not id.startswith("#"):
  48. return id
  49. def _anchor(
  50. specification: Specification[Schema],
  51. contents: Schema,
  52. ) -> Iterable[AnchorType[Schema]]:
  53. if isinstance(contents, bool):
  54. return
  55. anchor = contents.get("$anchor")
  56. if anchor is not None:
  57. yield Anchor(
  58. name=anchor,
  59. resource=specification.create_resource(contents),
  60. )
  61. dynamic_anchor = contents.get("$dynamicAnchor")
  62. if dynamic_anchor is not None:
  63. yield DynamicAnchor(
  64. name=dynamic_anchor,
  65. resource=specification.create_resource(contents),
  66. )
  67. def _anchor_2019(
  68. specification: Specification[Schema],
  69. contents: Schema,
  70. ) -> Iterable[Anchor[Schema]]:
  71. if isinstance(contents, bool):
  72. return []
  73. anchor = contents.get("$anchor")
  74. if anchor is None:
  75. return []
  76. return [
  77. Anchor(
  78. name=anchor,
  79. resource=specification.create_resource(contents),
  80. ),
  81. ]
  82. def _legacy_anchor_in_dollar_id(
  83. specification: Specification[Schema],
  84. contents: Schema,
  85. ) -> Iterable[Anchor[Schema]]:
  86. if isinstance(contents, bool):
  87. return []
  88. id = contents.get("$id", "")
  89. if not id.startswith("#"):
  90. return []
  91. return [
  92. Anchor(
  93. name=id[1:],
  94. resource=specification.create_resource(contents),
  95. ),
  96. ]
  97. def _legacy_anchor_in_id(
  98. specification: Specification[ObjectSchema],
  99. contents: ObjectSchema,
  100. ) -> Iterable[Anchor[ObjectSchema]]:
  101. id = contents.get("id", "")
  102. if not id.startswith("#"):
  103. return []
  104. return [
  105. Anchor(
  106. name=id[1:],
  107. resource=specification.create_resource(contents),
  108. ),
  109. ]
  110. def _subresources_of(
  111. in_value: Set[str] = frozenset(),
  112. in_subvalues: Set[str] = frozenset(),
  113. in_subarray: Set[str] = frozenset(),
  114. ):
  115. """
  116. Create a callable returning JSON Schema specification-style subschemas.
  117. Relies on specifying the set of keywords containing subschemas in their
  118. values, in a subobject's values, or in a subarray.
  119. """
  120. def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
  121. if isinstance(contents, bool):
  122. return
  123. for each in in_value:
  124. if each in contents:
  125. yield contents[each]
  126. for each in in_subarray:
  127. if each in contents:
  128. yield from contents[each]
  129. for each in in_subvalues:
  130. if each in contents:
  131. yield from contents[each].values()
  132. return subresources_of
  133. def _subresources_of_with_crazy_items(
  134. in_value: Set[str] = frozenset(),
  135. in_subvalues: Set[str] = frozenset(),
  136. in_subarray: Set[str] = frozenset(),
  137. ):
  138. """
  139. Specifically handle older drafts where there are some funky keywords.
  140. """
  141. def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
  142. if isinstance(contents, bool):
  143. return
  144. for each in in_value:
  145. if each in contents:
  146. yield contents[each]
  147. for each in in_subarray:
  148. if each in contents:
  149. yield from contents[each]
  150. for each in in_subvalues:
  151. if each in contents:
  152. yield from contents[each].values()
  153. items = contents.get("items")
  154. if items is not None:
  155. if isinstance(items, Sequence):
  156. yield from items
  157. else:
  158. yield items
  159. return subresources_of
  160. def _subresources_of_with_crazy_items_dependencies(
  161. in_value: Set[str] = frozenset(),
  162. in_subvalues: Set[str] = frozenset(),
  163. in_subarray: Set[str] = frozenset(),
  164. ):
  165. """
  166. Specifically handle older drafts where there are some funky keywords.
  167. """
  168. def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
  169. if isinstance(contents, bool):
  170. return
  171. for each in in_value:
  172. if each in contents:
  173. yield contents[each]
  174. for each in in_subarray:
  175. if each in contents:
  176. yield from contents[each]
  177. for each in in_subvalues:
  178. if each in contents:
  179. yield from contents[each].values()
  180. items = contents.get("items")
  181. if items is not None:
  182. if isinstance(items, Sequence):
  183. yield from items
  184. else:
  185. yield items
  186. dependencies = contents.get("dependencies")
  187. if dependencies is not None:
  188. values = iter(dependencies.values())
  189. value = next(values, None)
  190. if isinstance(value, Mapping):
  191. yield value
  192. yield from values
  193. return subresources_of
  194. def _subresources_of_with_crazy_aP_items_dependencies(
  195. in_value: Set[str] = frozenset(),
  196. in_subvalues: Set[str] = frozenset(),
  197. in_subarray: Set[str] = frozenset(),
  198. ):
  199. """
  200. Specifically handle even older drafts where there are some funky keywords.
  201. """
  202. def subresources_of(contents: ObjectSchema) -> Iterable[ObjectSchema]:
  203. for each in in_value:
  204. if each in contents:
  205. yield contents[each]
  206. for each in in_subarray:
  207. if each in contents:
  208. yield from contents[each]
  209. for each in in_subvalues:
  210. if each in contents:
  211. yield from contents[each].values()
  212. items = contents.get("items")
  213. if items is not None:
  214. if isinstance(items, Sequence):
  215. yield from items
  216. else:
  217. yield items
  218. dependencies = contents.get("dependencies")
  219. if dependencies is not None:
  220. values = iter(dependencies.values())
  221. value = next(values, None)
  222. if isinstance(value, Mapping):
  223. yield value
  224. yield from values
  225. for each in "additionalItems", "additionalProperties":
  226. value = contents.get(each)
  227. if isinstance(value, Mapping):
  228. yield value
  229. return subresources_of
  230. def _maybe_in_subresource(
  231. in_value: Set[str] = frozenset(),
  232. in_subvalues: Set[str] = frozenset(),
  233. in_subarray: Set[str] = frozenset(),
  234. ):
  235. in_child = in_subvalues | in_subarray
  236. def maybe_in_subresource(
  237. segments: Sequence[int | str],
  238. resolver: _Resolver[Any],
  239. subresource: Resource[Any],
  240. ) -> _Resolver[Any]:
  241. _segments = iter(segments)
  242. for segment in _segments:
  243. if segment not in in_value and (
  244. segment not in in_child or next(_segments, None) is None
  245. ):
  246. return resolver
  247. return resolver.in_subresource(subresource)
  248. return maybe_in_subresource
  249. def _maybe_in_subresource_crazy_items(
  250. in_value: Set[str] = frozenset(),
  251. in_subvalues: Set[str] = frozenset(),
  252. in_subarray: Set[str] = frozenset(),
  253. ):
  254. in_child = in_subvalues | in_subarray
  255. def maybe_in_subresource(
  256. segments: Sequence[int | str],
  257. resolver: _Resolver[Any],
  258. subresource: Resource[Any],
  259. ) -> _Resolver[Any]:
  260. _segments = iter(segments)
  261. for segment in _segments:
  262. if segment == "items" and isinstance(
  263. subresource.contents,
  264. Mapping,
  265. ):
  266. return resolver.in_subresource(subresource)
  267. if segment not in in_value and (
  268. segment not in in_child or next(_segments, None) is None
  269. ):
  270. return resolver
  271. return resolver.in_subresource(subresource)
  272. return maybe_in_subresource
  273. def _maybe_in_subresource_crazy_items_dependencies(
  274. in_value: Set[str] = frozenset(),
  275. in_subvalues: Set[str] = frozenset(),
  276. in_subarray: Set[str] = frozenset(),
  277. ):
  278. in_child = in_subvalues | in_subarray
  279. def maybe_in_subresource(
  280. segments: Sequence[int | str],
  281. resolver: _Resolver[Any],
  282. subresource: Resource[Any],
  283. ) -> _Resolver[Any]:
  284. _segments = iter(segments)
  285. for segment in _segments:
  286. if segment in {"items", "dependencies"} and isinstance(
  287. subresource.contents,
  288. Mapping,
  289. ):
  290. return resolver.in_subresource(subresource)
  291. if segment not in in_value and (
  292. segment not in in_child or next(_segments, None) is None
  293. ):
  294. return resolver
  295. return resolver.in_subresource(subresource)
  296. return maybe_in_subresource
  297. #: JSON Schema draft 2020-12
  298. DRAFT202012 = Specification(
  299. name="draft2020-12",
  300. id_of=_dollar_id,
  301. subresources_of=_subresources_of(
  302. in_value={
  303. "additionalProperties",
  304. "contains",
  305. "contentSchema",
  306. "else",
  307. "if",
  308. "items",
  309. "not",
  310. "propertyNames",
  311. "then",
  312. "unevaluatedItems",
  313. "unevaluatedProperties",
  314. },
  315. in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"},
  316. in_subvalues={
  317. "$defs",
  318. "definitions",
  319. "dependentSchemas",
  320. "patternProperties",
  321. "properties",
  322. },
  323. ),
  324. anchors_in=_anchor,
  325. maybe_in_subresource=_maybe_in_subresource(
  326. in_value={
  327. "additionalProperties",
  328. "contains",
  329. "contentSchema",
  330. "else",
  331. "if",
  332. "items",
  333. "not",
  334. "propertyNames",
  335. "then",
  336. "unevaluatedItems",
  337. "unevaluatedProperties",
  338. },
  339. in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"},
  340. in_subvalues={
  341. "$defs",
  342. "definitions",
  343. "dependentSchemas",
  344. "patternProperties",
  345. "properties",
  346. },
  347. ),
  348. )
  349. #: JSON Schema draft 2019-09
  350. DRAFT201909 = Specification(
  351. name="draft2019-09",
  352. id_of=_dollar_id,
  353. subresources_of=_subresources_of_with_crazy_items(
  354. in_value={
  355. "additionalItems",
  356. "additionalProperties",
  357. "contains",
  358. "contentSchema",
  359. "else",
  360. "if",
  361. "not",
  362. "propertyNames",
  363. "then",
  364. "unevaluatedItems",
  365. "unevaluatedProperties",
  366. },
  367. in_subarray={"allOf", "anyOf", "oneOf"},
  368. in_subvalues={
  369. "$defs",
  370. "definitions",
  371. "dependentSchemas",
  372. "patternProperties",
  373. "properties",
  374. },
  375. ),
  376. anchors_in=_anchor_2019, # type: ignore[reportGeneralTypeIssues] # TODO: check whether this is real
  377. maybe_in_subresource=_maybe_in_subresource_crazy_items(
  378. in_value={
  379. "additionalItems",
  380. "additionalProperties",
  381. "contains",
  382. "contentSchema",
  383. "else",
  384. "if",
  385. "not",
  386. "propertyNames",
  387. "then",
  388. "unevaluatedItems",
  389. "unevaluatedProperties",
  390. },
  391. in_subarray={"allOf", "anyOf", "oneOf"},
  392. in_subvalues={
  393. "$defs",
  394. "definitions",
  395. "dependentSchemas",
  396. "patternProperties",
  397. "properties",
  398. },
  399. ),
  400. )
  401. #: JSON Schema draft 7
  402. DRAFT7 = Specification(
  403. name="draft-07",
  404. id_of=_legacy_dollar_id,
  405. subresources_of=_subresources_of_with_crazy_items_dependencies(
  406. in_value={
  407. "additionalItems",
  408. "additionalProperties",
  409. "contains",
  410. "else",
  411. "if",
  412. "not",
  413. "propertyNames",
  414. "then",
  415. },
  416. in_subarray={"allOf", "anyOf", "oneOf"},
  417. in_subvalues={"definitions", "patternProperties", "properties"},
  418. ),
  419. anchors_in=_legacy_anchor_in_dollar_id, # type: ignore[reportGeneralTypeIssues] # TODO: check whether this is real
  420. maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
  421. in_value={
  422. "additionalItems",
  423. "additionalProperties",
  424. "contains",
  425. "else",
  426. "if",
  427. "not",
  428. "propertyNames",
  429. "then",
  430. },
  431. in_subarray={"allOf", "anyOf", "oneOf"},
  432. in_subvalues={"definitions", "patternProperties", "properties"},
  433. ),
  434. )
  435. #: JSON Schema draft 6
  436. DRAFT6 = Specification(
  437. name="draft-06",
  438. id_of=_legacy_dollar_id,
  439. subresources_of=_subresources_of_with_crazy_items_dependencies(
  440. in_value={
  441. "additionalItems",
  442. "additionalProperties",
  443. "contains",
  444. "not",
  445. "propertyNames",
  446. },
  447. in_subarray={"allOf", "anyOf", "oneOf"},
  448. in_subvalues={"definitions", "patternProperties", "properties"},
  449. ),
  450. anchors_in=_legacy_anchor_in_dollar_id, # type: ignore[reportGeneralTypeIssues] # TODO: check whether this is real
  451. maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
  452. in_value={
  453. "additionalItems",
  454. "additionalProperties",
  455. "contains",
  456. "not",
  457. "propertyNames",
  458. },
  459. in_subarray={"allOf", "anyOf", "oneOf"},
  460. in_subvalues={"definitions", "patternProperties", "properties"},
  461. ),
  462. )
  463. #: JSON Schema draft 4
  464. DRAFT4 = Specification(
  465. name="draft-04",
  466. id_of=_legacy_id,
  467. subresources_of=_subresources_of_with_crazy_aP_items_dependencies(
  468. in_value={"not"},
  469. in_subarray={"allOf", "anyOf", "oneOf"},
  470. in_subvalues={"definitions", "patternProperties", "properties"},
  471. ),
  472. anchors_in=_legacy_anchor_in_id,
  473. maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
  474. in_value={"additionalItems", "additionalProperties", "not"},
  475. in_subarray={"allOf", "anyOf", "oneOf"},
  476. in_subvalues={"definitions", "patternProperties", "properties"},
  477. ),
  478. )
  479. #: JSON Schema draft 3
  480. DRAFT3 = Specification(
  481. name="draft-03",
  482. id_of=_legacy_id,
  483. subresources_of=_subresources_of_with_crazy_aP_items_dependencies(
  484. in_subarray={"extends"},
  485. in_subvalues={"definitions", "patternProperties", "properties"},
  486. ),
  487. anchors_in=_legacy_anchor_in_id,
  488. maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
  489. in_value={"additionalItems", "additionalProperties"},
  490. in_subarray={"extends"},
  491. in_subvalues={"definitions", "patternProperties", "properties"},
  492. ),
  493. )
  494. _SPECIFICATIONS: Registry[Specification[Schema]] = Registry(
  495. { # type: ignore[reportGeneralTypeIssues] # :/ internal vs external types
  496. dialect_id: Resource.opaque(specification)
  497. for dialect_id, specification in [
  498. ("https://json-schema.org/draft/2020-12/schema", DRAFT202012),
  499. ("https://json-schema.org/draft/2019-09/schema", DRAFT201909),
  500. ("http://json-schema.org/draft-07/schema", DRAFT7),
  501. ("http://json-schema.org/draft-06/schema", DRAFT6),
  502. ("http://json-schema.org/draft-04/schema", DRAFT4),
  503. ("http://json-schema.org/draft-03/schema", DRAFT3),
  504. ]
  505. },
  506. )
  507. def specification_with(
  508. dialect_id: URI,
  509. default: Specification[Any] | _Unset = _UNSET,
  510. ) -> Specification[Any]:
  511. """
  512. Retrieve the `Specification` with the given dialect identifier.
  513. Raises:
  514. `UnknownDialect`
  515. if the given ``dialect_id`` isn't known
  516. """
  517. resource = _SPECIFICATIONS.get(dialect_id.rstrip("#"))
  518. if resource is not None:
  519. return resource.contents
  520. if default is _UNSET:
  521. raise UnknownDialect(dialect_id)
  522. return default
  523. @frozen
  524. class DynamicAnchor:
  525. """
  526. Dynamic anchors, introduced in draft 2020.
  527. """
  528. name: str
  529. resource: SchemaResource
  530. def resolve(self, resolver: _Resolver[Schema]) -> _Resolved[Schema]:
  531. """
  532. Resolve this anchor dynamically.
  533. """
  534. last = self.resource
  535. for uri, registry in resolver.dynamic_scope():
  536. try:
  537. anchor = registry.anchor(uri, self.name).value
  538. except exceptions.NoSuchAnchor:
  539. continue
  540. if isinstance(anchor, DynamicAnchor):
  541. last = anchor.resource
  542. return _Resolved(
  543. contents=last.contents,
  544. resolver=resolver.in_subresource(last),
  545. )
  546. def lookup_recursive_ref(resolver: _Resolver[Schema]) -> _Resolved[Schema]:
  547. """
  548. Recursive references (via recursive anchors), present only in draft 2019.
  549. As per the 2019 specification (§ 8.2.4.2.1), only the ``#`` recursive
  550. reference is supported (and is therefore assumed to be the relevant
  551. reference).
  552. """
  553. resolved = resolver.lookup("#")
  554. if isinstance(resolved.contents, Mapping) and resolved.contents.get(
  555. "$recursiveAnchor",
  556. ):
  557. for uri, _ in resolver.dynamic_scope():
  558. next_resolved = resolver.lookup(uri)
  559. if not isinstance(
  560. next_resolved.contents,
  561. Mapping,
  562. ) or not next_resolved.contents.get("$recursiveAnchor"):
  563. break
  564. resolved = next_resolved
  565. return resolved