JSON Fields for Nested Pydantic Models? #63
Comments
I also wondered how to store JSON objects without converting them to a string. SQLAlchemy supports storing these directly. |
@OXERY && @scuervo91 - I was able to get something that works using this:
That said: this is a PostgreSQL JSONB column in my database. But it works. For a nested object you could use a Pydantic model as the type and do it the same way. Hope this helps, as I was having a difficult time figuring out a solution as well :) |
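The snippet referenced in the comment above did not survive extraction; a minimal sketch of that kind of setup, assuming a plain `dict` field and illustrative model/field names (not from the original comment), might look like:

```python
from typing import Optional

from sqlalchemy import Column
from sqlalchemy.dialects.postgresql import JSONB
from sqlmodel import Field, SQLModel


class Item(SQLModel, table=True):  # hypothetical model name
    id: Optional[int] = Field(default=None, primary_key=True)
    data: dict = Field(default_factory=dict, sa_column=Column(JSONB))  # stored as JSONB
```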
I also got it working, on SQLite and PostgreSQL: |
@TheJedinator Could you help a bit more with the nested object? I tried to "use the pydantic model as the Type" but I can't get it to work :( Here is my snippet:

```python
from sqlalchemy import Column
from sqlalchemy.dialects.postgresql import JSONB
from sqlmodel import Field, Session, SQLModel

from engine import get_sqlalchemy_engine


class J(SQLModel):
    j: int


class A(SQLModel, table=True):
    a: int = Field(primary_key=True)
    b: J = Field(sa_column=Column(JSONB))


engine = get_sqlalchemy_engine()
SQLModel.metadata.create_all(engine)

with Session(engine) as session:
    a = A(a=1, b=J(j=1))
    session.add(a)
    session.commit()
    session.refresh(a)
```

Throws an error. |
This should resolve your issue in preparing the object for the database. What I'm seeing in the error is that the raw object is being included in the statement rather than the instance... If this doesn't help I can definitely put some more time into looking at what's going on. |
Thank you! Unfortunately I get the same error :( I found one workaround - registering a custom serializer for the SQLAlchemy engine, like so:

```python
def custom_serializer(d):
    return json.dumps(d, default=lambda v: v.json())


def get_sqlalchemy_engine():
    return create_engine(
        "postgresql+psycopg2://",
        creator=get_conn,
        json_serializer=custom_serializer,
    )
```

But if there is a cleaner way, I would gladly use that instead. |
Hey @psarka, I just actually tried what I told you, and I'm sorry to have misled you... I did get a working solution though 😄 It was actually the opposite function that you need to use. Here's the example you supplied, with the amendments to make it work:

```python
with Session(engine) as session:
    j = J(j=1)
    j_dumped = J.json(j)
    a = A(a=1, b=j_dumped)
    session.add(a)
    session.commit()
    session.refresh(a)
```
|
Hmm, this doesn't (or at least shouldn't) typecheck :) But I see what you did there, essentially it's the same as registering a custom serializer, but manually. |
It does type check when you create the `J` instance first: this allows for the type checking of the object, and the `J.json()` call is essentially the same as registering a custom serializer but allows you to be explicit about using it. |
A hacky method with type checking that works with SQLite:

```python
from typing import List

from sqlalchemy import Column
# from sqlalchemy.dialects.postgresql import JSONB
from pydantic import validator
from sqlmodel import Field, Session, SQLModel, JSON, create_engine
# from engine import get_sqlalchemy_engine

sqlite_file_name = "test.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url)


class J2(SQLModel):
    test: List[int]


class J(SQLModel):
    j: int
    nested: J2


class A(SQLModel, table=True):
    a: int = Field(primary_key=True)
    b: J = Field(sa_column=Column(JSON))

    @validator('b')
    def val_b(cls, val):
        return val.dict()


SQLModel.metadata.create_all(engine)

with Session(engine) as session:
    a = A(a=1, b=J(j=1, nested=J2(test=[100, 100, 100])))
    session.add(a)
    session.commit()
    session.refresh(a)
```
|
hi,

```python
from typing import Optional, List, Dict

from sqlalchemy import Column
from pydantic import validator
from sqlmodel import SQLModel, Relationship, Field, JSON


class J2(SQLModel):
    id: int
    title: str


class Companies(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    adddresses: List["J2"] = Field(sa_column=Column(JSON))

    @validator("adddresses")
    def val_b(cls, val):
        print(val)
        return val.dict()
```

This gives the error `TypeError: Type is not JSON serializable: J2`. When I print it, it returns `[J2(id=1, title='address1'), J2(id=2, title='address2')]`. How can I handle that? Why is this J2 added, and how can I get rid of it? I can't turn it into `.dict()`, I cannot serialise it... Can you give me an idea? |
Does this work?
|
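The snippet in the comment above was also lost in extraction; judging from the reply below confirming it worked, a plausible shape (my reconstruction, not necessarily the original code) is a validator that dumps each element of the list rather than the list itself:

```python
from typing import List, Optional

from pydantic import validator
from sqlalchemy import Column
from sqlmodel import Field, JSON, SQLModel


class J2(SQLModel):
    id: int
    title: str


class Companies(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    adddresses: List["J2"] = Field(sa_column=Column(JSON))

    @validator("adddresses")
    def val_b(cls, val):
        # .dict() works on a single model, not on a list of models,
        # so dump each element individually
        return [v.dict() for v in val]
```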
@HenningScheufler thank you for your help, it worked perfectly. |
Hey all, thanks for the great advice here. Creating the object using the classes and writing it to the DB works as expected and writes the data as a dict into a JSON field. See this example:

However, when reading the model from the DB using a ...

Any hint how that could be achieved? Maybe via the custom serialiser mentioned by @psarka? Thanks already! |
Something like this works, but obviously doesn't scale if we have multiple nested models, instead of just the ...

Instead, we would need more context in the deserialiser (i.e. access to the type hint of the field we're trying to deserialise, so that we can use it). Any hint where and how I could achieve that kind of access to the deserialisation process? Thanks :) |
Hey all, after looking at this again, I've been able to resolve it as follows. For our SQLAlchemy models we created this PydanticJSONType factory:

```python
def pydantic_column_type(pydantic_type):
    class PydanticJSONType(TypeDecorator, Generic[T]):
        impl = JSON()

        def __init__(self, json_encoder=json):
            self.json_encoder = json_encoder
            super(PydanticJSONType, self).__init__()

        def bind_processor(self, dialect):
            impl_processor = self.impl.bind_processor(dialect)
            dumps = self.json_encoder.dumps
            if impl_processor:

                def process(value: T):
                    if value is not None:
                        if isinstance(pydantic_type, ModelMetaclass):
                            # This allows to assign non-InDB models and if they're
                            # compatible, they're directly parsed into the InDB
                            # representation, thus hiding the implementation in the
                            # background. However, the InDB model will still be returned
                            value_to_dump = pydantic_type.from_orm(value)
                        else:
                            value_to_dump = value
                        value = recursive_custom_encoder(value_to_dump)
                    return impl_processor(value)

            else:

                def process(value):
                    if isinstance(pydantic_type, ModelMetaclass):
                        # This allows to assign non-InDB models and if they're
                        # compatible, they're directly parsed into the InDB
                        # representation, thus hiding the implementation in the
                        # background. However, the InDB model will still be returned
                        value_to_dump = pydantic_type.from_orm(value)
                    else:
                        value_to_dump = value
                    value = dumps(recursive_custom_encoder(value_to_dump))
                    return value

            return process

        def result_processor(self, dialect, coltype) -> T:
            impl_processor = self.impl.result_processor(dialect, coltype)
            if impl_processor:

                def process(value):
                    value = impl_processor(value)
                    if value is None:
                        return None
                    data = value
                    # Explicitly use the generic directly, not type(T)
                    full_obj = parse_obj_as(pydantic_type, data)
                    return full_obj

            else:

                def process(value):
                    if value is None:
                        return None
                    # Explicitly use the generic directly, not type(T)
                    full_obj = parse_obj_as(pydantic_type, value)
                    return full_obj

            return process

        def compare_values(self, x, y):
            return x == y

    return PydanticJSONType
```

Using this in SQLModel as follows:

```python
class ConnectionResistances(SQLConnectionModel, table=False):
    very_short: ResistancesInLoadDuration = ResistancesInLoadDuration()
    short: ResistancesInLoadDuration = ResistancesInLoadDuration()
    middle: ResistancesInLoadDuration = ResistancesInLoadDuration()
    long: ResistancesInLoadDuration = ResistancesInLoadDuration()
    constant: ResistancesInLoadDuration = ResistancesInLoadDuration()
    earth_quake: ResistancesInLoadDuration = ResistancesInLoadDuration()


class Connection(SQLConnectionModel, table=True):
    id: Optional[uuid.UUID] = Field(
        default=None,
        sa_column=Column(PGUUID(as_uuid=True), default=uuid.uuid4, primary_key=True),
    )
    name: str
    comment: str
    path_to_pdf: Optional[str] = None
    resistance_values: ConnectionResistances = Field(
        ..., sa_column=Column(pydantic_column_type(ConnectionResistances))
    )
```

Works perfectly!
This could be integrated into the SQLModel API based on the type hint alone (i.e. creating the sa_column based on the pydantic type automatically). Potentially in ... What do you think, @tiangolo? |
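Note that `recursive_custom_encoder` and `T` are not defined in the comment as posted; a later reply in this thread suggests FastAPI's `jsonable_encoder` as a stand-in, so the missing pieces might look like:

```python
from typing import TypeVar

# assumption: jsonable_encoder substitutes for the undefined recursive_custom_encoder
from fastapi.encoders import jsonable_encoder as recursive_custom_encoder

T = TypeVar("T")
```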
@tiangolo Any updates? |
Hey @MaximilianFranz Would you mind sharing your entire solution? I am quite interested in trying it out, but it is missing some code pieces. |
What exactly are you missing? Happy to provide more context! |
The `recursive_custom_encoder`, for instance. |
You can use `from fastapi.encoders import jsonable_encoder`. Also, I would start with a simpler model like:

```python
class NestedModel(SQLModel):
    some_value: str


class OuterModel(SQLModel, table=True):
    guid: str = Field(
        default=None,
        sa_column=Column(PGUUID(as_uuid=True), default=uuid.uuid4, primary_key=True),
    )
    nested: NestedModel = Field(..., sa_column=Column(pydantic_column_type(NestedModel)))
```

That should work! |
Thanks, @MaximilianFranz. Let me try. My code is here: https://github.com/Lightning-AI/lightning-hpo/blob/master/lightning_hpo/commands/sweep.py#L36. I'm trying to store the Sweep distributions. |
Hey @MaximilianFranz, I have made a draft PR here: https://github.com/Lightning-AI/lightning-hpo/pull/19/files. I tried, but it is raising an error. Would you mind having a look? Best, |
Both imports come from Pydantic:

```python
from pydantic import parse_obj_as
from pydantic.main import ModelMetaclass
```

As for the error, would you mind pointing me to the action that fails or posting a traceback somewhere? |
It makes sense that it doesn't work yet. You'll have to use the `pydantic_column_type` there as well. |
Hey @MaximilianFranz, I updated the code with your inputs, but it is still failing. I pushed the updated code.

```
  File "/Users/thomas/Documents/GitHub/LAI-lightning-hpo-App/lightning_hpo/components/servers/db/server.py", line 42, in insert_sweep
    session.commit()
  File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1451, in commit
    self._transaction.commit(_to_root=self.future)
  File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 829, in commit
    self._prepare_impl()
  File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 808, in _prepare_impl
    self.session.flush()
  File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3383, in flush
    self._flush(objects)
  File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3523, in _flush
    transaction.rollback(_capture_exception=True)
  File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
    raise exception
  File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3483, in _flush
    flush_context.execute()
  File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
    _emit_insert_statements(
  File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1238, in _emit_insert_statements
    result = connection._execute_20(
  File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1631, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 332, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1498, in _execute_clauseelement
    ret = self._execute_context(
  File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1862, in _execute_context
    self._handle_dbapi_exception(
  File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2043, in _handle_dbapi_exception
    util.raise_(
  File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
    raise exception
  File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1819, in _execute_context
    self.dialect.do_execute(
  File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.InterfaceError: (sqlite3.InterfaceError) Error binding parameter 5 - probably unsupported type.
[SQL: INSERT INTO sweepconfig (distributions, sweep_id, script_path, n_trials, simultaneous_trials, requirements, script_args, framework, cloud_compute, num_nodes, logger, direction) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)]
[parameters: ('{"name": "model.lr", "distribution": "uniform", "params": {"params": {"low": "0.001", "high": "0.1"}}}', 'thomas-5e0dd935', 'train.py', 1, 1, [], [], 'pytorch_lightning', 'cpu', 1, 'wandb', 'maximize')]
(Background on this error at: https://sqlalche.me/e/14/rvf5)
```
|
To finish this, the problem ended up being an attribute of type ... |
Hoping this gets merged in the near future. |
I was able to set up a JSONB dict field like this:

```python
from typing import Optional

from sqlalchemy.dialects.postgresql import JSONB
from sqlmodel import Field, SQLModel, Column


class test(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)  # table models need a primary key
    result: dict = Field(sa_column=Column(JSONB), default={"message": "hello world"})
```
|
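A quick usage sketch, reusing the `test` model above (the engine URL is a placeholder, and the primary key added above makes the table model valid):

```python
from sqlmodel import Session, SQLModel, create_engine, select

engine = create_engine("postgresql+psycopg2://user:pass@localhost/db")  # placeholder URL
SQLModel.metadata.create_all(engine)

with Session(engine) as session:
    session.add(test(result={"message": "hello json"}))
    session.commit()

    row = session.exec(select(test)).first()
    print(row.result["message"])  # plain dicts round-trip through the JSONB column
```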
Would be great to have this in-built! CC: @tiangolo , @MaximilianFranz |
I arrived here looking for something that I imagined might not work yet, but that seems to be a feature one would expect from a library like this. The use case is simple (the solution, I am not so sure haha). If you have a model with a field that is typed as a Pydantic model, it seems to me that as long as you specify the column as a JSON type, serialization and deserialization should happen automatically from the database.

```python
from pydantic import BaseModel
from sqlmodel import JSON, Column, Field, SQLModel, select


class MyOtherModel(BaseModel):
    name: str
    id: int


class MyModel(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    my_field: MyOtherModel = Field(sa_column=Column(type_=JSON))


# ...setup session...

model = session.exec(select(MyModel).where(MyModel.id == 1)).first()
print(model.my_field.name)  # Should work

model.my_field = MyOtherModel(name="New name", id=123)
session.flush()  # Should work
```

Is this something that is being considered? |
In case this helps anyone, here is how I managed to achieve it, although depending on how many of these fields you have it might impact performance a bit. I have a model named `Meetup`:

```python
class Meetup(SQLModel, table=True):
    __tablename__: str = "meetups"  # type: ignore

    # ... some fields definitions ...

    location: MeetupLocation | None = Field(
        default=None,
        sa_column=Column(type_=MeetupLocation.as_mutable(JSON(none_as_null=True)), nullable=True),
    )
```

I will talk about how `MeetupLocation` works below. To serialize and deserialize, I leveraged the engine, using its `json_serializer` and `json_deserializer` parameters:

```python
from contextlib import suppress

from pydantic import BaseModel, ValidationError
from sqlmodel import create_engine


def serialize_pydantic_model(model: BaseModel) -> str:
    return model.model_dump_json()


def deserialize_pydantic_model(data: str) -> BaseModel | None:
    # Try deserializing with each model until one works.
    # This is a pretty ugly solution, but the deserialization seems to only be
    # possible and reliable at an engine level, and we need to know the model
    # to deserialize it properly.
    # We would need to keep adding more of these if we add more models with JSON fields.
    with suppress(ValidationError):
        return MeetupLocation.model_validate_json(data)
    return None


engine = create_engine(
    db_config.full_url,
    echo=db_config.engine_echo,
    json_serializer=serialize_pydantic_model,
    json_deserializer=deserialize_pydantic_model,
)
```

The key part here is that SQLModel seems to be breaking mutable tracking by SQLAlchemy, so when you later do something like:

```python
meetup.location.name = "Other name"
session.add(meetup)
session.commit()
```

it will not be persisted in the DB because, as far as the session knows, nothing has changed. To fix this, I have created a `MutableModel` base class:

```python
from typing import Any, Self

from pydantic import BaseModel
from sqlalchemy.ext.mutable import Mutable


class MutableModel(BaseModel, Mutable):
    def __setattr__(self, name: str, value: Any) -> None:
        """Allows SQLAlchemy Session to track mutable behavior when updating any field"""
        self.changed()
        return super().__setattr__(name, value)

    @classmethod
    def coerce(cls, key: str, value: Any) -> Self | None:
        """Convert JSON to a MeetupLocation object allowing for mutable behavior"""
        if isinstance(value, cls) or value is None:
            return value
        if isinstance(value, str):
            return cls.model_validate_json(value)
        if isinstance(value, dict):
            return cls(**value)
        return super().coerce(key, value)
```

Then `MeetupLocation` simply extends it:

```python
class MeetupLocation(MutableModel):
    name: str | None = None
    location: tuple[float, float] | None = None
```

And this works as far as I have been able to validate it. It can read JSON and transform it into the `MeetupLocation` objects we want, and it can update the object in the database if anything changes in the model later on. I hope this helps. |
A little question: why not use ...? Otherwise, when I insert data into the DB, the nested field is already a dict type:

```python
new_row = {"name": "aa", "location": {"address": "bb", "city": "cc"}}
new_row_db = Meetup.model_validate(new_row)
# Here, new_row_db.location is a dict type.
session.add(new_row_db)
```

The serializer is a must-have, otherwise I cannot add the new row into the DB, but the deserializer is optional for me, as I'm fine with `dict`s. |
In my very personal scenario, my JSON data is in a format like:

```json
{
    "object": "array",
    "data": [
        1,
        2
    ]
}
```

Every JSON object must have the two keys `object` and `data`. |
I was able to solve this issue using the json-fix library. You only need to add the following method to your nested models:

```python
def __json__(self):
    return self.model_dump()
```

I hope this can be done without an external library. |
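For context, json-fix activates on import by hooking the lookup used by `json.dumps`; a sketch of the full flow, assuming `pip install json-fix` and an illustrative `Nested` model:

```python
import json

import json_fix  # noqa: F401 -- importing is enough to enable the __json__ hook
from sqlmodel import SQLModel


class Nested(SQLModel):
    value: int

    def __json__(self):
        return self.model_dump()


print(json.dumps(Nested(value=1)))  # -> {"value": 1}
```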
This is my take, using a conventional SQLAlchemy TypeDecorator. This is actually how others have done this with SA and Pydantic, for instance in this discussion: sqlalchemy/sqlalchemy#11050

```python
from typing import Any, Self

from pydantic import BaseModel as _BaseModel
from sqlalchemy import JSON, types, Column
from sqlalchemy.ext.mutable import Mutable
from sqlmodel import SQLModel, Field


class JsonPydanticField(types.TypeDecorator):
    impl = JSON

    def __init__(self, pydantic_model):
        super().__init__()
        self.pydantic_model = pydantic_model

    def load_dialect_impl(self, dialect):
        return dialect.type_descriptor(JSON())

    def process_bind_param(self, value: _BaseModel, _):
        return value.model_dump() if value is not None else None

    def process_result_value(self, value, _):
        return self.pydantic_model.model_validate(value) if value is not None else None


class MutableSABaseModel(_BaseModel, Mutable):
    def __setattr__(self, name: str, value: Any) -> None:
        """Allows SQLAlchemy Session to track mutable behavior"""
        self.changed()
        return super().__setattr__(name, value)

    @classmethod
    def coerce(cls, key: str, value: Any) -> Self | None:
        """Convert JSON to pydantic model object allowing for mutable behavior"""
        if isinstance(value, cls) or value is None:
            return value
        if isinstance(value, str):
            return cls.model_validate_json(value)
        if isinstance(value, dict):
            return cls(**value)
        return super().coerce(key, value)

    @classmethod
    def to_sa_type(cls):
        return cls.as_mutable(JsonPydanticField(cls))


class Nested(MutableSABaseModel):
    a: str
    b: str | None = None


NestedSAType = Nested.to_sa_type()


class DBModel(SQLModel, table=True):
    id: str = Field(primary_key=True)
    nested: Nested = Field(sa_column=Column(NestedSAType))
```
|
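A brief usage sketch for the approach above (in-memory SQLite, with the `Nested`/`DBModel` classes as defined):

```python
from sqlmodel import Session, SQLModel, create_engine

engine = create_engine("sqlite://")
SQLModel.metadata.create_all(engine)

with Session(engine) as session:
    session.add(DBModel(id="x", nested=Nested(a="hello")))
    session.commit()

    row = session.get(DBModel, "x")
    row.nested.b = "world"  # tracked via MutableSABaseModel.changed()
    session.commit()  # the JSON column is updated
```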
Hey there, I had a similar problem with a list of a nested Pydantic model:

```python
from sqlmodel import JSON, Column, Field, SQLModel, create_engine


class MyNestedModel(SQLModel):
    a: str
    b: str | None


class MyModel(SQLModel):
    c: list[MyNestedModel] | None = Field(
        default=None, sa_column=Column(JSON)
    )
```

I solved it by passing a custom serializer to the engine:

```python
import json

from pydantic import BaseModel


def serialize_pydantic_model(model: BaseModel | list[BaseModel] | None) -> str | None:
    if isinstance(model, BaseModel):
        return model.model_dump_json()
    if isinstance(model, list):
        return json.dumps([m.model_dump_json() for m in model])
    return model
```

I know it is not an elegant solution, but it was enough to make it work. |
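For completeness, the serializer above would presumably be wired into the engine the same way as earlier in the thread (the URL is a placeholder):

```python
from sqlmodel import create_engine

engine = create_engine(
    "sqlite:///test.db",  # placeholder URL
    json_serializer=serialize_pydantic_model,
)
```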
Do you have the complete code, @dadodimauro? I'm trying to use it. |
I HAVE SOLVED IT! I have created a list field using jsonable_encoder; here is my snippet.

SOLUTION

```python
import uuid
from datetime import datetime
from decimal import Decimal
from typing import Optional

from fastapi import APIRouter
from fastapi.encoders import jsonable_encoder
from sqlmodel import Field, Session, SQLModel, create_engine, select, JSON, Column

router = APIRouter()


class SKU(SQLModel):
    sku_item_number: str
    quantity: int
    sku_price: Decimal


class SalesBase(SQLModel):
    business_id: str = Field(index=True, description='Unique identifier for the business')
    date: datetime = Field(description='Date of the sales transaction | Format: YYYY-MM-DD')
    sale_amount: Decimal = Field(description='Total amount of sales in a given transaction')
    sale_count: int = Field(description='The number of sale transactions')
    reversal_amount: Optional[Decimal] = Field(description='Total amount of reversals (refunds) within the transaction')
    reversal_count: Optional[int] = Field(description='The number of reversals within the transaction')
    currency: str = Field(description='The currency of the transaction (e.g., USD, EUR)', default='USD')
    skus: list[SKU] | None = Field(
        default=None,
        sa_column=Column(JSON)
    )

    # Needed for Column(JSON)
    class Config:
        arbitrary_types_allowed = True


class Tb_Sales(SalesBase, table=True):
    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)


@router.post("/add")
async def add(item: SalesBase):
    tb_sales = Tb_Sales()
    with Session(engine) as session:
        # tb_sales
        tb_sales.business_id = item.business_id
        tb_sales.date = item.date
        tb_sales.sale_amount = item.sale_amount
        tb_sales.sale_count = item.sale_count
        tb_sales.reversal_amount = item.reversal_amount
        tb_sales.reversal_count = item.reversal_count
        tb_sales.currency = str(item.currency).upper()
        tb_sales.skus = jsonable_encoder(item.skus)
        session.add(tb_sales)
        # commit
        session.commit()
        session.refresh(tb_sales)

    out = {
        "message": "item created!",
        "id": tb_sales.id,
        "details": tb_sales
    }
    return out
```

Output data from data saved on Postgres:

```json
[
{
"sale_count": 10,
"business_id": "999",
"reversal_count": 10,
"skus": null,
"reversal_amount": "10",
"date": "2024-10-31T01:42:58.469000",
"sale_amount": "10",
"currency": "USD",
"id": "f10a3faf-ce80-46ea-a07c-ef7787288ad9"
},
{
"sale_count": 10,
"business_id": "999",
"reversal_count": 10,
"skus": [
{
"sku_item_number": "111",
"quantity": 10,
"sku_price": "10"
}
],
"reversal_amount": "10",
"date": "2024-10-31T01:42:58.469000",
"sale_amount": "10",
"currency": "USD",
"id": "2b4a04fd-8703-4e1b-a6cf-1c6b2a91d3cc"
},
{
"sale_count": 20,
"business_id": "string",
"reversal_count": 20,
"skus": [
{
"sku_item_number": "999",
"quantity": 20,
"sku_price": "20"
}
],
"reversal_amount": "20",
"date": "2024-10-31T02:04:09.032000",
"sale_amount": "20",
"currency": "USD",
"id": "0b76d570-83aa-48e2-8267-d39549da5fc5"
},
{
"sale_count": 0,
"business_id": "string",
"reversal_count": 0,
"skus": [
{
"sku_item_number": "string",
"quantity": 0,
"sku_price": "0"
}
],
"reversal_amount": "0",
"date": "2024-10-31T02:06:38.215000",
"sale_amount": "0",
"currency": "USD",
"id": "f9ae39a1-3bed-4af8-b1ed-8501d41ffa32"
},
{
"sale_count": 0,
"business_id": "string",
"reversal_count": 0,
"skus": [
{
"sku_item_number": "1",
"quantity": 10,
"sku_price": "10"
},
{
"sku_item_number": "2",
"quantity": 20,
"sku_price": "20"
},
{
"sku_item_number": "3",
"quantity": 30,
"sku_price": "30"
}
],
"reversal_amount": "0",
"date": "2024-10-31T02:06:38.215000",
"sale_amount": "0",
"currency": "USD",
"id": "f1c59936-0a8a-4134-a642-4901e75d563c"
}
]
```
|
Hi! I experienced a similar issue, so I had to use @alexdashly 's solution. The first issue I experienced with the solution was that the resulting data in the database was stored as a JSON string even when the model that was dumped was an array, which kept me from some types of queries, such as fetching a bunch of items from the database based on the value in a JSON field. So I had to modify it to:

```python
class JSONBPydanticField(types.TypeDecorator):
    """This is a custom SQLAlchemy field that allows easy serialization between database JSONB types and Pydantic models"""

    impl = JSONB

    def __init__(
        self,
        pydantic_model_class: type["MutableSABaseModel"],
        many: bool = False,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.pydantic_model_class = pydantic_model_class
        self.many = many

    def load_dialect_impl(self, dialect):
        return dialect.type_descriptor(JSONB())

    def process_bind_param(self, value: _BaseModel | list[_BaseModel], dialect):
        """Convert python native type to JSON string before storing in the database"""
        return jsonable_encoder(value) if value else None

    def process_result_value(self, value, dialect):
        """Convert JSON string back to Python object after retrieving from the database"""
        if self.many:
            return (
                [self.pydantic_model_class.model_validate(v) for v in value]
                if value
                else None
            )
        return (
            self.pydantic_model_class.model_validate(value)
            if value is not None
            else None
        )


class MutableSAList(list, Mutable):
    """This is a hack that is intended to allow SQLAlchemy to detect changes in a JSON field that is a list in native Python.

    Allows SQLAlchemy Session to track mutable behavior."""

    @override
    def append(self, __object):
        self.changed()
        super().append(__object)

    @override
    def remove(self, __value):
        self.changed()
        super().remove(__value)

    @override
    def pop(self, __index=-1):
        self.changed()
        super().pop(__index)

    @override
    def reverse(self):
        self.changed()
        super().reverse()

    @override
    def __setattr__(self, name: str, value: Any) -> None:
        self.changed()
        super().__setattr__(name, value)

    @override
    def __setitem__(self, key, value):
        self.changed()
        super().__setitem__(key, value)

    @override
    def __delitem__(self, key):
        self.changed()
        super().__delitem__(key)

    def __iadd__(self, other):
        self.changed()
        super().__iadd__(other)


class MutableSABaseModel(_BaseModel, Mutable):
    """This is a hack that is intended to allow SQLAlchemy to detect changes in a JSON field that is a pydantic model"""

    def __setattr__(self, name: str, value: Any) -> None:
        """Allows SQLAlchemy Session to track mutable behavior"""
        self.changed()
        return super().__setattr__(name, value)

    @classmethod
    def coerce(cls, key: str, value: Any) -> Self | None:
        """Convert JSON to pydantic model object allowing for mutable behavior"""
        if isinstance(value, cls) or value is None:
            return value
        if isinstance(value, str):
            return cls.model_validate_json(value)
        if isinstance(value, dict):
            return cls.model_validate(value)
        if isinstance(value, list):
            return MutableSAList([cls.model_validate(v) for v in value])
        return super().coerce(key, value)

    @classmethod
    def to_sa_type(cls, many=False):
        return cls.as_mutable(JSONBPydanticField(pydantic_model_class=cls, many=many))


class BaseDBModel(SQLModel):
    id: UUID = Field(default_factory=uuid4, primary_key=True)
    created_at: AwareDatetime = Field(
        default_factory=aware_datetime_now, sa_type=TIMESTAMP(timezone=True)
    )
    last_updated_at: AwareDatetime | None = Field(sa_type=TIMESTAMP(timezone=True))

    objects: ClassVar[BaseModelManager] = BaseModelManager()


class OrganizationMemberPermission(str, Enum):
    MANAGE_EVENTS = "EVENT:WRITE"
    INVITE_MEMBERS = "MEMBERS:INVITE"
    APPROVE_REQUESTS = "MEMBERS:APPROVE_REQUEST"


class OrganizationMember(MutableSABaseModel):
    id: UUID
    role: str
    permissions: list[OrganizationMemberPermission] = Field(
        description="A list of administrative features an organization member can perform in the organization"
    )


OrganizationMembersSAType = OrganizationMember.to_sa_type(many=True)


class Organization(BaseDBModel, table=True):
    __tablename__ = "organizations"

    name: str = Field(max_length=128, unique=True)
    is_verified: bool = Field(
        False,
        description="used to flag organizations that has been verified by eventtrakka",
    )
    logo_url: str | None = Field(None)
    about: str | None
    owner_id: UUID = Field(foreign_key="users.id")
    owner: "User" = Relationship()
    members: list[OrganizationMember] = Field(
        default_factory=list,
        sa_type=OrganizationMembersSAType,
    )

    objects: ClassVar[OrganizationModelManager["Organization"]] = (
        OrganizationModelManager()
    )
```

The manager method that queries against the JSONB `members` field:

```python
async def get_organizations_as_member(
    self,
    member: "User",
    session: AsyncSession | None = None,
) -> list[T]:
    async for s in get_db_session():
        session = s or session
        query = (
            select(self.model_class)
            .select_from(self.model_class)
            .join(
                func.jsonb_array_elements(self.model_class.members).alias(
                    "members_jsonb"
                ),
                text("true"),  # LATERAL join
            )
            .where(
                func.jsonb_extract_path_text(column("members_jsonb"), "id")
                == str(member.id)
            )
        )
        return await paginate(session, query)
```

My current limitation is the way Alembic auto-generates the migrations, so I have to modify the migration files:

```python
# from this
sa.Column(
    "members",
    JSONBPydanticField(
        astext_type=Text(),
    ),
    nullable=False,
),

# to this
sa.Column(
    "members",
    JSONBPydanticField(
        pydantic_model_class=OrganizationMember,
        many=True,
        astext_type=sa.Text(),
    ),
    nullable=False,
),
```

The entire code is at https://github.com/OSCA-Ado-Ekiti/EventTrakka-Backend . |
You can omit re-defining the MutableList by using SQLAlchemy's built-in `sqlalchemy.ext.mutable.MutableList`. |
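A sketch of how the built-in class could replace the hand-rolled one above (untested against the full code):

```python
from sqlalchemy.ext.mutable import MutableList

# Instead of defining MutableSAList, wrap the JSONB field type directly:
OrganizationMembersSAType = MutableList.as_mutable(
    JSONBPydanticField(pydantic_model_class=OrganizationMember, many=True)
)
```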
First Check
Commit to Help
Example Code
Description
I have already implemented an API using FastAPI to store Pydantic models. These models are themselves nested Pydantic models, so the way they interact with a Postgres database is through JSON fields. I've been using Tortoise ORM, as the example shows.
Is there an equivalent model in SQLModel?
Operating System
Linux
Operating System Details
WSL 2 Ubuntu 20.04
SQLModel Version
0.0.4
Python Version
3.8
Additional Context
No response