123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351 |
- from collections.abc import Mapping, MutableMapping, Sequence
- from urllib.parse import urlsplit
- import itertools
- import re
class URIDict(MutableMapping):
    """
    Dictionary which uses normalized URIs as keys.

    Every key is passed through `normalize` before it reaches the underlying
    ``store`` dict, so two spellings that normalize to the same string share
    a single entry.
    """

    def normalize(self, uri):
        """
        Return the normalized form of ``uri`` that is used as the stored key.
        """
        return urlsplit(uri).geturl()

    def __init__(self, *args, **kwargs):
        """
        Create the mapping, accepting the same arguments as ``dict.update``.
        """
        self.store = {}
        # Go through ``__setitem__`` (via the ``MutableMapping.update`` mixin)
        # so that initial keys are normalized as well. Writing them directly
        # into ``store`` would leave raw keys behind that a normalizing
        # ``__getitem__`` could never find again.
        self.update(*args, **kwargs)

    def __getitem__(self, uri):
        return self.store[self.normalize(uri)]

    def __setitem__(self, uri, value):
        self.store[self.normalize(uri)] = value

    def __delitem__(self, uri):
        del self.store[self.normalize(uri)]

    def __iter__(self):
        # Iterates over the normalized keys, not the spellings passed in.
        return iter(self.store)

    def __len__(self):  # pragma: no cover -- untested, but to be removed
        return len(self.store)

    def __repr__(self):  # pragma: no cover -- untested, but to be removed
        return repr(self.store)
class Unset:
    """
    Marker for an attribute that has not been set yet, or for a default
    parameter that the caller did not provide.
    """

    def __repr__(self):  # pragma: no cover
        return "<unset>"
def format_as_index(container, indices):
    """
    Render ``container`` followed by one subscript per entry of ``indices``.

    For example, for a container ``bar`` and the indices ``[1, 2, "foo"]``
    the result is ``bar[1][2]['foo']``.

    Arguments:

        container (str):

            A word to use for the thing being indexed

        indices (sequence):

            The indices to format; each one is rendered with ``repr``.

    Returns:

        ``container`` itself when ``indices`` is empty, otherwise the
        formatted string.
    """
    if not indices:
        return container

    subscripts = "][".join(map(repr, indices))
    return f"{container}[{subscripts}]"
def find_additional_properties(instance, schema):
    """
    Yield the names of the additional properties of ``instance``.

    A property is skipped when it is listed under ``properties`` in the
    schema, or when its name matches any of the ``patternProperties``
    patterns; everything else is yielded in iteration order.

    Assumes ``instance`` is dict-like already.
    """
    declared = schema.get("properties", {})
    # A single alternation of all patterns lets one search cover them all.
    combined = "|".join(schema.get("patternProperties", {}))

    for name in instance:
        if name in declared:
            continue
        if combined and re.search(combined, name):
            continue
        yield name
def extras_msg(extras):
    """
    Build the pieces of an error message about extra items or properties.

    Returns a 2-tuple: the ``repr`` of every extra joined with ``", "``, and
    the verb ("was" for exactly one extra, "were" otherwise) to follow it.
    """
    if len(extras) == 1:
        verb = "was"
    else:
        verb = "were"

    listing = ", ".join(map(repr, extras))
    return listing, verb
def ensure_list(thing):
    """
    Return a one-element list holding ``thing`` when it is a single str.

    Anything that is not a str is handed back unchanged (the same object).
    """
    return [thing] if isinstance(thing, str) else thing
def _mapping_equal(one, two):
    """
    Check if two mappings are equal using the semantics of `equal`.

    They are equal when they have the same number of keys and every key of
    ``one`` is present in ``two`` with an `equal` value.
    """
    if len(one) != len(two):
        return False
    return all(
        key in two and equal(value, two[key])
        for key, value in one.items()
    )


def _sequence_equal(one, two):
    """
    Check if two sequences are equal using the semantics of `equal`.

    They are equal when they have the same length and are pairwise `equal`.
    """
    if len(one) != len(two):
        return False
    return all(equal(i, j) for i, j in zip(one, two))


def equal(one, two):
    """
    Check if two things are equal evading some Python type hierarchy semantics.

    Specifically in JSON Schema, evade `bool` inheriting from `int`,
    recursing into sequences and mappings to do the same.
    """
    if one is two:
        return True
    # Strings are sequences too, so settle them before the Sequence branch
    # to avoid recursing into them character by character.
    if isinstance(one, str) or isinstance(two, str):
        return one == two
    if isinstance(one, Sequence) and isinstance(two, Sequence):
        return _sequence_equal(one, two)
    if isinstance(one, Mapping) and isinstance(two, Mapping):
        return _mapping_equal(one, two)
    return unbool(one) == unbool(two)


def unbool(element, true=object(), false=object()):
    """
    A hack to make True and 1 and False and 0 unique for ``uniq``.

    ``True`` and ``False`` are swapped for the ``true`` / ``false`` stand-ins
    (by default two objects created once, when the function is defined);
    every other element is returned as is.
    """
    if element is True:
        return true
    elif element is False:
        return false
    return element


def _uniq_sort_key(element):
    """
    Build a sort key for ``uniq`` in which bools are replaced at every depth.

    Sequences become lists and mappings become dicts of converted values, so
    that Python's own ``==`` on two keys never conflates ``True`` with ``1``
    (or ``False`` with ``0``), matching what `equal` decides.
    """
    if isinstance(element, str):
        return element
    if isinstance(element, Sequence):
        return [_uniq_sort_key(each) for each in element]
    if isinstance(element, Mapping):
        return {key: _uniq_sort_key(value) for key, value in element.items()}
    return unbool(element)


def uniq(container):
    """
    Check if all of a container's elements are unique.

    Tries to rely on the container being recursively sortable, or otherwise
    falls back on (slow) brute force.

    Returns:

        ``False`` as soon as two elements are `equal`, ``True`` otherwise.
    """
    try:
        # Sorting only helps if elements that are `equal` end up adjacent.
        # Plain list / dict comparison treats ``True == 1``, which would let
        # e.g. ``[[1], [True], [1]]`` sort as "all equal" and hide the
        # duplicated ``[1]``. The key replaces nested bools with stand-ins
        # that cannot be ordered against numbers, so such inputs raise
        # TypeError here and take the brute force path below instead.
        ordered = sorted(container, key=_uniq_sort_key)
        following = itertools.islice(ordered, 1, None)

        for i, j in zip(ordered, following):
            if equal(i, j):
                return False

    except (NotImplementedError, TypeError):
        seen = []
        for e in container:
            e = unbool(e)

            for i in seen:
                if equal(i, e):
                    return False

            seen.append(e)
    return True
def find_evaluated_item_indexes_by_schema(validator, instance, schema):
    """
    Collect the indexes of ``instance`` that get evaluated under ``schema``.

    Covers all keywords related to unevaluatedItems: items, prefixItems, if,
    then, else, contains, unevaluatedItems, allOf, oneOf, anyOf, and follows
    ``$ref`` / ``$dynamicRef`` through the validator's resolver.

    The returned list is not de-duplicated: an index reached through several
    keywords appears once per keyword.
    """
    if validator.is_type(schema, "boolean"):
        return []

    # ``items`` applies to every element, so the answer is the whole range.
    if "items" in schema:
        return list(range(len(instance)))

    found = []

    # Both reference keywords are followed the same way: look the target up,
    # then recurse with a validator rebound to the target's schema/resolver.
    for reference_keyword in ("$ref", "$dynamicRef"):
        reference = schema.get(reference_keyword)
        if reference is None:
            continue
        target = validator._resolver.lookup(reference)
        scoped = validator.evolve(
            schema=target.contents,
            _resolver=target.resolver,
        )
        found.extend(
            find_evaluated_item_indexes_by_schema(
                scoped, instance, target.contents,
            ),
        )

    if "prefixItems" in schema:
        found.extend(range(len(schema["prefixItems"])))

    if "if" in schema:
        condition = schema["if"]
        if validator.evolve(schema=condition).is_valid(instance):
            found.extend(
                find_evaluated_item_indexes_by_schema(
                    validator, instance, condition,
                ),
            )
            if "then" in schema:
                found.extend(
                    find_evaluated_item_indexes_by_schema(
                        validator, instance, schema["then"],
                    ),
                )
        elif "else" in schema:
            found.extend(
                find_evaluated_item_indexes_by_schema(
                    validator, instance, schema["else"],
                ),
            )

    # For these keywords an item counts as evaluated only when it is itself
    # valid against the keyword's subschema.
    for keyword in ("contains", "unevaluatedItems"):
        if keyword not in schema:
            continue
        found.extend(
            index
            for index, item in enumerate(instance)
            if validator.evolve(schema=schema[keyword]).is_valid(item)
        )

    # Applicator branches contribute only when the instance passes them,
    # i.e. when descending into the branch produces no first error.
    for keyword in ("allOf", "oneOf", "anyOf"):
        if keyword not in schema:
            continue
        for subschema in schema[keyword]:
            first_error = next(validator.descend(instance, subschema), None)
            if first_error is not None:
                continue
            found.extend(
                find_evaluated_item_indexes_by_schema(
                    validator, instance, subschema,
                ),
            )

    return found
def find_evaluated_property_keys_by_schema(validator, instance, schema):
    """
    Collect the keys of ``instance`` that get evaluated under ``schema``.

    Covers all keywords related to unevaluatedProperties: properties,
    additionalProperties, unevaluatedProperties, patternProperties,
    dependentSchemas, allOf, oneOf, anyOf, if, then, else, and follows
    ``$ref`` / ``$dynamicRef`` through the validator's resolver.

    The returned list is not de-duplicated: a key reached through several
    keywords (or several matching patterns) appears more than once.
    """
    if validator.is_type(schema, "boolean"):
        return []

    found = []

    # Both reference keywords are followed the same way: look the target up,
    # then recurse with a validator rebound to the target's schema/resolver.
    for reference_keyword in ("$ref", "$dynamicRef"):
        reference = schema.get(reference_keyword)
        if reference is None:
            continue
        target = validator._resolver.lookup(reference)
        scoped = validator.evolve(
            schema=target.contents,
            _resolver=target.resolver,
        )
        found.extend(
            find_evaluated_property_keys_by_schema(
                scoped, instance, target.contents,
            ),
        )

    for keyword in (
        "properties", "additionalProperties", "unevaluatedProperties",
    ):
        if keyword not in schema:
            continue
        value = schema[keyword]
        if validator.is_type(value, "boolean") and value:
            # A ``true`` subschema accepts, and so evaluates, every key.
            found.extend(instance.keys())
        elif validator.is_type(value, "object"):
            found.extend(name for name in value if name in instance)

    if "patternProperties" in schema:
        found.extend(
            name
            for name in instance
            for pattern in schema["patternProperties"]
            if re.search(pattern, name)
        )

    if "dependentSchemas" in schema:
        for trigger, subschema in schema["dependentSchemas"].items():
            # A dependent schema only applies when its property is present.
            if trigger in instance:
                found.extend(
                    find_evaluated_property_keys_by_schema(
                        validator, instance, subschema,
                    ),
                )

    # Applicator branches contribute only when the instance passes them,
    # i.e. when descending into the branch produces no first error.
    for keyword in ("allOf", "oneOf", "anyOf"):
        if keyword not in schema:
            continue
        for subschema in schema[keyword]:
            first_error = next(validator.descend(instance, subschema), None)
            if first_error is not None:
                continue
            found.extend(
                find_evaluated_property_keys_by_schema(
                    validator, instance, subschema,
                ),
            )

    if "if" in schema:
        condition = schema["if"]
        if validator.evolve(schema=condition).is_valid(instance):
            found.extend(
                find_evaluated_property_keys_by_schema(
                    validator, instance, condition,
                ),
            )
            if "then" in schema:
                found.extend(
                    find_evaluated_property_keys_by_schema(
                        validator, instance, schema["then"],
                    ),
                )
        elif "else" in schema:
            found.extend(
                find_evaluated_property_keys_by_schema(
                    validator, instance, schema["else"],
                ),
            )

    return found
|