The Astro Python SDK enables rapid and clean development of extract, transform, and load (ETL) workflows using Python.
The SDK abstracts the boilerplate code required for communication between datasets and tasks, which helps DAG authors to achieve more with less code.
It is powered by Apache Airflow and maintained by Astronomer.
⚠️ Disclaimer: This project is in a preview release state and is not yet production-ready. The interfaces may change. We welcome users to try out the interfaces and provide us with feedback.
- Apache Airflow >= 2.1.0.
The Astro Python SDK is available on PyPI. Use the standard Python installation tools.
To install a cloud-agnostic version of the SDK, run:
    pip install astro-sdk-python
You can also install dependencies for using the SDK with popular cloud providers:
    # Quoting the requirement keeps shells such as zsh from expanding the brackets
    pip install "astro-sdk-python[amazon,google,snowflake,postgres]"
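To confirm which version of the SDK ended up in the active environment, a small check like the one below may help. This is a minimal sketch that uses only the Python standard library; the helper name `installed_version` is hypothetical, and the distribution name is the one from the install commands above.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    # "astro-sdk-python" is the distribution name used in the pip commands above.
    found = installed_version("astro-sdk-python")
    print(found or "astro-sdk-python is not installed in this environment")
```

Returning `None` instead of raising keeps the check usable in setup scripts that only need a yes/no answer.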
- Copy the following DAG into a file named `calculate_popular_movies.py` and add it to the `dags` directory of your Airflow project:

      from datetime import datetime

      from airflow import DAG

      from astro import sql as aql
      from astro.files import File
      from astro.sql.table import Table


      @aql.transform()
      def top_five_animations(input_table: Table):
          return """
              SELECT Title, Rating
              FROM {{input_table}}
              WHERE Genre1=='Animation'
              ORDER BY Rating desc
              LIMIT 5;
          """


      with DAG(
          "calculate_popular_movies",
          schedule_interval=None,
          start_date=datetime(2000, 1, 1),
          catchup=False,
      ) as dag:
          imdb_movies = aql.load_file(
              File("https://raw.githubusercontent.com/astronomer/astro-sdk/main/tests/data/imdb.csv"),
              output_table=Table(
                  name="imdb_movies", conn_id="sqlite_default"
              ),
          )

          top_five_animations(
              input_table=imdb_movies,
              output_table=Table(
                  name="top_animation"
              ),
          )
- Ensure that your Airflow environment is set up correctly by running the following commands:

      export AIRFLOW_HOME=`pwd`
      export AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True
      airflow db init
- Create a SQLite database for the example to run with:

      # The sqlite_default connection has a different host on macOS vs. Linux
      export SQL_TABLE_NAME=`airflow connections get sqlite_default -o yaml | grep host | awk '{print $2}'`
      sqlite3 "$SQL_TABLE_NAME" "VACUUM;"
- Run the example DAG:

      airflow dags test calculate_popular_movies `date -Iseconds`
- Check the result of your DAG by running:

      sqlite3 "$SQL_TABLE_NAME" "select * from top_animation;" ".exit"

  You should see the following output:

      $ sqlite3 "$SQL_TABLE_NAME" "select * from top_animation;" ".exit"
      Toy Story 3 (2010)|8.3
      Inside Out (2015)|8.2
      How to Train Your Dragon (2010)|8.1
      Zootopia (2016)|8.1
      How to Train Your Dragon 2 (2014)|7.9
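For readers who want to see what the `top_five_animations` query computes without running Airflow, the following sketch runs an equivalent SELECT against an in-memory SQLite table using only the standard library. It does not use the SDK: the table name that the SDK would render in place of `{{input_table}}` is substituted by hand, and the helper `top_five_animations_plain` is hypothetical. The sample rows are illustrative, not the real `imdb.csv` file: five rows mirror the expected output above, and two are placeholders.

```python
import sqlite3

# Illustrative rows only: the first five mirror the expected output above,
# the last two are placeholders that exercise the WHERE and LIMIT clauses.
SAMPLE_ROWS = [
    ("Toy Story 3 (2010)", "Animation", 8.3),
    ("Inside Out (2015)", "Animation", 8.2),
    ("How to Train Your Dragon (2010)", "Animation", 8.1),
    ("Zootopia (2016)", "Animation", 8.1),
    ("How to Train Your Dragon 2 (2014)", "Animation", 7.9),
    ("Placeholder Drama", "Drama", 9.0),
    ("Placeholder Animation", "Animation", 5.0),
]


def top_five_animations_plain(conn, input_table, output_table):
    """Save the five best-rated animations from input_table into output_table."""
    # Table names cannot be bound as SQL parameters, so they are formatted in;
    # only pass trusted identifiers here.
    conn.execute(f'DROP TABLE IF EXISTS "{output_table}"')
    conn.execute(
        f"""
        CREATE TABLE "{output_table}" AS
        SELECT Title, Rating
        FROM "{input_table}"
        WHERE Genre1 = 'Animation'
        ORDER BY Rating DESC
        LIMIT 5
        """
    )
    return conn.execute(
        f'SELECT Title, Rating FROM "{output_table}" ORDER BY Rating DESC'
    ).fetchall()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE imdb_movies (Title TEXT, Genre1 TEXT, Rating REAL)")
    conn.executemany("INSERT INTO imdb_movies VALUES (?, ?, ?)", SAMPLE_ROWS)
    for title, rating in top_five_animations_plain(conn, "imdb_movies", "top_animation"):
        print(f"{title}|{rating}")
```

The placeholder drama is excluded by the genre filter even though it has the highest rating, and the low-rated placeholder animation is cut by the limit of five rows.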
Databases |
---|
Google BigQuery |
Postgres |
Snowflake |
SQLite |
File types |
---|
CSV |
JSON |
NDJSON |
Parquet |
File stores |
---|
Amazon S3 |
Filesystem |
Google GCS |
The following are some key functions available in the SDK:
- `load_file`: loads a given file into a SQL table
- `transform`: applies a SQL select statement to a source table and saves the result to a destination table
- `drop_table`: drops a SQL table
- `run_raw_sql`: runs any SQL statement without handling its output
- `append`: inserts rows from the source SQL table into the destination SQL table, if there are no conflicts
- `merge`: inserts rows from the source SQL table into the destination SQL table, depending on conflicts:
  - ignore: do not add rows that already exist
  - update: replace existing rows with new ones
- `export_file`: exports SQL table rows into a destination file
- `dataframe`: exports a given SQL table into an in-memory pandas DataFrame
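The difference between the two `merge` conflict strategies can be illustrated with plain SQLite. The sketch below is not the SDK's implementation and does not call the SDK; it uses SQLite's `INSERT OR IGNORE` and `INSERT OR REPLACE` statements as stand-ins for the ignore and update behaviors described above. The function `merge_rows`, its `if_conflicts` parameter, the table names, and the sample data are all hypothetical.

```python
import sqlite3


def merge_rows(conn, source, target, if_conflicts):
    """Copy rows from source into target, resolving key conflicts as requested."""
    verbs = {"ignore": "INSERT OR IGNORE", "update": "INSERT OR REPLACE"}
    if if_conflicts not in verbs:
        raise ValueError(f"unsupported conflict strategy: {if_conflicts!r}")
    # Identifiers cannot be bound as parameters; pass trusted table names only.
    conn.execute(f'{verbs[if_conflicts]} INTO "{target}" SELECT * FROM "{source}"')


def demo(if_conflicts):
    """Build two small tables, merge them, and return the resulting target rows."""
    conn = sqlite3.connect(":memory:")
    # The primary key on id is what makes a row "already exist" in the target.
    conn.execute("CREATE TABLE target (id INTEGER PRIMARY KEY, name TEXT)")
    conn.execute("CREATE TABLE source (id INTEGER PRIMARY KEY, name TEXT)")
    conn.executemany("INSERT INTO target VALUES (?, ?)", [(1, "old"), (2, "kept")])
    conn.executemany("INSERT INTO source VALUES (?, ?)", [(1, "new"), (3, "added")])
    merge_rows(conn, "source", "target", if_conflicts)
    return conn.execute("SELECT id, name FROM target ORDER BY id").fetchall()


if __name__ == "__main__":
    print(demo("ignore"))  # row 1 keeps the value already in the target
    print(demo("update"))  # row 1 takes the value from the source
```

In both cases the non-conflicting row with id 3 is inserted; the strategies differ only in what happens to the row whose key already exists in the target.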
For a full list of available operators, see the SDK reference documentation.
The documentation is a work in progress. We aim to follow the Diátaxis system:
- Getting Started: A hands-on introduction to the Astro Python SDK
- How-to guides: Simple step-by-step user guides to accomplish specific tasks
- Reference guide: Commands, modules, classes and methods
- Explanation: Clarification and discussion of key decisions when designing the project
The Astro Python SDK follows semantic versioning for releases. Check the changelog for the latest changes.
To learn more about our release philosophy and steps, see Managing Releases.
All contributions, bug reports, bug fixes, documentation improvements, enhancements, and ideas are welcome.
Read the Contribution Guideline for a detailed overview on how to contribute.
Contributors and maintainers should abide by the Contributor Code of Conduct.