Source code for jtd.validate

import dataclasses
import strict_rfc3339
from typing import Any, List, Optional

from .schema import Form, Schema

[docs]@dataclasses.dataclass class ValidationError: """Represents a single issue that a schema had with an instance.""" instance_path: List[str] """Path to the part of the instance that was rejected.""" schema_path: List[str] """Path to the part of the schema that did the rejecting."""
[docs]@dataclasses.dataclass class ValidationOptions: """Represents options that can be passed to :func:`validate`.""" max_depth: int = 0 """ The maximum number of refs that will be followed at once before raising :class:`MaxDepthExceededError`. A value of zero means that all refs will be followed, and :class:`MaxDepthExceededError` is never raised. Ultimately, stack overflow may cause an error instead. """ max_errors: int = 0 """ The maximum number of errors that will be returned. A value of zero means that all errors will be returned. """
[docs]class MaxDepthExceededError(Exception): """ Indicates that ref recursion depth exceeded the limit put in place by ``max_depth`` in :class:`ValidationOptions`. """ pass
[docs]def validate(**kwargs) -> List[ValidationError]: """ Performs JSON Typedef validation, and returns a list of validation errors. Provide the schema using the `schema` keyword argument, and the instance with the `instance` keyword argument. Optionally, you can pass :class:`ValidationOptions` with the `options` keyword argument. >>> import jtd >>> schema = jtd.Schema.from_dict({ 'type': 'string' }) >>> jtd.validate(schema=schema, instance="foo") [] >>> jtd.validate(schema=schema, instance=None) [ValidationError(instance_path=[], schema_path=['type'])] >>> import jtd >>> schema = jtd.Schema.from_dict({ 'elements': { 'type': 'string' }}) >>> len(jtd.validate(schema=schema, instance=[None] * 5)) 5 >>> options = jtd.ValidationOptions(max_errors=3) >>> len(jtd.validate(schema=schema, instance=[None] * 5, options=options)) 3 >>> import jtd >>> schema = { 'definitions': { 'loop': { 'ref': 'loop' }}, 'ref': 'loop' } >>> schema = jtd.Schema.from_dict(schema) >>> options = jtd.ValidationOptions(max_depth=32) >>> jtd.validate(schema=schema, instance=None, options=options) Traceback (most recent call last): ... MaxDepthExceededError """ state = _ValidationState( config=kwargs.get('options', ValidationOptions()), root_schema=kwargs['schema'], instance_tokens=[], schema_tokens=[[]], errors=[], ) try: _validate_with_state(state, kwargs['schema'], kwargs['instance'], None) except _MaxErrorsReached: pass return state.errors
@dataclasses.dataclass class _ValidationState: config: ValidationOptions root_schema: Schema instance_tokens: List[str] schema_tokens: List[List[str]] errors: List[ValidationError] def push_instance_token(self, token): self.instance_tokens.append(token) def pop_instance_token(self): self.instance_tokens.pop() def push_schema_token(self, token): self.schema_tokens[-1].append(token) def pop_schema_token(self): self.schema_tokens[-1].pop() def push_error(self): self.errors.append(ValidationError( instance_path=self.instance_tokens.copy(), schema_path=self.schema_tokens[-1].copy(), )) if len(self.errors) == self.config.max_errors: raise _MaxErrorsReached() class _MaxErrorsReached(Exception): pass def _validate_with_state(state: _ValidationState, schema: Schema, instance: Any, parent_tag: Optional[str]): if schema.nullable and instance is None: return form = schema.form() if form == form.REF: if len(state.schema_tokens) == state.config.max_depth: raise MaxDepthExceededError() state.schema_tokens.append(["definitions", schema.ref]) _validate_with_state(state, state.root_schema.definitions[schema.ref], instance, None) state.schema_tokens.pop() elif form == form.TYPE: state.push_schema_token("type") if schema.type == "boolean": if type(instance) is not bool: state.push_error() elif schema.type == "float32" or schema.type == "float64": if type(instance) not in [int, float]: state.push_error() elif schema.type == "int8": _validate_int(state, -128, 127, instance) elif schema.type == "uint8": _validate_int(state, 0, 255, instance) elif schema.type == "int16": _validate_int(state, -32768, 32767, instance) elif schema.type == "uint16": _validate_int(state, 0, 65535, instance) elif schema.type == "int32": _validate_int(state, -2147483648, 2147483647, instance) elif schema.type == "uint32": _validate_int(state, 0, 4294967295, instance) elif schema.type == "string": if type(instance) is not str: state.push_error() elif type(instance) is not str or not strict_rfc3339.validate_rfc3339(instance): state.push_error() state.pop_schema_token() elif form == form.ENUM: state.push_schema_token("enum") if instance not in schema.enum: state.push_error() state.pop_schema_token() elif form == form.ELEMENTS: state.push_schema_token("elements") if type(instance) is list: for i, v in enumerate(instance): state.push_instance_token(str(i)) _validate_with_state(state, schema.elements, v, None) state.pop_instance_token() else: state.push_error() state.pop_schema_token() elif form == form.PROPERTIES: if type(instance) is dict: state.push_schema_token("properties") for k, v in (schema.properties or {}).items(): state.push_schema_token(k) if k in instance: state.push_instance_token(k) _validate_with_state(state, v, instance[k], None) state.pop_instance_token() else: state.push_error() state.pop_schema_token() state.pop_schema_token() state.push_schema_token("optionalProperties") for k, v in (schema.optional_properties or {}).items(): state.push_schema_token(k) if k in instance: state.push_instance_token(k) _validate_with_state(state, v, instance[k], None) state.pop_instance_token() state.pop_schema_token() state.pop_schema_token() if not schema.additional_properties: for k in instance: in_props = k in (schema.properties or {}) in_opt_props = k in (schema.optional_properties or {}) if not in_props and not in_opt_props and k != parent_tag: state.push_instance_token(k) state.push_error() state.pop_instance_token() elif schema.properties is not None: state.push_schema_token("properties") state.push_error() state.pop_schema_token() else: state.push_schema_token("optionalProperties") state.push_error() state.pop_schema_token() elif form == form.VALUES: state.push_schema_token("values") if type(instance) is dict: for k, v in instance.items(): state.push_instance_token(k) _validate_with_state(state, schema.values, v, None) state.pop_instance_token() else: state.push_error() state.pop_schema_token() elif form == form.DISCRIMINATOR: if type(instance) is dict: if schema.discriminator in instance: if type(instance[schema.discriminator]) is str: if instance[schema.discriminator] in schema.mapping: sub_schema = schema.mapping[instance[schema.discriminator]] parent_tag = schema.discriminator state.push_schema_token("mapping") state.push_schema_token(instance[schema.discriminator]) _validate_with_state(state, sub_schema, instance, parent_tag) state.pop_schema_token() state.pop_schema_token() else: state.push_schema_token("mapping") state.push_instance_token(schema.discriminator) state.push_error() state.pop_instance_token() state.pop_schema_token() else: state.push_schema_token("discriminator") state.push_instance_token(schema.discriminator) state.push_error() state.pop_instance_token() state.pop_schema_token() else: state.push_schema_token("discriminator") state.push_error() state.pop_schema_token() else: state.push_schema_token("discriminator") state.push_error() state.pop_schema_token def _validate_int(state: _ValidationState, min: int, max: int, instance: Any): if type(instance) not in [int, float]: state.push_error() return if int(instance) != instance or instance < min or instance > max: state.push_error()