Skip to content

Commit

Permalink
add Python udf example (#42)
Browse files Browse the repository at this point in the history
* include datafusion-python as sub module

* copy datafusion-python code in

* find/replace datafusion._internal -> denormalized._internal.datafusion

* update imports to make things work

* add example udf

* fix cargo

* rename example file

* update cargo lock
  • Loading branch information
emgeee authored Sep 23, 2024
1 parent 6d3ec66 commit e9b6e3f
Show file tree
Hide file tree
Showing 23 changed files with 5,906 additions and 57 deletions.
34 changes: 17 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions py-denormalized/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dev-dependencies = ["pip>=24.2", "ipython>=8.26.0", "pytest>=8.3.2"]
# Enable docstring linting using the google style guide
[tool.ruff.lint]
select = ["E4", "E7", "E9", "F", "D", "W"]
ignore = ["D103"]

[tool.ruff.lint.pydocstyle]
convention = "google"
Expand Down
3 changes: 1 addition & 2 deletions py-denormalized/python/denormalized/context.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from denormalized._internal import PyContext

from denormalized.datastream import DataStream as DataStream
from .datastream import DataStream

class Context:
"""Context."""
Expand Down
112 changes: 112 additions & 0 deletions py-denormalized/python/denormalized/datafusion/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""DataFusion python package.
This is a Python library that binds to Apache Arrow in-memory query engine DataFusion.
See https://datafusion.apache.org/python for more information.
"""

try:
import importlib.metadata as importlib_metadata
except ImportError:
import importlib_metadata

from .context import (
SessionContext,
SessionConfig,
RuntimeConfig,
SQLOptions,
)

from .catalog import Catalog, Database, Table

# The following imports are okay to remain as opaque to the user.
from denormalized._internal import Config, LogicalPlan, ExecutionPlan, runtime

from .record_batch import RecordBatchStream, RecordBatch

from .udf import ScalarUDF, AggregateUDF, Accumulator

from .common import (
DFSchema,
)

from .dataframe import DataFrame

from .expr import (
Expr,
WindowFrame,
)

from . import functions, object_store

__all__ = [
"Accumulator",
"Config",
"DataFrame",
"SessionContext",
"SessionConfig",
"SQLOptions",
"RuntimeConfig",
"Expr",
"ScalarUDF",
"WindowFrame",
"column",
"col",
"literal",
"lit",
"DFSchema",
"runtime",
"Catalog",
"Database",
"Table",
"AggregateUDF",
"LogicalPlan",
"ExecutionPlan",
"RecordBatch",
"RecordBatchStream",
"common",
"expr",
"functions",
"object_store",
]


def column(value: str):
"""Create a column expression."""
return Expr.column(value)


def col(value: str):
"""Create a column expression."""
return Expr.column(value)


def literal(value):
"""Create a literal expression."""
return Expr.literal(value)


def lit(value):
"""Create a literal expression."""
return Expr.literal(value)


udf = ScalarUDF.udf

udaf = AggregateUDF.udaf
76 changes: 76 additions & 0 deletions py-denormalized/python/denormalized/datafusion/catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Data catalog providers."""

from __future__ import annotations

import denormalized._internal as df_internal

from typing import TYPE_CHECKING

if TYPE_CHECKING:
import pyarrow


class Catalog:
"""DataFusion data catalog."""

def __init__(self, catalog: df_internal.Catalog) -> None:
"""This constructor is not typically called by the end user."""
self.catalog = catalog

def names(self) -> list[str]:
"""Returns the list of databases in this catalog."""
return self.catalog.names()

def database(self, name: str = "public") -> Database:
"""Returns the database with the given ``name`` from this catalog."""
return Database(self.catalog.database(name))


class Database:
"""DataFusion Database."""

def __init__(self, db: df_internal.Database) -> None:
"""This constructor is not typically called by the end user."""
self.db = db

def names(self) -> set[str]:
"""Returns the list of all tables in this database."""
return self.db.names()

def table(self, name: str) -> Table:
"""Return the table with the given ``name`` from this database."""
return Table(self.db.table(name))


class Table:
"""DataFusion table."""

def __init__(self, table: df_internal.Table) -> None:
"""This constructor is not typically called by the end user."""
self.table = table

def schema(self) -> pyarrow.Schema:
"""Returns the schema associated with this table."""
return self.table.schema()

@property
def kind(self) -> str:
"""Returns the kind of table."""
return self.table.kind()
62 changes: 62 additions & 0 deletions py-denormalized/python/denormalized/datafusion/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Common data types used throughout the DataFusion project."""

from denormalized._internal import common as common_internal
from enum import Enum

# TODO these should all have proper wrapper classes

DFSchema = common_internal.DFSchema
DataType = common_internal.DataType
DataTypeMap = common_internal.DataTypeMap
PythonType = common_internal.PythonType
RexType = common_internal.RexType
SqlFunction = common_internal.SqlFunction
SqlSchema = common_internal.SqlSchema
SqlStatistics = common_internal.SqlStatistics
SqlTable = common_internal.SqlTable
SqlType = common_internal.SqlType
SqlView = common_internal.SqlView

__all__ = [
"DFSchema",
"DataType",
"DataTypeMap",
"RexType",
"PythonType",
"SqlType",
"NullTreatment",
"SqlTable",
"SqlSchema",
"SqlView",
"SqlStatistics",
"SqlFunction",
]


class NullTreatment(Enum):
"""Describe how null values are to be treated by functions.
This is used primarily by aggregate and window functions. It can be set on
these functions using the builder approach described in
ref:`_window_functions` and ref:`_aggregation` in the online documentation.
"""

RESPECT_NULLS = common_internal.NullTreatment.RESPECT_NULLS
IGNORE_NULLS = common_internal.NullTreatment.IGNORE_NULLS
Loading

0 comments on commit e9b6e3f

Please sign in to comment.