SQLFrame: Add basic and all-types examples #765

Draft
wants to merge 2 commits into
base: main
5 changes: 5 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -33,6 +33,11 @@ updates:
    schedule:
      interval: "daily"

  - directory: "/by-dataframe/sqlframe"
    package-ecosystem: "pip"
    schedule:
      interval: "daily"

  - directory: "/by-language/csharp-npgsql"
    package-ecosystem: "nuget"
    schedule:
74 changes: 74 additions & 0 deletions .github/workflows/dataframe-sqlframe.yml
@@ -0,0 +1,74 @@
name: SQLFrame

on:
  pull_request:
    branches: ~
    paths:
      - '.github/workflows/dataframe-sqlframe.yml'
      - 'by-dataframe/sqlframe/**'
      - 'requirements.txt'
  push:
    branches: [ main ]
    paths:
      - '.github/workflows/dataframe-sqlframe.yml'
      - 'by-dataframe/sqlframe/**'
      - 'requirements.txt'

  # Allow job to be triggered manually.
  workflow_dispatch:

  # Run job each night after CrateDB nightly has been published.
  schedule:
    - cron: '0 3 * * *'

# Cancel in-progress jobs when pushing to the same branch.
concurrency:
  cancel-in-progress: true
  group: ${{ github.workflow }}-${{ github.ref }}

jobs:
  test:
    name: "
      Python: ${{ matrix.python-version }}
      CrateDB: ${{ matrix.cratedb-version }}
      on ${{ matrix.os }}"
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      matrix:
        os: [ 'ubuntu-latest' ]
        python-version: [ '3.9', '3.13' ]
        cratedb-version: [ 'nightly' ]

    services:
      cratedb:
        image: crate/crate:${{ matrix.cratedb-version }}
        ports:
          - 4200:4200
          - 5432:5432
        env:
          CRATE_HEAP_SIZE: 4g

    steps:

      - name: Acquire sources
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
          architecture: x64
          cache: 'pip'
          cache-dependency-path: |
            requirements.txt
            by-dataframe/sqlframe/requirements.txt
            by-dataframe/sqlframe/requirements-test.txt

      - name: Install utilities
        run: |
          pip install -r requirements.txt

      - name: Validate by-dataframe/sqlframe
        run: |
          ngr test --accept-no-venv by-dataframe/sqlframe
1 change: 1 addition & 0 deletions by-dataframe/sqlframe/.gitignore
@@ -0,0 +1 @@
*.csv
77 changes: 77 additions & 0 deletions by-dataframe/sqlframe/README.md
@@ -0,0 +1,77 @@
# Verify the `sqlframe` library with CrateDB

Turning PySpark Into a Universal DataFrame API

## About

This folder includes software integration tests for verifying
that the [SQLFrame] Python library works well together with [CrateDB].

SQLFrame implements the [PySpark] DataFrame API in order to enable running
transformation pipelines directly on database engines - no Spark clusters
or dependencies required.
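
To illustrate the idea of turning chained DataFrame calls into SQL, here is a toy sketch. `ToyFrame` is a hypothetical class invented for this illustration; it is not part of SQLFrame and does not reflect how SQLFrame is implemented. It only shows how `where`, `sort`, and `limit` calls can be collected lazily and rendered as a single SQL statement that a database engine could run.

```python
from dataclasses import dataclass, replace
from typing import Optional, Tuple


@dataclass(frozen=True)
class ToyFrame:
    """Hypothetical stand-in for a lazy DataFrame; not part of SQLFrame."""

    table: str
    filters: Tuple[str, ...] = ()
    order_by: Optional[str] = None
    row_limit: Optional[int] = None

    def where(self, condition: str) -> "ToyFrame":
        # Record the filter; nothing is executed yet.
        return replace(self, filters=self.filters + (condition,))

    def sort(self, expression: str) -> "ToyFrame":
        return replace(self, order_by=expression)

    def limit(self, n: int) -> "ToyFrame":
        return replace(self, row_limit=n)

    def sql(self) -> str:
        # Render the collected operations as one SQL statement.
        parts = [f"SELECT * FROM {self.table}"]
        if self.filters:
            parts.append("WHERE " + " AND ".join(self.filters))
        if self.order_by:
            parts.append(f"ORDER BY {self.order_by}")
        if self.row_limit is not None:
            parts.append(f"LIMIT {self.row_limit}")
        return " ".join(parts)


query = (
    ToyFrame("sys.summits")
    .where("region ILIKE 'ortler%'")
    .sort("height DESC")
    .limit(3)
)
print(query.sql())
```

The toy takes SQL fragments as plain strings and does no quoting or validation, so it is only suitable for showing the concept.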

## What's Inside

- `example_basic.py`: A few examples that read CrateDB's `sys.summits` table,
  plus an example that lists the tables of an existing schema.

- `example_types.py`: An example that exercises all data types supported by
CrateDB.

## Synopsis

```shell
pip install --upgrade sqlframe
```
```python
from psycopg2 import connect
from sqlframe import activate
from sqlframe.base.functions import col

# Define database connection parameters, suitable for CrateDB on localhost.
# For CrateDB Cloud, use your cluster's hostname, username, and password.
conn = connect(
    dbname="crate",
    user="crate",
    password="",
    host="localhost",
    port="5432",
)
# Activate SQLFrame to run directly on CrateDB.
activate("postgres", conn=conn)

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Invoke query.
df = spark.sql(
    spark.table("sys.summits")
    .where(col("region").ilike("ortler%"))
    .sort(col("height").desc())
    .limit(3)
)
print(df.sql())
df.show()
```
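
The connection parameters above are hard-coded for localhost. A minimal sketch of reading them from the environment instead could look like this. The helper name `cratedb_connection_params` and the `CRATEDB_*` variable names are assumptions made for this example; neither SQLFrame nor CrateDB defines them.

```python
import os


def cratedb_connection_params(env=None):
    """Build keyword arguments for `psycopg2.connect()` from the environment.

    The `CRATEDB_*` variable names are hypothetical. Unset variables fall
    back to the localhost defaults used in the synopsis.
    """
    env = os.environ if env is None else env
    return {
        "dbname": env.get("CRATEDB_DBNAME", "crate"),
        "user": env.get("CRATEDB_USER", "crate"),
        "password": env.get("CRATEDB_PASSWORD", ""),
        "host": env.get("CRATEDB_HOST", "localhost"),
        "port": env.get("CRATEDB_PORT", "5432"),
    }


params = cratedb_connection_params()
```

With psycopg2 installed, the result can be passed on as `connect(**params)`.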

## Tests

Set up sandbox and install packages.
```bash
pip install uv
uv venv .venv
source .venv/bin/activate
uv pip install -r requirements.txt -r requirements-test.txt
```

Run integration tests.
```bash
pytest
```
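
The integration tests need a running CrateDB instance that accepts connections on the PostgreSQL port. The following sketch, using only the standard library, waits until a TCP port accepts connections. The helper name `wait_for_port` and its defaults are assumptions for this example, and an open port does not guarantee that the database is ready to serve queries.

```python
import socket
import time


def wait_for_port(host="localhost", port=5432, timeout=30.0, interval=0.5):
    """Return True once `host:port` accepts a TCP connection, else False.

    Retries every `interval` seconds until `timeout` seconds have elapsed.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Connection refused or timed out: retry until the deadline.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

A test session could call `wait_for_port()` once before connecting and skip or fail early when it returns `False`.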


[CrateDB]: https://cratedb.com/database
[PySpark]: https://spark.apache.org/docs/latest/api/python/
[SQLFrame]: https://pypi.org/project/sqlframe/
98 changes: 98 additions & 0 deletions by-dataframe/sqlframe/example_basic.py
@@ -0,0 +1,98 @@
"""
Using `sqlframe` with CrateDB: Basic usage.

pip install --upgrade sqlframe

A few basic operations using the `sqlframe` library with CrateDB.

- https://pypi.org/project/sqlframe/
"""

from psycopg2 import connect
from sqlframe import activate
from sqlframe.base.functions import col

from patch import monkeypatch


def connect_spark():
    # Connect to database.
    conn = connect(
        dbname="crate",
        user="crate",
        password="",
        host="localhost",
        port="5432",
    )
    # Activate SQLFrame to run directly on CrateDB.
    activate("postgres", conn=conn)

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    return spark


def sqlframe_select_sys_summits():
    """
    Query CrateDB's built-in `sys.summits` table.
    :return: The resulting dataframe.
    """
    spark = connect_spark()
    df = spark.sql(
        spark.table("sys.summits")
        .where(col("region").ilike("ortler%"))
        .sort(col("height").desc())
        .limit(3)
    )
    print(df.sql())
    df.show()
    return df


def sqlframe_export_sys_summits_pandas():
    """
    Query CrateDB's built-in `sys.summits` table, returning a pandas dataframe.
    """
    spark = connect_spark()
    df = spark.sql(
        spark.table("sys.summits")
        .where(col("region").ilike("ortler%"))
        .sort(col("height").desc())
        .limit(3)
    ).toPandas()
    return df


def sqlframe_export_sys_summits_csv():
    """
    Query CrateDB's built-in `sys.summits` table, saving the output to CSV.
    """
    spark = connect_spark()
    df = spark.sql(
        spark.table("sys.summits")
        .where(col("region").ilike("ortler%"))
        .sort(col("height").desc())
        .limit(3)
    )
    df.write.csv("summits.csv", mode="overwrite")
    return df


def sqlframe_get_table_names():
    """
    List the tables of the system schema `sys`.
    """
    spark = connect_spark()
    tables = spark.catalog.listTables(dbName="sys")
    return tables


monkeypatch()


if __name__ == "__main__":
    print(sqlframe_select_sys_summits())
    print(sqlframe_export_sys_summits_pandas())
    print(sqlframe_export_sys_summits_csv())
    print(sqlframe_get_table_names())