Skip to content

Commit

Permalink
[DOP-22348] Add transformations for Transfers with dataframe row filt…
Browse files Browse the repository at this point in the history
…ering
  • Loading branch information
Ilyas Gasanov committed Jan 15, 2025
1 parent 9b79921 commit 6e69809
Show file tree
Hide file tree
Showing 34 changed files with 1,034 additions and 121 deletions.
1 change: 1 addition & 0 deletions docs/changelog/next_release/184.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add transformations for **Transfers** with dataframe row filtering
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def upgrade():
sa.Column("strategy_params", sa.JSON(), nullable=False),
sa.Column("source_params", sa.JSON(), nullable=False),
sa.Column("target_params", sa.JSON(), nullable=False),
sa.Column("transformations", sa.JSON(), nullable=False),
sa.Column("is_scheduled", sa.Boolean(), nullable=False),
sa.Column("schedule", sa.String(length=32), nullable=False),
sa.Column("queue_id", sa.BigInteger(), nullable=False),
Expand Down
1 change: 1 addition & 0 deletions syncmaster/db/models/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class Transfer(
strategy_params: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False, default={})
source_params: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False, default={})
target_params: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False, default={})
transformations: Mapped[list[dict[str, Any]]] = mapped_column(JSON, nullable=False, default=list)
is_scheduled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
schedule: Mapped[str] = mapped_column(String(32), nullable=False, default="")
queue_id: Mapped[int] = mapped_column(
Expand Down
28 changes: 19 additions & 9 deletions syncmaster/db/repositories/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ async def create(
source_params: dict[str, Any],
target_params: dict[str, Any],
strategy_params: dict[str, Any],
transformations: list[dict[str, Any]],
queue_id: int,
is_scheduled: bool,
schedule: str | None,
Expand All @@ -130,6 +131,7 @@ async def create(
source_params=source_params,
target_params=target_params,
strategy_params=strategy_params,
transformations=transformations,
queue_id=queue_id,
is_scheduled=is_scheduled,
schedule=schedule or "",
Expand All @@ -154,20 +156,27 @@ async def update(
source_params: dict[str, Any],
target_params: dict[str, Any],
strategy_params: dict[str, Any],
transformations: list[dict[str, Any]],
is_scheduled: bool | None,
schedule: str | None,
new_queue_id: int | None,
) -> Transfer:
try:
for key in transfer.source_params:
if key not in source_params or source_params[key] is None:
source_params[key] = transfer.source_params[key]
for key in transfer.target_params:
if key not in target_params or target_params[key] is None:
target_params[key] = transfer.target_params[key]
for key in transfer.strategy_params:
if key not in strategy_params or strategy_params[key] is None:
strategy_params[key] = transfer.strategy_params[key]
for old, new in [
(transfer.source_params, source_params),
(transfer.target_params, target_params),
(transfer.strategy_params, strategy_params),
]:
for key in old:
if key not in new or new[key] is None:
new[key] = old[key]

new_transformations = {d["type"]: d["filters"] for d in transformations}
old_transformations = {d["type"]: d["filters"] for d in transfer.transformations}
for t_type, t_filters in new_transformations.items():
old_transformations[t_type] = t_filters
transformations = [{"type": t, "filters": f} for t, f in old_transformations.items()]

return await self._update(
Transfer.id == transfer.id,
name=name or transfer.name,
Expand All @@ -179,6 +188,7 @@ async def update(
target_connection_id=target_connection_id or transfer.target_connection_id,
source_params=source_params,
target_params=target_params,
transformations=transformations,
queue_id=new_queue_id or transfer.queue_id,
)
except IntegrityError as e:
Expand Down
2 changes: 2 additions & 0 deletions syncmaster/dto/transfers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class TransferDTO:
@dataclass
class DBTransferDTO(TransferDTO):
table_name: str
transformations: list[dict] | None = None


@dataclass
Expand All @@ -23,6 +24,7 @@ class FileTransferDTO(TransferDTO):
file_format: CSV | JSONLine | JSON | Excel | XML | ORC | Parquet
options: dict
df_schema: dict | None = None
transformations: list[dict] | None = None

_format_parsers = {
"csv": CSV,
Expand Down
4 changes: 2 additions & 2 deletions syncmaster/schemas/v1/connections/oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class CreateOracleConnectionDataSchema(BaseModel):
additional_params: dict = Field(default_factory=dict)

@model_validator(mode="before")
def check_owner_id(cls, values):
def validate_connection_identifiers(cls, values):
sid, service_name = values.get("sid"), values.get("service_name")
if sid and service_name:
raise ValueError("You must specify either sid or service_name but not both")
Expand All @@ -47,7 +47,7 @@ class UpdateOracleConnectionDataSchema(BaseModel):
additional_params: dict | None = Field(default_factory=dict)

@model_validator(mode="before")
def check_owner_id(cls, values):
def validate_connection_identifiers(cls, values):
sid, service_name = values.get("sid"), values.get("service_name")
if sid and service_name:
raise ValueError("You must specify either sid or service_name but not both")
Expand Down
27 changes: 25 additions & 2 deletions syncmaster/schemas/v1/transfers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

from pydantic import BaseModel, Field, model_validator
from typing import Annotated

from pydantic import BaseModel, Field, field_validator, model_validator

from syncmaster.schemas.v1.connections.connection import ReadConnectionSchema
from syncmaster.schemas.v1.page import PageSchema
Expand All @@ -27,6 +29,9 @@
S3ReadTransferTarget,
)
from syncmaster.schemas.v1.transfers.strategy import FullStrategy, IncrementalStrategy
from syncmaster.schemas.v1.transfers.transformations.dataframe_rows_filter import (
DataframeRowsFilter,
)
from syncmaster.schemas.v1.types import NameConstr

ReadTransferSchemaSource = (
Expand Down Expand Up @@ -97,6 +102,8 @@
| None
)

TransformationSchema = DataframeRowsFilter


class CopyTransferSchema(BaseModel):
new_group_id: int
Expand Down Expand Up @@ -129,6 +136,9 @@ class ReadTransferSchema(BaseModel):
...,
discriminator="type",
)
transformations: list[Annotated[TransformationSchema, Field(..., discriminator="type")]] = Field(
default_factory=list,
)

class Config:
from_attributes = True
Expand Down Expand Up @@ -158,15 +168,27 @@ class CreateTransferSchema(BaseModel):
discriminator="type",
description="Incremental or archive download options",
)
transformations: list[
Annotated[TransformationSchema, Field(None, discriminator="type", description="List of transformations")]
] = Field(default_factory=list)

@model_validator(mode="before")
def check_owner_id(cls, values):
def validate_scheduling(cls, values):
is_scheduled, schedule = values.get("is_scheduled"), values.get("schedule")
if is_scheduled and schedule is None:
# TODO make checking cron string
raise ValueError("If transfer must be scheduled than set schedule param")
return values

@field_validator("transformations", mode="after")
def validate_transformations_uniqueness(cls, transformations):
if transformations:
types = [tr.type for tr in transformations]
duplicates = {t for t in types if types.count(t) > 1}
if duplicates:
raise ValueError(f"Duplicate 'type' values found in transformations: {' '.join(map(str, duplicates))}")
return transformations


class UpdateTransferSchema(BaseModel):
source_connection_id: int | None = None
Expand All @@ -179,6 +201,7 @@ class UpdateTransferSchema(BaseModel):
source_params: UpdateTransferSchemaSource = Field(discriminator="type", default=None)
target_params: UpdateTransferSchemaTarget = Field(discriminator="type", default=None)
strategy_params: FullStrategy | IncrementalStrategy | None = Field(discriminator="type", default=None)
transformations: list[Annotated[TransformationSchema, Field(discriminator="type", default=None)]] = None


class ReadFullTransferSchema(ReadTransferSchema):
Expand Down
2 changes: 2 additions & 0 deletions syncmaster/schemas/v1/transfers/transformations/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from typing import Annotated, Literal

from pydantic import BaseModel, Field

from syncmaster.schemas.v1.transformation_types import DATAFRAME_ROWS_FILTER


class BaseRowFilter(BaseModel):
field: str


class EqualFilter(BaseRowFilter):
type: Literal["equal"]
value: str | None = None


class NotEqualFilter(BaseRowFilter):
type: Literal["not_equal"]
value: str | None = None


class GreaterThanFilter(BaseRowFilter):
type: Literal["greater_than"]
value: str


class GreaterOrEqualFilter(BaseRowFilter):
type: Literal["greater_or_equal"]
value: str


class LessThanFilter(BaseRowFilter):
type: Literal["less_than"]
value: str


class LessOrEqualFilter(BaseRowFilter):
type: Literal["less_or_equal"]
value: str


class LikeFilter(BaseRowFilter):
type: Literal["like"]
value: str


class ILikeFilter(BaseRowFilter):
type: Literal["ilike"]
value: str


class NotLikeFilter(BaseRowFilter):
type: Literal["not_like"]
value: str


class NotILikeFilter(BaseRowFilter):
type: Literal["not_ilike"]
value: str


class RegexpFilter(BaseRowFilter):
type: Literal["regexp"]
value: str


RowFilter = (
EqualFilter
| NotEqualFilter
| GreaterThanFilter
| GreaterOrEqualFilter
| LessThanFilter
| LessOrEqualFilter
| LikeFilter
| ILikeFilter
| NotLikeFilter
| NotILikeFilter
| RegexpFilter
)


class DataframeRowsFilter(BaseModel):
type: DATAFRAME_ROWS_FILTER
filters: list[Annotated[RowFilter, Field(..., discriminator="type")]] = Field(default_factory=list)
5 changes: 5 additions & 0 deletions syncmaster/schemas/v1/transformation_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from typing import Literal

DATAFRAME_ROWS_FILTER = Literal["dataframe_rows_filter"]
4 changes: 4 additions & 0 deletions syncmaster/server/api/v1/transfers.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ async def create_transfer(
source_params=transfer_data.source_params.dict(),
target_params=transfer_data.target_params.dict(),
strategy_params=transfer_data.strategy_params.dict(),
transformations=[tr.dict() for tr in transfer_data.transformations],
queue_id=transfer_data.queue_id,
is_scheduled=transfer_data.is_scheduled,
schedule=transfer_data.schedule,
Expand Down Expand Up @@ -326,6 +327,9 @@ async def update_transfer(
source_params=transfer_data.source_params.dict() if transfer_data.source_params else {},
target_params=transfer_data.target_params.dict() if transfer_data.target_params else {},
strategy_params=transfer_data.strategy_params.dict() if transfer_data.strategy_params else {},
transformations=(
[tr.dict() for tr in transfer_data.transformations] if transfer_data.transformations else []
),
is_scheduled=transfer_data.is_scheduled,
schedule=transfer_data.schedule,
new_queue_id=transfer_data.new_queue_id,
Expand Down
2 changes: 1 addition & 1 deletion syncmaster/worker/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def __init__(
self.run = run
self.source_handler = self.get_handler(
connection_data=source_connection.data,
transfer_params=run.transfer.source_params,
transfer_params={**run.transfer.source_params, "transformations": run.transfer.transformations},
connection_auth_data=source_auth_data,
)
self.target_handler = self.get_handler(
Expand Down
44 changes: 44 additions & 0 deletions syncmaster/worker/handlers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,47 @@ def read(self) -> DataFrame: ...

@abstractmethod
def write(self, df: DataFrame) -> None: ...


class BaseHandler(Handler):

def _apply_filters(self, df: DataFrame) -> DataFrame:
for transformation in self.transfer_dto.transformations:
if transformation["type"] == "dataframe_rows_filter":
filter_expression = self._get_filter_expression(transformation["filters"])
if filter_expression:
df = df.where(filter_expression)
return df

@staticmethod
def _get_filter_expression(filters: list[dict]) -> str:
operators = {
"equal": "=",
"not_equal": "!=",
"greater_than": ">",
"greater_or_equal": ">=",
"less_than": "<",
"less_or_equal": "<=",
"like": "LIKE",
"ilike": "ILIKE",
"not_like": "NOT LIKE",
"not_ilike": "NOT ILIKE",
"regexp": "RLIKE",
}

expressions = []
for filter in filters:
field = filter["field"]
op = operators[filter["type"]]
value = filter["value"]

if value is None:
if op == "!=":
expressions.append(f"{field} IS NOT NULL")
elif op == "=":
expressions.append(f"{field} IS NULL")
else:
value = repr(value) if isinstance(value, str) else value
expressions.append(f"{field} {op} {value}")

return " AND ".join(expressions)
7 changes: 4 additions & 3 deletions syncmaster/worker/handlers/db/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
from onetl.db import DBReader, DBWriter

from syncmaster.dto.transfers import DBTransferDTO
from syncmaster.worker.handlers.base import Handler
from syncmaster.worker.handlers.base import BaseHandler

if TYPE_CHECKING:
from pyspark.sql.dataframe import DataFrame


class DBHandler(Handler):
class DBHandler(BaseHandler):
connection: BaseDBConnection
transfer_dto: DBTransferDTO

Expand All @@ -25,7 +25,8 @@ def read(self) -> DataFrame:
connection=self.connection,
table=self.transfer_dto.table_name,
)
return reader.run()
df = reader.run()
return self._apply_filters(df)

def write(self, df: DataFrame) -> None:
writer = DBWriter(
Expand Down
Loading

0 comments on commit 6e69809

Please sign in to comment.