Skip to content

Commit

Permalink
Add JSON Schema generation (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
BenjaminPelletier authored Jul 26, 2023
1 parent b4776a9 commit e83bd9a
Show file tree
Hide file tree
Showing 12 changed files with 491 additions and 125 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ classifiers = [
"License :: OSI Approved :: Apache Software License",
"Operating System :: OS Independent",
]
dependencies = ["arrow", "pytimeparse"]
dependencies = ["arrow", "jsonschema", "pytimeparse"]
[project.optional-dependencies]
dev = ["pytest==5.0.0", "pytest-cov[all]", "black==21.10b0"]
[project.urls]
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
arrow==1.2.3
jsonschema==4.17.3
pytimeparse==1.1.8
205 changes: 205 additions & 0 deletions src/implicitdict/jsonschema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
import inspect
from dataclasses import dataclass
from datetime import datetime
import enum
import json
import re
from typing import get_args, get_origin, get_type_hints, Dict, Literal, Optional, Type, Union, Tuple, Callable

from . import ImplicitDict, _fullname, _get_fields, StringBasedDateTime, StringBasedTimeDelta


@dataclass
class SchemaVars(object):
    """Characteristics of the JSON Schema that describes a particular Python type."""

    name: str
    """Unique name that can be used to reference this type/schema."""

    path_to: Optional[Callable[[Type, Type], str]] = None
    """Function to compute $ref path to schema describing the first type from the schema describing the second type"""

    schema_id: Optional[str] = None
    """ID of the schema describing this type. Will be used to populate $id."""

    description: Optional[str] = None
    """Description of this type/schema."""


SchemaVarsResolver = Callable[[Type], SchemaVars]
"""Function producing the characteristics of a schema (SchemaVars) for a given Type."""

# Docstring of the ImplicitDict base class itself.  make_json_schema compares a
# subclass's docstring against this sentinel so that a subclass with no docstring
# of its own (which inherits this one via inspect.getdoc) does not have the
# base-class docstring emitted as its schema description.
_implicitdict_doc = inspect.getdoc(ImplicitDict)


def make_json_schema(
    schema_type: Type[ImplicitDict],
    schema_vars_resolver: SchemaVarsResolver,
    schema_repository: Dict[str, dict],
) -> None:
    """Create JSON Schema for the specified schema type and all dependencies.

    The generated schema (and the schemas of any nested ImplicitDict types) are
    stored in schema_repository, keyed by the name produced by
    schema_vars_resolver.  If the schema is already present, this is a no-op.

    Args:
        schema_type: ImplicitDict subclass to produce JSON Schema for.
        schema_vars_resolver: Mapping between Python Type and characteristics of
            the schema for that type.
        schema_repository: Mapping from reference path (see reference_resolver)
            to JSON Schema for the corresponding type.  The schema for
            schema_type will be populated in this repository, along with all
            other nested types.

    Raises:
        ValueError: If a declared field has neither a type hint nor a default
            value from which a type can be inferred.
    """
    schema_vars = schema_vars_resolver(schema_type)
    if schema_vars.name in schema_repository:
        return

    # Add placeholder to avoid recursive definition attempts while we're making this schema
    schema_repository[schema_vars.name] = {"$generating": True}

    # Every object schema accepts a "$ref" property; presumably $refs are
    # resolved/substituted by a consumer of the data -- TODO confirm intent.
    properties = {"$ref": {"type": "string", "description": "Path to content that replaces the $ref"}}
    all_fields, optional_fields = _get_fields(schema_type)
    required_fields = []
    hints = get_type_hints(schema_type)
    field_docs = _field_docs_for(schema_type)
    for field in all_fields:
        if field in hints:
            value_type = hints[field]
        else:
            # See if this field has a default
            if hasattr(schema_type, field):
                value_type = type(getattr(schema_type, field))
            else:
                raise ValueError(f"Could not make JSON Schema for {_fullname(schema_type)} because field `{field}` does not have type hints nor default values")

        try:
            properties[field], is_optional = _schema_for(value_type, schema_vars_resolver, schema_repository, schema_type)
            # A field is required when it is neither Optional nor given a
            # class-level default value.
            if not is_optional and not hasattr(schema_type, field):
                required_fields.append(field)
        except NotImplementedError as e:
            # Simply omit fields with types that we can't describe with jsonschema
            print(f"Warning: Omitting {schema_type.__name__}.{field} from definition because: {e}")
            continue

        if field in field_docs:
            properties[field]["description"] = field_docs[field]

    schema = {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "type": "object",
        "properties": properties
    }
    if schema_vars.schema_id is not None:
        schema["$id"] = schema_vars.schema_id

    # Only use the class docstring as a description when the subclass actually
    # defines one (inspect.getdoc falls back to the inherited ImplicitDict doc).
    docs = inspect.getdoc(schema_type)
    if docs != _implicitdict_doc:
        if schema_vars.description is not None:
            schema["description"] = docs + "\n\n" + schema_vars.description
        else:
            schema["description"] = docs
    elif schema_vars.description is not None:
        schema["description"] = schema_vars.description

    if required_fields:
        required_fields.sort()
        schema["required"] = required_fields

    # Replace the placeholder with the completed schema.
    schema_repository[schema_vars.name] = schema


def _schema_for(value_type: Type, schema_vars_resolver: SchemaVarsResolver, schema_repository: Dict[str, dict], context: Type) -> Tuple[dict, bool]:
    """Get the JSON Schema representation of the value_type.

    Args:
        value_type: Data type for which to return a JSON Schema.
        schema_vars_resolver: Mapping from data type to information about the
            schema for that data type.
        schema_repository: If the schema for this data type needs to refer to
            schemas for other data types, those data types must already be
            present in this repository or else they will be added to it during
            this function.
        context: The parent/top-level JSON Schema in which the schema for
            value_type is being included (affects the $ref paths used to refer
            to external schemas).

    Returns:
        * JSON Schema representation of the value_type
        * Boolean indication of whether the value_type is optional as a field
          in an ImplicitDict.  E.g., _schema_for(Optional[float], ...) would
          indicate True because an Optional[float] field within an ImplicitDict
          would be an optional field in that object.

    Raises:
        NotImplementedError: If value_type cannot be described with JSON Schema
            by this implementation.
    """
    generic_type = get_origin(value_type)
    if generic_type:
        # Type is generic
        arg_types = get_args(value_type)
        if generic_type is list:
            items_schema, _ = _schema_for(arg_types[0], schema_vars_resolver, schema_repository, context)
            return {"type": "array", "items": items_schema}, False

        elif generic_type is dict:
            schema = {
                "type": "object",
                "properties": {
                    "$ref": {"type": "string", "description": "Path to content that replaces the $ref"}
                }
            }
            if len(arg_types) >= 2:
                value_schema, _ = _schema_for(arg_types[1], schema_vars_resolver, schema_repository, context)
                schema["additionalProperties"] = value_schema
            return schema, False

        elif generic_type is Union and len(arg_types) == 2 and type(None) in arg_types:
            # Type is an Optional declaration; NoneType may appear in either
            # position (Optional[X] == Union[X, None] == Union[None, X]).
            inner_type = arg_types[0] if arg_types[1] is type(None) else arg_types[1]
            subschema, _ = _schema_for(inner_type, schema_vars_resolver, schema_repository, context)
            # Deep-copy via JSON round trip so the subschema stored in the
            # repository is not mutated when "null" is added below.
            schema = json.loads(json.dumps(subschema))
            if "type" in schema:
                if "null" not in schema["type"]:
                    schema["type"] = [schema["type"], "null"]
            else:
                schema = {"oneOf": [{"type": "null"}, schema]}
            return schema, True

        elif generic_type is Literal:
            # Type is a Literal (parsed value must match one of the specified values)
            values = list(arg_types)
            if all(isinstance(v, str) for v in values):
                return {"type": "string", "enum": values}, False
            # Non-string literals (ints, bools, ...) are constrained by enum alone.
            return {"enum": values}, False

        else:
            raise NotImplementedError(f"Automatic JSON schema generation for {value_type} generic type is not yet implemented")

    schema_vars = schema_vars_resolver(value_type)

    if issubclass(value_type, ImplicitDict):
        # Ensure the referenced schema exists in the repository, then refer to it.
        make_json_schema(value_type, schema_vars_resolver, schema_repository)
        return {"$ref": schema_vars.path_to(value_type, context)}, False

    # bool must be checked before int: bool is a subclass of int in Python, but
    # JSON true/false does not validate against {"type": "integer"}.
    if value_type == bool or issubclass(value_type, bool):
        return {"type": "boolean"}, False

    if value_type == float or issubclass(value_type, float):
        return {"type": "number"}, False

    if value_type == int or issubclass(value_type, int):
        return {"type": "integer"}, False

    if value_type == str or issubclass(value_type, str):
        schema = {"type": "string"}
        if issubclass(value_type, StringBasedDateTime):
            schema["format"] = "date-time"
        elif issubclass(value_type, StringBasedTimeDelta):
            schema["format"] = "duration"
        if issubclass(value_type, enum.Enum):
            schema["enum"] = [v.value for v in value_type]
        return schema, False

    if value_type == datetime or issubclass(value_type, datetime):
        return {"type": "string", "format": "date-time"}, False

    if value_type == dict or issubclass(value_type, dict):
        return {"type": "object"}, False

    raise NotImplementedError(f"Automatic JSON schema generation for {value_type} type is not yet implemented")


def _field_docs_for(t: Type[ImplicitDict]) -> Dict[str, str]:
# Curse Guido for rejecting PEP224! Fine, we'll do it ourselves.
result = {}
src = inspect.getsource(t)
doc_pattern = r"\n\s+([_a-zA-Z][_a-zA-Z0-9]*)(?:: [^\n]+)?\n(\s+)(?:\"\"\"|''')((?:.|\s)*?)(?:\"\"\"|''')"
for m in re.finditer(doc_pattern, src):
indent = m.group(2)
lines = m.group(3).split("\n")
for i in range(1, len(lines)):
if lines[i].startswith(indent):
lines[i] = lines[i][len(indent):]
while not lines[-1]:
lines = lines[0:-1]
docstring = "\n".join(lines)
result[m.group(1)] = docstring
return result
Empty file added tests/__init__.py
Empty file.
27 changes: 2 additions & 25 deletions tests/test_containers.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,8 @@
from typing import List, Optional

from implicitdict import ImplicitDict


class MySpecialClass(str):
    """String subtype used to verify that parsing preserves declared subclasses."""

    @property
    def is_special(self) -> bool:
        """Always True; only a genuine MySpecialClass instance exposes this property."""
        return True


class MyContainers(ImplicitDict):
    """Test type with MySpecialClass values nested in various container shapes."""

    single_value: MySpecialClass
    value_list: List[MySpecialClass]
    optional_list: Optional[List[MySpecialClass]]
    optional_value_list: List[Optional[MySpecialClass]]
    list_of_lists: List[List[MySpecialClass]]
from .test_types import ContainerData


def test_container_item_value_casting():
containers: MyContainers = ImplicitDict.parse(
{
"single_value": "foo",
"value_list": ["value1", "value2"],
"optional_list": ["bar"],
"optional_value_list": ["baz", None],
"list_of_lists": [["list1v1", "list1v2"], ["list2v1"]]
}, MyContainers)
containers: ContainerData = ContainerData.example_value()

assert containers.single_value.is_special

Expand Down
26 changes: 3 additions & 23 deletions tests/test_inheritance.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,12 @@
import json
from typing import Optional

from implicitdict import ImplicitDict


class MyData(ImplicitDict):
    """Base test type exercising required, defaulted, and optional fields."""

    foo: str
    bar: int = 0
    baz: Optional[float]
    has_default_baseclass: str = "In MyData"

    def hello(self) -> str:
        # Overridden in MySubclass to verify method resolution on parsed objects
        return "MyData"

    def base_method(self) -> int:
        # Inherited (not overridden) by subclasses
        return 123


class MySubclass(MyData):
    """Subclass of MyData adding an optional field and its own defaulted field."""

    buzz: Optional[str]
    has_default_subclass: str = "In MySubclass"

    def hello(self) -> str:
        # Overrides MyData.hello
        return "MySubclass"
from .test_types import InheritanceData, MySubclass


def test_inheritance():
data: MyData = ImplicitDict.parse({'foo': 'asdf', 'bar': 1}, MyData)
data: InheritanceData = InheritanceData.example_value()
assert json.loads(json.dumps(data)) == {"foo": "asdf", "bar": 1, "has_default_baseclass": "In MyData"}
assert data.hello() == "MyData"
assert data.has_default_baseclass == "In MyData"
Expand Down Expand Up @@ -56,7 +36,7 @@ def test_inheritance():
subclass.has_default_baseclass = "In MyData 3"
subclass.has_default_subclass = "In MySubclass 3"

data2 = MyData(subclass)
data2 = InheritanceData(subclass)
assert data2.foo == "asdf"
assert data2.bar == 1
assert "baz" not in data2
Expand Down
94 changes: 94 additions & 0 deletions tests/test_jsonschema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import json
import os.path
from typing import Type

import implicitdict.jsonschema
from implicitdict.jsonschema import SchemaVars
from implicitdict import ImplicitDict
import jsonschema

from .test_types import ContainerData, InheritanceData, NestedDefinitionsData, NormalUsageData, OptionalData, PropertiesData, SpecialTypesData


def _resolver(t: Type) -> SchemaVars:
    """Produce schema characteristics for type t.

    Schemas are named by fully-qualified type name, and nested schemas are
    referenced under #/definitions of the top-level schema.
    """

    def path_to(t_dest: Type, t_src: Type) -> str:
        # $ref path from the schema for t_src to the schema for t_dest
        return "#/definitions/" + t_dest.__module__ + "." + t_dest.__qualname__

    # Join with "." so names cannot collide: plain concatenation makes
    # (module "a.b", class "C") and (module "a", class "b.C") identical.
    full_name = t.__module__ + "." + t.__qualname__

    return SchemaVars(name=full_name, path_to=path_to)


def _verify_schema_validation(obj, obj_type: Type[ImplicitDict]) -> None:
    """Generate the JSON Schema for obj_type and assert that obj validates against it."""
    repository = {}
    implicitdict.jsonschema.make_json_schema(obj_type, _resolver, repository)

    # Extract the top-level schema; all remaining schemas become its definitions.
    top_name = _resolver(obj_type).name
    schema = repository.pop(top_name)
    if repository:
        schema["definitions"] = repository

    jsonschema.Draft202012Validator.check_schema(schema)
    validator = jsonschema.Draft202012Validator(schema)

    pure_json = json.loads(json.dumps(obj))

    messages = []
    for error in validator.iter_errors(pure_json):
        # Flatten nested sub-errors breadth-first, recording only leaf errors.
        pending = [error]
        while pending:
            current = pending.pop(0)
            if current.context:
                pending.extend(current.context)
            else:
                messages.append(f"[{current.json_path}] {current.message}")
    assert not messages, "\n".join(messages)


def test_basic_usage():
    """A simple parsed ImplicitDict validates against its generated schema."""
    payload = {'foo': 'asdf', 'bar': 1}
    data: NormalUsageData = ImplicitDict.parse(payload, NormalUsageData)
    _verify_schema_validation(data, NormalUsageData)


def test_field_docstrings():
    """Field docstrings appear as property descriptions in the generated schema."""
    repo = {}
    implicitdict.jsonschema.make_json_schema(NormalUsageData, _resolver, repo)
    props = repo[_resolver(NormalUsageData).name]["properties"]

    expected = {
        "foo": "The foo characterizing the data.",
        "bar": "The bar of the data.\n\nIndents should not be included in docstrings.",
        "baz": "If this baz is specified, it provides additional information.\n\nFinal docstring newlines should be omitted.",
    }
    for field, description in expected.items():
        assert props[field]["description"] == description


def test_containers():
    """Container fields (lists, optionals, nested lists) validate against the schema."""
    _verify_schema_validation(ContainerData.example_value(), ContainerData)


def test_inheritance():
    """Inherited fields and defaults validate against the schema."""
    _verify_schema_validation(InheritanceData.example_value(), InheritanceData)


def test_optional():
    """Every example combination of optional fields validates against the schema."""
    examples = OptionalData.example_values()
    for data in examples.values():
        _verify_schema_validation(data, OptionalData)


def test_properties():
    """Property-bearing types validate against the schema."""
    _verify_schema_validation(PropertiesData.example_value(), PropertiesData)


def test_special_types():
    """Special string-based types (datetimes, durations, ...) validate against the schema."""
    _verify_schema_validation(SpecialTypesData.example_value(), SpecialTypesData)


def test_nested_definitions():
    """Types nesting other ImplicitDict types validate against the schema with definitions."""
    _verify_schema_validation(NestedDefinitionsData.example_value(), NestedDefinitionsData)
Loading

0 comments on commit e83bd9a

Please sign in to comment.