diff --git a/.github/workflows/build_wheel.yaml b/.github/workflows/build_wheel.yaml
index 62e5a91..d163885 100644
--- a/.github/workflows/build_wheel.yaml
+++ b/.github/workflows/build_wheel.yaml
@@ -18,6 +18,11 @@ jobs:
with:
python-version: 3.11
+ - name: Format code using yapf
+ run: |
+ pip install --upgrade yapf
+
+ yapf airgoodies/ -r -i
- name: Build wheel and install
run: |
pip install --upgrade setuptools
diff --git a/.github/workflows/mdbook.yaml b/.github/workflows/mdbook.yaml
new file mode 100644
index 0000000..ca5f11c
--- /dev/null
+++ b/.github/workflows/mdbook.yaml
@@ -0,0 +1,33 @@
+on:
+ push:
+ branches: [ "develop" ]
+
+jobs:
+ deploy:
+ runs-on: ubuntu-latest
+ permissions:
+ contents: write
+ pages: write
+ id-token: write
+ steps:
+ - uses: actions/checkout@v4
+ with:
+ fetch-depth: 0
+ - name: Install latest mdbook
+ run: |
+ tag=$(curl 'https://api.github.com/repos/rust-lang/mdbook/releases/latest' | jq -r '.tag_name')
+ url="https://github.com/rust-lang/mdbook/releases/download/${tag}/mdbook-${tag}-x86_64-unknown-linux-gnu.tar.gz"
+ mkdir mdbook
+ curl -sSL $url | tar -xz --directory=./mdbook
+ echo `pwd`/mdbook >> $GITHUB_PATH
+ - name: Build Book
+ run: cd docs && mdbook build
+ - name: Setup Pages
+ uses: actions/configure-pages@v4
+ - name: Upload artifact
+ uses: actions/upload-pages-artifact@v2
+ with:
+ path: 'docs/book'
+ - name: Deploy to GitHub Pages
+ id: deployment
+ uses: actions/deploy-pages@v3
diff --git a/.gitignore b/.gitignore
index d58e0b8..be97ef4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -159,3 +159,4 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
*.iml
+other/
\ No newline at end of file
diff --git a/README.md b/README.md
index d86e21d..bffe707 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,7 @@
### Airgoodies
-[![.github/workflows/build_wheel.yaml](https://github.com/stav121/apache-airflow-goodies/actions/workflows/build_wheel.yaml/badge.svg?branch=main)](https://github.com/stav121/apache-airflow-goodies/actions/workflows/build_wheel.yaml)
+![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/stav121/apache-airflow-goodies/build_wheel.yaml?branch=develop&style=flat&label=build)
+![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/stav121/apache-airflow-goodies/mdbook.yaml?branch=develop&label=docs)
![PyPI - Version](https://img.shields.io/pypi/v/airgoodies)
![GitHub License](https://img.shields.io/github/license/stav121/apache-airflow-goodies)
![PyPI - Downloads](https://img.shields.io/pypi/dm/goodies)
@@ -13,27 +14,33 @@ Current version matrix:
| Airgoodies Version | Apache Airflow Version | Python Version | Project tag |
|--------------------------------------------------------------------------------------------|------------------------|----------------|---------------------------------------------------------------------------------------------|
+| [0.0.4](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.4) | 2.7.2 | 3.11 | [v0.0.4](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.4) |
| [0.0.3](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.3) | 2.7.2 | 3.11 | [v0.0.3](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.3) |
| [0.0.2](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.2) | 2.7.2 | 3.11 | [v0.0.2](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.2) |
| [0.0.1-alpha](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.1-alpha) | 2.7.2 | 3.11 | [v0.0.1-alpha](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.1-alpha) |
-Provided goodies for version [0.0.3](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.3):
+Provided goodies for version [0.0.4](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.4):
-| Module | Description | Dependency Versions |
-|------------------|-----------------------------------------|----------------------------------------------------------|
-| airgoodies.aws | API for reasy interaction with AWS | pandas==2.1.1
apache-airflow-providers-amazon===8.7.1 |
-| airgoodies.mongo | API for easy interaction with MongoDB | pymongo==4.5.0 |
-| airgoodies.xcom | API for managing variables through XCom | *None* |
+| Module | Description | Dependency Versions |
+|--------------------|-------------------------------------------------|----------------------------------------------------------|
+| airgoodies.command | API for dynamic task configuration through YAML | pyyaml==6.0.1 |
+| airgoodies.aws | API for easy interaction with AWS | pandas==2.1.1
apache-airflow-providers-amazon===8.7.1 |
+| airgoodies.mongo | API for easy interaction with MongoDB | pymongo==4.5.0 |
+| airgoodies.xcom | API for managing variables through XCom | *None* |
-### Usage
+### Installation
+
+Add the following requirement in your `requirements.txt`
```
# requirements.txt
-airgoodies=0.0.3
+airgoodies==0.0.4
```
### Example usage
+For the official documentation, see [here](https://stav121.github.io/apache-airflow-goodies)
+
For an example of how to use this project, see [here](https://github.com/stav121/apache-airflow-goodies-examples)
### Building the project
@@ -44,6 +51,10 @@ To build the project:
$ python3 setup.py sdist bdist_wheel
```
+### License
+
+This project is available under the MIT License.
+
### Author
-Stavros Grigoriou ([stav121](https://github.com/stav121))
\ No newline at end of file
+Stavros Grigoriou ([stav121](https://github.com/stav121))
diff --git a/airgoodies/aws/s3/airgoodies.aws.s3.variables.json b/airgoodies/aws/s3/airgoodies.aws.s3.variables.json
index d13b314..e0bf1bd 100644
--- a/airgoodies/aws/s3/airgoodies.aws.s3.variables.json
+++ b/airgoodies/aws/s3/airgoodies.aws.s3.variables.json
@@ -1,4 +1,4 @@
{
- "airgoodies-aws-s3-connection-name": "",
- "airgoodies-aws-s3-default-bucket": ""
+ "${dag_id}.airgoodies-aws-s3-connection-name": "",
+ "${dag_id}.airgoodies-aws-s3-default-bucket": ""
}
\ No newline at end of file
diff --git a/airgoodies/aws/s3/wrapper.py b/airgoodies/aws/s3/wrapper.py
index 958fb59..3430f55 100644
--- a/airgoodies/aws/s3/wrapper.py
+++ b/airgoodies/aws/s3/wrapper.py
@@ -11,17 +11,23 @@ class S3Wrapper:
Contains utilities such as, load CSV to pandas, load Excel etc.
"""
from logging import Logger, getLogger
+ from airflow.models import TaskInstance
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from pandas import DataFrame
from typing import Callable
from airgoodies.mongo.connection import MongoConnection
+ from airgoodies.util.annotation import provide_dag_id
_logger: Logger = getLogger('airflow.task')
_conn_name: str
_s3_hook: S3Hook
_default_bucket: str = None
- def __init__(self, connection_name: str | None = None) -> None:
+ @provide_dag_id
+ def __init__(self,
+ dag_id: str = None,
+ task_instance: TaskInstance = None,
+ connection_name: str | None = None) -> None:
"""
Initialize the connection to S3 with either the provided connection_name or
the pre-configured from the variable:
@@ -38,19 +44,26 @@ def __init__(self, connection_name: str | None = None) -> None:
from airflow.models import Variable
from airgoodies.common.exception import ConfigNotFoundException
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
- from airgoodies.common.variables import AWSVariables
+ from airgoodies.common.variables import AWSVariables, Common
if connection_name is None:
# Load from variable
- self._conn_name = Variable.get(key=AWSVariables.S3_CONNECTION_NAME)
+ self._conn_name = Variable.get(
+ key=AWSVariables.S3_CONNECTION_NAME.replace(
+ Common.DAG_ID_VARIABLE, dag_id))
if self._conn_name is None:
- raise ConfigNotFoundException(AWSVariables.S3_CONNECTION_NAME)
+ raise ConfigNotFoundException(
+ AWSVariables.S3_CONNECTION_NAME.replace(
+ Common.DAG_ID_VARIABLE, dag_id))
else:
# Load from the provided name
self._conn_name = connection_name
- if Variable.get(key=AWSVariables.S3_DEFAULT_BUCKET) is not None:
- self._default_bucket = Variable.get(key=AWSVariables.S3_DEFAULT_BUCKET)
+ if Variable.get(key=AWSVariables.S3_DEFAULT_BUCKET.replace(
+ Common.DAG_ID_VARIABLE, dag_id)) is not None:
+ self._default_bucket = Variable.get(
+ key=AWSVariables.S3_DEFAULT_BUCKET.replace(
+ Common.DAG_ID_VARIABLE, dag_id))
self._s3_hook: S3Hook = S3Hook(aws_conn_id=self._conn_name)
@@ -60,21 +73,32 @@ def get_s3_hook(self) -> S3Hook:
"""
return self._s3_hook
- def load_file(self, key: str, bucket_name: str | None = None) -> str | None:
+ def load_file(self,
+ key: str,
+ bucket_name: str | None = None) -> str | None:
"""
Load the provided key from the provided or default bucket.
:param key: the fully qualified key of the file
:param bucket_name: alternative bucket name otherwise it will use the default
"""
+
if bucket_name is None:
bucket_name = self._default_bucket
- file: str = self._s3_hook.read_key(key=key, bucket_name=bucket_name)
+ if key.endswith(('.xls', '.xlsx')):
+ file: str = self._s3_hook.get_key(
+ key=key, bucket_name=bucket_name).get()["Body"].read()
+ else:
+ file: str = self._s3_hook.read_key(key=key,
+ bucket_name=bucket_name)
return file
- def load_as_dataframe(self, key: str, bucket_name: str | None = None, sep: str = ',') -> DataFrame:
+ def load_as_dataframe(self,
+ key: str,
+ bucket_name: str | None = None,
+ sep: str = ',') -> DataFrame:
"""
Load the provided file from S3 into a pandas DataFrame.
@@ -87,15 +111,15 @@ def load_as_dataframe(self, key: str, bucket_name: str | None = None, sep: str =
from io import StringIO
from airgoodies.common.exception import FileNotFoundException, UnsupportedFileFormatException
- file: StringIO = StringIO(self.load_file(key=key, bucket_name=bucket_name))
+ file: str = self.load_file(key=key, bucket_name=bucket_name)
if file is None:
raise FileNotFoundException(filename=key)
if key.lower().endswith('.csv'):
- return read_csv(filepath_or_buffer=file, sep=sep)
+ return read_csv(filepath_or_buffer=StringIO(file), sep=sep)
elif key.lower().endswith(('.xls', '.xlsx')):
- return read_excel(io=file)
+ return read_excel(io=file, header=None)
else:
raise UnsupportedFileFormatException()
@@ -116,7 +140,9 @@ def load_and_transform(self,
"""
from pandas import DataFrame
- result: DataFrame = self.load_as_dataframe(key=key, bucket_name=bucket_name, sep=sep)
+ result: DataFrame = self.load_as_dataframe(key=key,
+ bucket_name=bucket_name,
+ sep=sep)
if transform_method is None:
return result
@@ -149,10 +175,16 @@ def load_to_mongo(self,
data: DataFrame
if transform_method is None:
- data = self.load_as_dataframe(key=key, bucket_name=bucket_name, sep=sep)
+ data = self.load_as_dataframe(key=key,
+ bucket_name=bucket_name,
+ sep=sep)
else:
- data = self.load_and_transform(key=key, bucket_name=bucket_name, transform_method=transform_method, sep=sep)
+ data = self.load_and_transform(key=key,
+ bucket_name=bucket_name,
+ transform_method=transform_method,
+ sep=sep)
- connection.get_db().get_collection(name=load_table_name).insert_many(loads(data.to_json(orient='records')))
+ connection.get_db().get_collection(name=load_table_name).insert_many(
+ loads(data.to_json(orient='records')))
return load_table_name
diff --git a/airgoodies/command/__init__.py b/airgoodies/command/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/airgoodies/command/airgoodies.command.parser.variable.json b/airgoodies/command/airgoodies.command.parser.variable.json
new file mode 100644
index 0000000..9ff6a60
--- /dev/null
+++ b/airgoodies/command/airgoodies.command.parser.variable.json
@@ -0,0 +1,3 @@
+{
+ "${dag_id}.airgoodies-dag-config-file-key": ""
+}
\ No newline at end of file
diff --git a/airgoodies/command/command.py b/airgoodies/command/command.py
new file mode 100644
index 0000000..bf685de
--- /dev/null
+++ b/airgoodies/command/command.py
@@ -0,0 +1,40 @@
+"""
+@author: Stavros Grigoriou
+@since: 0.0.4
+"""
+
+
+class AirgoodiesCommand:
+ """
+ Airgoodies Command class, contains the metadata and callable for an Airgoodies compatible command.
+ """
+ from airflow.operators.python import PythonOperator
+ from airflow import DAG
+
+ _task_id: str
+ _python_callable: callable
+ _provide_context: bool = True
+
+ def __init__(self,
+ task_id: str,
+ python_callable: callable,
+ provide_context: bool = True):
+ """
+ Command initializer.
+ """
+ self._task_id = task_id
+ self._python_callable = python_callable
+ self._provide_context = provide_context
+
+ def to_operator(self, dag=DAG) -> PythonOperator:
+ """
+        Convert the command into a PythonOperator usable by Apache Airflow.
+
+ :return: A setup python operator.
+ """
+ from airflow.operators.python import PythonOperator
+
+ return PythonOperator(task_id=self._task_id,
+ python_callable=self._python_callable,
+ provide_context=self._provide_context,
+ dag=dag)
diff --git a/airgoodies/command/parser.py b/airgoodies/command/parser.py
new file mode 100644
index 0000000..e13700c
--- /dev/null
+++ b/airgoodies/command/parser.py
@@ -0,0 +1,105 @@
+"""
+@author: Stavros Grigoriou
+@since: 0.0.4
+"""
+
+
+class CommandParser:
+ """
+ Command Parser for Airgoodies supported YAML format.
+
+ Parses the provided YAML file and creates a list of AirgoodiesCommand with the provided target callable.
+ """
+ from airgoodies.command.command import AirgoodiesCommand
+ from airgoodies.xcom.manager import XComManager
+ from airflow import DAG
+ from airgoodies.util.annotation import provide_dag_id
+
+ _config: dict = None
+ _commands: [AirgoodiesCommand] = []
+ _xcom_manager: XComManager
+
+ @provide_dag_id
+ def __init__(self,
+ dag: DAG,
+ dag_id: str | None = None,
+ yaml_file: str | None = None):
+ """
+ Constructor used to initialize the parser.
+ """
+ import yaml
+ from io import StringIO
+ from airflow.models import Variable
+ from airgoodies.common.variables import CommandParserVariables, Common
+
+ if Variable.get(
+ CommandParserVariables.CONFIG_FILE_KEY.replace(
+ Common.DAG_ID_VARIABLE, dag_id)) is not None:
+ from airgoodies.aws.s3.wrapper import S3Wrapper
+ # Load the configuration file from S3 bucket
+ yaml_file: str = S3Wrapper(dag_id=dag_id).load_file(
+ key=Variable.get(
+ CommandParserVariables.CONFIG_FILE_KEY.replace(
+ Common.DAG_ID_VARIABLE, dag_id)))
+ elif yaml_file is None:
+ raise Exception("No YAML file provided")
+
+ self._config: dict = yaml.safe_load(stream=StringIO(
+ initial_value=yaml_file))
+
+ self._parse()
+
+ def _parse(self) -> None:
+ """
+ Parse the provided command into a list of AirgoodiesCommand.
+ """
+ from airgoodies.command.command import AirgoodiesCommand
+ from airgoodies.common.variables import CommandParserVariables
+
+ for _key in self._config.keys():
+ _airgoodies_callable: dict = self._config[_key]
+
+ if CommandParserVariables.AIRGOODIES_TASK in self._config[_key]:
+            # Airgoodies provided task callable
+ self._commands.append(
+ AirgoodiesCommand(
+ task_id=_key,
+ python_callable=CommandParser._pick_callable(
+ _callable_name=_airgoodies_callable[
+ CommandParserVariables.AIRGOODIES_TASK])))
+ elif CommandParserVariables.CUSTOM_TASK in self._config[_key]:
+ # User provided task module
+ from importlib import import_module
+
+ module_name, method_name = self._config[_key][
+ CommandParserVariables.CUSTOM_TASK].rsplit('.', 1)
+ imported_module = import_module(module_name)
+ python_callable: callable = getattr(imported_module,
+ method_name)
+
+ self._commands.append(
+ AirgoodiesCommand(task_id=_key,
+ python_callable=python_callable))
+
+ def get_commands(self) -> [AirgoodiesCommand]:
+ """
+ Retrieve the list of parsed commands.
+ """
+ return self._commands
+
+ @staticmethod
+ def _pick_callable(_callable_name: str) -> callable:
+ """
+ Static method used to pick the executable suitable for the provided name.
+
+ If no registered airgoodies_command exists, None is returned
+
+ :param _callable_name: the name of the callable to match
+ """
+ from airgoodies.task.aws_tasks import load_from_s3_to_mongo_table
+ from airgoodies.common.variables import CommandParserVariables
+
+ if _callable_name == CommandParserVariables.LOAD_S3_TO_MONGO:
+ return load_from_s3_to_mongo_table
+ else:
+ return None
diff --git a/airgoodies/common/exception.py b/airgoodies/common/exception.py
index bb28871..8ae8f20 100644
--- a/airgoodies/common/exception.py
+++ b/airgoodies/common/exception.py
@@ -13,7 +13,8 @@ class ConfigNotFoundException(Exception):
"""
def __init__(self, variable: str):
- super().__init__(f'Airflow Variable with name <{variable}> was not found')
+ super().__init__(
+ f'Airflow Variable with name <{variable}> was not found')
class ValueNotFoundException(Exception):
diff --git a/airgoodies/common/variables.py b/airgoodies/common/variables.py
index d0dde2d..81e9fe6 100644
--- a/airgoodies/common/variables.py
+++ b/airgoodies/common/variables.py
@@ -7,13 +7,17 @@
"""
+class Common:
+ DAG_ID_VARIABLE: str = '{dag_id}'
+
+
class AWSVariables:
""""
Variables for AWS S3 goodies.
"""
__PACKAGE: str = 'airgoodies.aws'
- S3_CONNECTION_NAME: str = 'airgoodies-aws-s3-connection-name'
- S3_DEFAULT_BUCKET: str = 'airgoodies-aws-s3-default-bucket'
+ S3_CONNECTION_NAME: str = f'{Common.DAG_ID_VARIABLE}.airgoodies-aws-s3-connection-name'
+ S3_DEFAULT_BUCKET: str = f'{Common.DAG_ID_VARIABLE}.airgoodies-aws-s3-default-bucket'
class MongoVariables:
@@ -21,5 +25,26 @@ class MongoVariables:
Variables for Mongo DB goodies.
"""
__PACKAGE: str = 'airgoodies.mongo'
- CONNECTION_URL: str = 'airgoodies-mongo-db-connection-url'
- DEFAULT_DB_NAME: str = 'airgoodies-mongo-db-name'
+ CONNECTION_URL: str = f'{Common.DAG_ID_VARIABLE}.airgoodies-mongo-db-connection-url'
+ DEFAULT_DB_NAME: str = f'{Common.DAG_ID_VARIABLE}.airgoodies-mongo-db-name'
+
+
+class CommandParserVariables:
+ """
+ Variables for YAML CommandParser.
+ """
+ __PACKAGE: str = 'airgoodies.command'
+ AIRGOODIES_TASK: str = 'airgoodies_task'
+ CUSTOM_TASK: str = 'custom_task'
+ CONFIG_FILE_KEY: str = f'{Common.DAG_ID_VARIABLE}.airgoodies-dag-config-file-key'
+
+ # Callables
+ LOAD_S3_TO_MONGO: str = f'load_from_s3_to_mongo_table'
+
+
+class AWSTasksVariables:
+ """
+ Variables for AWS Tasks.
+ """
+ __PACKAGE: str = 'airgoodies.task'
+ INPUT_FILE_VARIABLE: str = f'{Common.DAG_ID_VARIABLE}.airgoodies-task-aws-load-from-s3-to-mongo-input-file-variable'
diff --git a/airgoodies/mongo/airgoodies.mongo.variables.json b/airgoodies/mongo/airgoodies.mongo.variables.json
index 3b41a5c..c217681 100644
--- a/airgoodies/mongo/airgoodies.mongo.variables.json
+++ b/airgoodies/mongo/airgoodies.mongo.variables.json
@@ -1,4 +1,4 @@
{
- "airgoodies-mongo-db-connection-url": "",
- "airgoodies-mongo-db-name": ""
+ "${dag_id}.airgoodies-mongo-db-connection-url": "",
+ "${dag_id}.airgoodies-mongo-db-name": ""
}
\ No newline at end of file
diff --git a/airgoodies/mongo/connection.py b/airgoodies/mongo/connection.py
index 968fee6..bf4fdcf 100644
--- a/airgoodies/mongo/connection.py
+++ b/airgoodies/mongo/connection.py
@@ -6,7 +6,9 @@
class MongoConnection:
from pymongo import MongoClient
+ from airflow.models import TaskInstance
from pymongo.database import Database
+ from airgoodies.util.annotation import provide_dag_id
"""
Mongo connection class, contains the configuration and creates an open connection
with the configured MongoDB.
@@ -17,7 +19,8 @@ class MongoConnection:
_client: MongoClient
_db: Database
- def __init__(self):
+ @provide_dag_id
+ def __init__(self, task_instance: TaskInstance = None, dag_id: str = None):
"""
Constructor, requires the existence of the following Airflow Variables:
* airgoodies-mongo-db-connection-url
@@ -29,19 +32,25 @@ def __init__(self):
from airflow.models import Variable
from airgoodies.common.exception import ConfigNotFoundException
from pymongo import MongoClient
- from airgoodies.common.variables import MongoVariables
+ from airgoodies.common.variables import MongoVariables, Common
logger = logging.getLogger('airflow.task')
logger.info('Retrieving Mongo connection')
- self._conn_url = Variable.get(key=MongoVariables.CONNECTION_URL)
- self._db_name = Variable.get(key=MongoVariables.DEFAULT_DB_NAME)
+ self._conn_url = Variable.get(key=MongoVariables.CONNECTION_URL.
+ replace(Common.DAG_ID_VARIABLE, dag_id))
+ self._db_name = Variable.get(key=MongoVariables.DEFAULT_DB_NAME.
+ replace(Common.DAG_ID_VARIABLE, dag_id))
# Raise exception if none of the above were found.
if not self._conn_url:
- raise ConfigNotFoundException(MongoVariables.CONNECTION_URL)
+ raise ConfigNotFoundException(
+ MongoVariables.CONNECTION_URL.replace(Common.DAG_ID_VARIABLE,
+ dag_id))
if not self._db_name:
- raise ConfigNotFoundException(MongoVariables.DEFAULT_DB_NAME)
+ raise ConfigNotFoundException(
+ MongoVariables.DEFAULT_DB_NAME.replace(Common.DAG_ID_VARIABLE,
+ dag_id))
logger.info('Connecting to MongoDB...')
self._client = MongoClient(host=self._conn_url)
diff --git a/airgoodies/task/__init__.py b/airgoodies/task/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/airgoodies/task/airgoodies.aws.task.variables.json b/airgoodies/task/airgoodies.aws.task.variables.json
new file mode 100644
index 0000000..68eb674
--- /dev/null
+++ b/airgoodies/task/airgoodies.aws.task.variables.json
@@ -0,0 +1,3 @@
+{
+ "${dag_id}.airgoodies-task-aws-load-from-s3-to-mongo-input-file-variable": "input_file"
+}
\ No newline at end of file
diff --git a/airgoodies/task/aws_tasks.py b/airgoodies/task/aws_tasks.py
new file mode 100644
index 0000000..1c39cbf
--- /dev/null
+++ b/airgoodies/task/aws_tasks.py
@@ -0,0 +1,80 @@
+from airflow.models import TaskInstance
+"""
+@author: Stavros Grigoriou
+@since: 0.0.4
+"""
+
+
+def load_from_s3_to_mongo_table(ti: TaskInstance, **kwargs) -> None:
+ """
+ Task that uses the S3Wrapper and MongoConnection to load a file from the configured Bucket
+ directly into a MongoDB datatable.
+
+ :param ti: the TaskInstance of the task
+ :param kwargs: args
+ """
+ from logging import Logger, getLogger
+ from airgoodies.aws.s3.wrapper import S3Wrapper
+ from airgoodies.mongo.connection import MongoConnection
+ from airflow.models import Variable
+ from typing import Callable
+ from airgoodies.common.variables import AWSTasksVariables, Common
+ from airgoodies.task.config.task_configuration import TaskConfig
+ from airgoodies.task.config.variables import AwsS3ToMongoTableOptions, CommonTaskOptions
+ from airgoodies.task.method.transform import resolve
+ from pandas import DataFrame
+
+ logger: Logger = getLogger(name='airflow.task')
+
+ logger.info(f'airgoodies::load_from_s3_to_mongo_table: {ti.task_id}')
+
+ input_file_variable: str = Variable.get(
+ AWSTasksVariables.INPUT_FILE_VARIABLE.replace(Common.DAG_ID_VARIABLE,
+ ti.dag_id))
+
+ logger.info(
+ f'Looking for value of <{input_file_variable}> in provided configuration.'
+ )
+
+ input_file: str = kwargs['dag_run'].conf[input_file_variable]
+
+ # Load the S3Wrapper and the MongoConnection
+ s3_wrapper: S3Wrapper = S3Wrapper(task_instance=ti)
+ task_configuration: TaskConfig = TaskConfig(task_instance=ti,
+ s3_wrapper=s3_wrapper,
+ task_id=ti.task_id)
+
+ transform_method: Callable[[DataFrame], DataFrame] | None = None
+ if task_configuration.get_config(conf=AwsS3ToMongoTableOptions.
+ AIRGOODIES_TRANSFORM_METHOD) is not None:
+ # Load the transform method
+ transform_method: Callable = resolve(
+ name=task_configuration.get_config(
+ conf=AwsS3ToMongoTableOptions.AIRGOODIES_TRANSFORM_METHOD))
+ elif task_configuration.get_config(
+ conf=AwsS3ToMongoTableOptions.CUSTOM_TRANSFORM_METHOD) is not None:
+ from importlib import import_module
+ # Load from locals
+ custom_method: str = task_configuration.get_config(
+ conf=AwsS3ToMongoTableOptions.CUSTOM_TRANSFORM_METHOD)
+ module_name, method_name = custom_method.rsplit('.', 1)
+ logger.info(
+ f'Importing custom transform_method <{method_name}> from <{module_name}>'
+ )
+ imported_module = import_module(module_name)
+ transform_method: Callable[[DataFrame], DataFrame] = getattr(
+ imported_module, method_name)
+
+ mongo_conn: MongoConnection = MongoConnection(dag_id=ti.dag_id)
+
+ out_table_name: str = f'{ti.task_id}_out_table' if task_configuration.get_config(
+ conf=CommonTaskOptions.OUTPUT_TABLE_NAME
+ ) is None else task_configuration.get_config(
+ conf=CommonTaskOptions.OUTPUT_TABLE_NAME)
+
+ # Load the file from S3 into MongoDB
+ s3_wrapper.load_to_mongo(
+ key=input_file,
+ connection=mongo_conn,
+ transform_method=transform_method,
+ load_table_name=out_table_name) # TODO: add to config
diff --git a/airgoodies/task/config/__init__.py b/airgoodies/task/config/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/airgoodies/task/config/task_configuration.py b/airgoodies/task/config/task_configuration.py
new file mode 100644
index 0000000..199dafe
--- /dev/null
+++ b/airgoodies/task/config/task_configuration.py
@@ -0,0 +1,55 @@
+"""
+@author: Stavros Grigoriou
+@since: 0.0.4
+"""
+
+
+class TaskConfig:
+ from logging import Logger, getLogger
+ from airflow.models import TaskInstance
+ from airgoodies.util.annotation import provide_dag_id
+ from airgoodies.aws.s3.wrapper import S3Wrapper
+
+ _logger: Logger = getLogger(name='airflow.task')
+ _s3_wrapper: S3Wrapper = None
+ _task_config: dict = None
+ _airgoodies_command: str = None
+
+ @provide_dag_id
+ def __init__(self,
+ task_instance: TaskInstance,
+ s3_wrapper: S3Wrapper,
+ task_id: str,
+ dag_id: str | None = None) -> None:
+ import yaml
+ from airgoodies.aws.s3.wrapper import S3Wrapper
+ from airgoodies.common.variables import CommandParserVariables, Common
+ from airflow.models import Variable
+ from io import StringIO
+
+ self._s3_wrapper: S3Wrapper = s3_wrapper
+
+ # Load the configuration of the task
+ self._task_config: dict = yaml.safe_load(
+ StringIO(
+ self._s3_wrapper.load_file(key=Variable.get(
+ CommandParserVariables.CONFIG_FILE_KEY.replace(
+ Common.DAG_ID_VARIABLE, dag_id)))))[task_id]
+
+ self._airgoodies_command: str = self._task_config[
+ CommandParserVariables.AIRGOODIES_TASK]
+
+ self._logger.info(f'Loaded task config: {self._task_config}')
+ self._logger.info(f'Config: {self._task_config["config"]}')
+
+ def get_config(self, conf: str) -> str | None:
+ """
+ Retrieve the requested configuration from the `config` section.
+ :param conf: the name of the configuration property
+ :return: the value if it exists
+ """
+ if 'config' in self._task_config:
+ if conf in self._task_config['config']:
+ return self._task_config['config'][conf]
+
+ return None
diff --git a/airgoodies/task/config/variables.py b/airgoodies/task/config/variables.py
new file mode 100644
index 0000000..75f3110
--- /dev/null
+++ b/airgoodies/task/config/variables.py
@@ -0,0 +1,21 @@
+"""
+Configuration options for Airgoodies supported commands.
+
+@author: Stavros Grigoriou
+@since: 0.0.4
+"""
+
+
+class CommonTaskOptions:
+ """
+ Configuration options, common across multiple tasks.
+ """
+ OUTPUT_TABLE_NAME: str = 'output_table_name'
+
+
+class AwsS3ToMongoTableOptions:
+ """
+ Configuration options for `load_from_s3_to_mongo_table` command.
+ """
+ AIRGOODIES_TRANSFORM_METHOD: str = 'airgoodies_transform_method'
+ CUSTOM_TRANSFORM_METHOD: str = 'custom_transform_method'
diff --git a/airgoodies/task/method/__init__.py b/airgoodies/task/method/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/airgoodies/task/method/transform.py b/airgoodies/task/method/transform.py
new file mode 100644
index 0000000..2788ac5
--- /dev/null
+++ b/airgoodies/task/method/transform.py
@@ -0,0 +1,36 @@
+from pandas import DataFrame
+from typing import Callable
+"""
+@author: Stavros Grigoriou
+@since: 0.0.4
+"""
+
+
+def _print_table_contents(data: DataFrame) -> DataFrame:
+ """
+ Print the contents of the provided datatable.
+
+ :param data: the input data
+ """
+ from logging import Logger, getLogger
+
+ logger: Logger = getLogger('airflow.task')
+
+ logger.info('Executing `print_table_contents` callable')
+
+ logger.info(data)
+
+ return data
+
+
+def resolve(name: str) -> Callable[[DataFrame], DataFrame]:
+ """
+ Resolve the provided airgoodies transform method callable if it exists.
+ :param name: the name of the airgoodies transform method to resolve
+ :return: the callable of the method
+ """
+ if name == 'print_table_contents':
+ return _print_table_contents
+ else:
+ raise Exception(
+ f'airgoodies_transform_method with name <{name}> not found')
diff --git a/airgoodies/util/__init__.py b/airgoodies/util/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/airgoodies/util/annotation.py b/airgoodies/util/annotation.py
new file mode 100644
index 0000000..f094f5d
--- /dev/null
+++ b/airgoodies/util/annotation.py
@@ -0,0 +1,42 @@
+from typing import Callable
+from inspect import Signature
+from functools import wraps
+from typing import cast
+"""
+@author: Stavros Grigoriou
+@since: 0.0.4
+"""
+
+
+def provide_dag_id(func: Callable) -> Callable:
+ """
+    Provide the dag_id parsed from the <dag> or the <task_instance> parameter if it has not been provided during the call of the function.
+
+ :param func: the Callable function.
+ :return: the Callable with the updated dag_id
+ """
+ signature = Signature.from_callable(func,
+ follow_wrapped=True,
+ globals=None,
+ locals=None,
+ eval_str=False)
+
+ @wraps(func)
+ def wrapper(*args, **kwargs) -> Callable:
+ func_args = signature.bind(*args, **kwargs)
+
+ if "dag_id" not in func_args.arguments:
+ if "dag" in func_args.arguments:
+ func_args.arguments["dag_id"] = func_args.arguments[
+ "dag"].dag_id
+ elif "task_instance" in func_args.arguments:
+ func_args.arguments["dag_id"] = func_args.arguments[
+ "task_instance"].dag_id
+ else:
+ raise Exception(
+                'Could not locate <dag> or <task_instance> in function parameters'
+ )
+
+ return func(*func_args.args, **func_args.kwargs)
+
+ return cast(Callable, wrapper)
diff --git a/airgoodies/xcom/manager.py b/airgoodies/xcom/manager.py
index 64b2004..45232e6 100644
--- a/airgoodies/xcom/manager.py
+++ b/airgoodies/xcom/manager.py
@@ -33,7 +33,8 @@ def __init__(self, ti: TaskInstance) -> None:
self._ti: TaskInstance = ti
self._task_id: str = ti.task_id
self._run_id: str = ti.run_id
- self._logger.info(msg=f'Initializing XCom manager for task {self._task_id}')
+ self._logger.info(
+ msg=f'Initializing XCom manager for task {self._task_id}')
_xcom_val: str = ti.xcom_pull(key=f'{self._run_id}_variables')
if _xcom_val is None:
self._shared_data: dict = {}
@@ -84,4 +85,5 @@ def _dump_variables(self) -> None:
"""
import json
- self._ti.xcom_push(key=f'{self._run_id}_variables', value=json.dumps(self._shared_data))
+ self._ti.xcom_push(key=f'{self._run_id}_variables',
+ value=json.dumps(self._shared_data))
diff --git a/docs/.gitignore b/docs/.gitignore
new file mode 100644
index 0000000..7585238
--- /dev/null
+++ b/docs/.gitignore
@@ -0,0 +1 @@
+book
diff --git a/docs/book.toml b/docs/book.toml
new file mode 100644
index 0000000..79bdbbe
--- /dev/null
+++ b/docs/book.toml
@@ -0,0 +1,6 @@
+[book]
+title = "airgoodies"
+authors = ["Stavros Grigoriou "]
+language = "en"
+multilingual = false
+src = "src"
diff --git a/docs/src/SUMMARY.md b/docs/src/SUMMARY.md
new file mode 100644
index 0000000..d4857d6
--- /dev/null
+++ b/docs/src/SUMMARY.md
@@ -0,0 +1,12 @@
+# Summary
+
+- [Introduction](intro/introduction.md)
+- [Installation](installation/installation.md)
+ - [Install using PyPi](installation/pypi-installation.md)
+ - [Install from sources](installation/sources-installation.md)
+- [Features](features/overview.md)
+ - [Task generation using YAML files](features/yaml.md)
+ - [Predefined tasks](features/tasks/airgoodies.md)
+ - [load_from_s3_to_mongo_table](features/tasks/s3/load_from_s3_to_mongo_table.md)
+- [Airgoodies by example](examples/overview.md)
+ - [Airflow (Docker) + S3 + Mongo](examples/setup/airflow_docker_s3_mongo_setup.md)
\ No newline at end of file
diff --git a/docs/src/examples/overview.md b/docs/src/examples/overview.md
new file mode 100644
index 0000000..64fa9d1
--- /dev/null
+++ b/docs/src/examples/overview.md
@@ -0,0 +1 @@
+### `airgoodies` examples
\ No newline at end of file
diff --git a/docs/src/examples/setup/airflow_docker_s3_mongo_setup.md b/docs/src/examples/setup/airflow_docker_s3_mongo_setup.md
new file mode 100644
index 0000000..59f72e4
--- /dev/null
+++ b/docs/src/examples/setup/airflow_docker_s3_mongo_setup.md
@@ -0,0 +1,297 @@
+### Description
+
+The following example demonstrates how to create a project from scratch, using a dockerized `Airflow` project, an S3
+Bucket and a MongoDB Connection.
+
+In this example we will demonstrate how to use `airgoodies` to quickly set up a workflow that will load a `.csv` file
+from our S3 Bucket, filter its data to extract what we need, and afterwards save the result into a MongoDB
+table.
+
+You can find the source code for this
+example [here](https://github.com/stav121/apache-airflow-goodies-examples/tree/main/examples/load_csv_from_s3_to_mongo)
+
+### Prerequisites
+
+In order to execute the following example locally, we need to have:
+
+* a valid `Docker` installation
+* a `docker-compose` installation
+* an `Amazon S3` bucket
+
+### Basic setup
+
+The basic setup of the project begins with the structure:
+
+We create a new project directory:
+
+```bash
+mkdir my_project && cd my_project
+```
+
+In this project we need to create the folder structure as shown below:
+
+```text
+my_project/
+ \__ Dockerfile
+ \__ docker-compose.yaml
+ \__ requirements.txt
+ \__ dags/
+ \__ __init__.py
+ \__ my_dag.py
+ \__ custom
+ __init__.py
+ transform.py
+```
+
+To generate this structure we run the following command:
+
+```bash
+touch Dockerfile
+touch docker-compose.yaml
+touch requirements.txt
+mkdir dags/
+cd dags
+touch __init__.py
+touch my_dag.py
+mkdir custom
+cd custom
+touch __init__.py
+touch transform.py
+```
+
+### Create the Dockerfile
+
+Inside the Dockerfile we paste the following:
+
+```dockerfile
+# Custom image for Airflow 2.7.2 and Python version 3.11
+FROM apache/airflow:2.7.2-python3.11
+
+COPY requirements.txt .
+
+# Upgrade PIP
+RUN pip install --upgrade pip
+
+# Install the requirements for local dev
+RUN pip install -r requirements.txt
+```
+
+This will create an `Apache Airflow v2.7.2` with `Python 3.11`, next open our `docker-compose.yaml`
+and paste the content from
+the [example's code](https://github.com/stav121/apache-airflow-goodies-examples/blob/main/examples/load_csv_from_s3_to_mongo/docker-compose.yaml).
+
+This will create a full `Airflow` deployment locally with an associated `MongoDB` for our example.
+
+### Create the `requirements.txt`
+
+Before starting anything on `docker` we need to install our requirements.
+
+For that, we open our `requirements.txt` and insert the following:
+
+```requirements.txt
+# Add the requirement for `airgoodies`
+airgoodies==0.0.4
+```
+
+### Starting the instance
+
+Now, all we have to do is build and start our `airflow` deployment:
+
+```bash
+docker-compose build
+docker-compose up -d
+```
+
+After a while, we can navigate to `http://localhost:8080`, use `username: airflow` and `password: airflow` to view the
+console.
+
+### Setting up the environment
+
+The first step we need to do is to configure the variables used by the `DAG`, and specifically
+used by the `airgoodies` modules that we are going to use.
+
+We navigate to the [Apache Airflow -> Admin -> Variables](http://localhost:8080/variable/list/) page, and
+select the `Choose file` option, and insert the following file (`airgoodies-variables.json`):
+
+```json
+{
+ "csv_loader.airgoodies-mongo-db-connection-url": "mongodb://root:root@mongo:27017",
+ "csv_loader.airgoodies-mongo-db-name": "data",
+ "csv_loader.airgoodies-aws-s3-connection-name": "s3_conn",
+ "csv_loader.airgoodies-aws-s3-default-bucket": "",
+ "csv_loader.airgoodies-dag-config-file-key": "csv_loader_dag_config.yaml",
+ "csv_loader.airgoodies-task-aws-load-from-s3-to-mongo-input-file-variable": "input_file"
+}
+```
+
+(Replace the empty value of `csv_loader.airgoodies-aws-s3-default-bucket` with the name of the S3 Bucket selected for the example).
+
+And afterwards select the `Import Variable` option, to insert the Variables into our Airflow instance.
+
+Next step is to create the S3 connection defined in the `csv_loader.airgoodies-aws-s3-connection-name` variable.
+
+We simply navigate to [Apache Airflow -> Admin -> Connections -> New Connection](http://localhost:8080/connection/add)
+and create a
+connection as such:
+
+![](img/s3_conn.png)
+
+And insert the `AWS Access Key ID` and `AWS Secret Access Key` to allow `Airflow` to access S3 resources, and
+select `Save`
+
+### Creating the DAG
+
+In order to create a `DAG` we copy the following code into `dags/csv_loader.py`
+
+```python
+from airflow import DAG
+from airflow.utils.dates import days_ago
+from airgoodies.command.parser import CommandParser
+from airflow.operators.python import PythonOperator
+
+with DAG(
+ dag_id='csv_loader', # used as prefix for all variables in `airgoodies-variables.json`
+ schedule_interval=None,
+ start_date=days_ago(2),
+ default_args={
+ 'owner': 'airflow'
+ },
+ catchup=False,
+ tags=['csv_loader', 'aws', 'mongo', 'airgoodies:examples'],
+) as dag:
+ # Get the logger
+ import logging
+
+ logger: logging.getLogger = logging.getLogger('airflow.task')
+
+ # Initialize the command parser
+ command_parser: CommandParser = CommandParser(dag=dag)
+
+ tasks: [PythonOperator] = []
+
+ # Parse the commands from the `csv_loader_dag_config.yaml` in S3
+ for _task in command_parser.get_commands():
+ tasks = tasks >> _task.to_operator(dag=dag)
+
+ tasks
+```
+
+This snippet of code, basically initializes a DAG and uses the fundamental `airgoodies` utility,
+`CommandParser`, which will dynamically create our tasks from the `YAML` file that we will create in the next step.
+
+### Creating the configuration file (`csv_loader_dag_config.yaml`)
+
+Now, as we said earlier, `airgoodies` has the ability to dynamically generate our tasks, by using a YAML file,
+instead of Python code.
+
+In our case, we want to load a file from S3, parse it and save it to our MongoDB. For this, `airgoodies` already
+provides a pre-defined task
+that can handle all the boilerplate functionality,
+called [load-from-s3-to-mongo-table](../../features/tasks/s3/load_from_s3_to_mongo_table.md).
+
+Based on the above documentation, we need to create a new file, let's name it `csv_loader_dag_config.yaml` as we
+specified in our
+task Variable `csv_loader.airgoodies-dag-config-file-key`, and insert the configuration:
+
+```yaml
+load_from_s3_and_save: # This will appear as our task_id in airflow
+ airgoodies_task: load_from_s3_to_mongo_table # The airgoodies pre-defined task name
+ config: # The configuration of the task
+ custom_transform_method: custom.transform.keep_athens # Use our custom transform method
+ output_table_name: my_output_table # The name of the collection to save the result into
+```
+
+Save and upload in our `S3 Bucket`.
+
+### Creating the input data (`population.csv`)
+
+Next step, is to create the data of the demo, simply create a new file `population.csv` and insert the following data:
+
+```csv
+city,population,year
+Athens,3153000,2021
+Athens,3154000,2022
+Patras,215922,2021
+Patras,213984,2011
+```
+
+Save, and upload on the S3 bucket as well.
+
+### Executing and troubleshooting
+
+If everything is set up correctly up until now, we should be able to see a new `DAG` inside our `Airflow` home page:
+
+![](img/dag.png)
+
+On the far right corner we select to `Trigger DAG /w config`:
+
+![](img/trigger.png)
+
+And insert the following configuration and select `Trigger`:
+
+```json
+{
+ "input_file": "population.csv"
+}
+```
+
+After waiting a bit, we will notice that our DAG is failing with the following message:
+
+```text
+Traceback (most recent call last):
+ File "/home/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 192, in execute
+ return_value = self.execute_callable()
+ ^^^^^^^^^^^^^^^^^^^^^^^
+ File "/home/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 209, in execute_callable
+ return self.python_callable(*self.op_args, **self.op_kwargs)
+ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+ File "/home/airflow/.local/lib/python3.11/site-packages/airgoodies/task/aws_tasks.py", line 65, in load_from_s3_to_mongo_table
+ transform_method: Callable[[DataFrame], DataFrame] = getattr(
+ ^^^^^^^^
+AttributeError: module 'custom.transform' has no attribute 'keep_athens'
+```
+
+The above message is expected because we have defined a method `custom.transform.keep_athens` as
+our `custom_transform_method` but we have not created it yet.
+
+To remove this message, we need to edit our `dags/custom/transform.py` and add the following code:
+
+```python
+import pandas as pd
+import logging
+
+
+def keep_athens(data: pd.DataFrame) -> pd.DataFrame:
+ """
+ Function that loads the pandas DataFrame and removes the lines that are not
+ related to Athens.
+ """
+ logger: logging.Logger = logging.Logger('airflow.task')
+ logger.info('Running custom transform method: keep_athens')
+
+ # Remove each line not related to 'Athens'
+ data.drop(data[data['city'] != 'Athens'].index, inplace=True)
+
+ logger.info(f'New data:\n{data}')
+ return data # Return the DataFrame to write it in MongoDB
+```
+
+Save, wait a bit and re-execute the example. Now everything should execute correctly:
+
+![](img/success.png)
+
+To verify the resource, we need to log in to our MongoDB (`mongodb://root:root@mongo:27017`) and verify the saved result
+of our table `my_output_table` in `data` schema.
+
+The result should look like this:
+
+![](img/result.png)
+
+### Conclusion
+
+This example demonstrates just a short example of the abilities of `airgoodies`, and it was created using
+the `airgoodies` version [v0.0.4](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.4)
+
+### Author
+
+Stavros Grigoriou ([stav121](https://github.com/stav121))
\ No newline at end of file
diff --git a/docs/src/examples/setup/img/dag.png b/docs/src/examples/setup/img/dag.png
new file mode 100644
index 0000000..60f3296
Binary files /dev/null and b/docs/src/examples/setup/img/dag.png differ
diff --git a/docs/src/examples/setup/img/result.png b/docs/src/examples/setup/img/result.png
new file mode 100644
index 0000000..a8b3c59
Binary files /dev/null and b/docs/src/examples/setup/img/result.png differ
diff --git a/docs/src/examples/setup/img/s3_conn.png b/docs/src/examples/setup/img/s3_conn.png
new file mode 100644
index 0000000..2e92b23
Binary files /dev/null and b/docs/src/examples/setup/img/s3_conn.png differ
diff --git a/docs/src/examples/setup/img/success.png b/docs/src/examples/setup/img/success.png
new file mode 100644
index 0000000..3fec4ce
Binary files /dev/null and b/docs/src/examples/setup/img/success.png differ
diff --git a/docs/src/examples/setup/img/trigger.png b/docs/src/examples/setup/img/trigger.png
new file mode 100644
index 0000000..2232dba
Binary files /dev/null and b/docs/src/examples/setup/img/trigger.png differ
diff --git a/docs/src/features/overview.md b/docs/src/features/overview.md
new file mode 100644
index 0000000..f3fcbe6
--- /dev/null
+++ b/docs/src/features/overview.md
@@ -0,0 +1 @@
+### Features overview
\ No newline at end of file
diff --git a/docs/src/features/tasks/airgoodies.md b/docs/src/features/tasks/airgoodies.md
new file mode 100644
index 0000000..ee5ecee
--- /dev/null
+++ b/docs/src/features/tasks/airgoodies.md
@@ -0,0 +1,6 @@
+### Pre-defined `airgoodies` tasks
+
+Airgoodies supports various operations out of the box, requiring minimum configuration.
+
+* [load_from_s3_to_mongo_table](s3/load_from_s3_to_mongo_table.md): Offers the ability to load a file from an S3 bucket
+ directly into a MongoDB table, giving the ability to edit the data before saving.
\ No newline at end of file
diff --git a/docs/src/features/tasks/s3/load_from_s3_to_mongo_table.md b/docs/src/features/tasks/s3/load_from_s3_to_mongo_table.md
new file mode 100644
index 0000000..d3968d6
--- /dev/null
+++ b/docs/src/features/tasks/s3/load_from_s3_to_mongo_table.md
@@ -0,0 +1,86 @@
+### `load_from_s3_to_mongo_table`
+
+#### Overview
+
+The `load_from_s3_to_mongo_table` task, enables the ability to load a file (currently supported `.csv`, `.xls`, `.xlsx`)
+directly from an S3 bucket into a MongoDB table, offering the ability to perform actions in between.
+
+#### Transform method options
+
+| option | values |
+|-----------------------------|-----------------------------------------------------------------------------------------------------------------------------|
+| airgoodies_transform_method | print_table_contents |
+| custom_transform_method | `path.to.method`: the method must have the signature `Callable[[pandas.DataFrame], pandas.DataFrame]` (View examples below) |
+| output_table_name | the name of the MongoDB collection to save the result into, default is `{dag_id}_output_table` |
+
+#### Example YAML syntax
+
+```yaml
+example_task:
+ airgoodies_task: load_from_s3_to_mongo_table
+ config:
+ airgoodies_transform_method: print_table_contents
+ output_table_name: my_output_table
+```
+
+The above YAML defines a task that will load a file from the configured bucket, print its contents
+and export it into a MongoDB table.
+
+### Example with custom transform method
+
+Let's say we want to load a file from an S3 Bucket, modify the contents and then save the output to MongoDB.
+
+For this case consider the following file `population.csv`
+
+```csv
+city,population,year
+Athens,3153000,2021
+Athens,3154000,2022
+Patras,215922,2021
+Patras,213984,2011
+```
+
+And instead of loading all the cities, we need to load only the ones for `Athens`.
+
+In this case we need to create our own custom method and link it.
+
+Considering the following structure:
+
+```
+root/
+ \_ dags/
+ \_ my_dag.py
+ \_ custom/
+ \_ transform.py
+```
+
+We create a new function in `transform.py`:
+
+```python
+import pandas as pd
+import logging
+
+
+def keep_athens(data: pd.DataFrame) -> pd.DataFrame:
+ """
+ Function that loads the pandas DataFrame and removes the lines that are not
+ related to Athens.
+ """
+ logger: logging.Logger = logging.Logger('airflow.task')
+ logger.info('Running custom transform method: keep_athens')
+
+ data.drop(data[data['city'] != 'Athens'].index, inplace=True)
+
+ logger.info(f'New data:\n{data}')
+ return data # Return the DataFrame to write it in MongoDB
+```
+
+Now, we need to create a YAML file that creates our task in `Airflow`.
+The file should look like this:
+
+```yaml
+my_load_data_task:
+ airgoodies_task: load_from_s3_to_mongo_table
+ config:
+ custom_transform_method: custom.transform.keep_athens # Path to our function
+```
\ No newline at end of file
diff --git a/docs/src/features/yaml.md b/docs/src/features/yaml.md
new file mode 100644
index 0000000..aafd1ec
--- /dev/null
+++ b/docs/src/features/yaml.md
@@ -0,0 +1,24 @@
+### Task generation using YAML Files
+
+`airgoodies` enables the ability to dynamically create tasks using YAML files with a specific syntax.
+
+Currently, the supported syntax offers the following commands:
+
+```yaml
+$task_name: # This is the name that will appear for the task (task_id)
+ $task: # The task to execute
+ config: # The configuration of the task
+ $options: $values
+ airgoodies_transform_method | custom_transform_method: $method_name
+```
+
+#### Options for `$task`:
+
+| option | explanation |
+|-----------------|--------------------------------------------------------------------------|
+| airgoodies_task | use a predefined `airgoodies` task from [this list](tasks/airgoodies.md) |
+| custom_task | run a custom python task, by providing the path of the callable |
+
+#### Options for `config`:
+
+Config enables the configuration of the task, and the options are defined by the `$task` option you have chosen.
\ No newline at end of file
diff --git a/docs/src/installation/installation.md b/docs/src/installation/installation.md
new file mode 100644
index 0000000..6a05290
--- /dev/null
+++ b/docs/src/installation/installation.md
@@ -0,0 +1,20 @@
+## Installation
+
+`airgoodies` is available for specific `Apache Airflow` and `Python` versions:
+
+Please view the matrix below to choose which one best suits you:
+
+| Airgoodies Version | Apache Airflow Version | Python Version | Project tag |
+|--------------------------------------------------------------------------------------------|------------------------|----------------|---------------------------------------------------------------------------------------------|
+| [0.0.4](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.4) | 2.7.2 | 3.11 | [v0.0.4](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.4) |
+| [0.0.3](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.3) | 2.7.2 | 3.11 | [v0.0.3](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.3) |
+| [0.0.2](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.2) | 2.7.2 | 3.11 | [v0.0.2](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.2) |
+| [0.0.1-alpha](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.1-alpha) | 2.7.2 | 3.11 | [v0.0.1-alpha](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.1-alpha) |
+
+Keep in mind that these are only the versions that have been tested and are known to work; you should not feel limited by
+this list, as a version might not be documented here but could still work for you.
+
+`airgoodies` can be installed in many different ways:
+
+* Install using `PyPi` directly into your `Airflow` deployment: [Install using Pypi](./pypi-installation.md)
+* Install directly from sources: [Install from sources](./sources-installation.md)
\ No newline at end of file
diff --git a/docs/src/installation/pypi-installation.md b/docs/src/installation/pypi-installation.md
new file mode 100644
index 0000000..b6f43a2
--- /dev/null
+++ b/docs/src/installation/pypi-installation.md
@@ -0,0 +1,11 @@
+### Install using PyPi
+
+`airgoodies` is available directly from PyPi and can be installed on your `Apache Airflow` by updating your
+`requirements.txt`:
+
+```text
+# requirements.txt
+airgoodies==0.0.4
+```
+
+For all the available versions please check [here](https://pypi.org/project/airgoodies/)
\ No newline at end of file
diff --git a/docs/src/installation/sources-installation.md b/docs/src/installation/sources-installation.md
new file mode 100644
index 0000000..2628420
--- /dev/null
+++ b/docs/src/installation/sources-installation.md
@@ -0,0 +1,3 @@
+### Install from Sources
+
+# TODO
\ No newline at end of file
diff --git a/docs/src/intro/introduction.md b/docs/src/intro/introduction.md
new file mode 100644
index 0000000..663b7c8
--- /dev/null
+++ b/docs/src/intro/introduction.md
@@ -0,0 +1,6 @@
+# Introduction
+
+`airgoodies` is a utility library for Apache Airflow, that allows easy development for Apache Airflow
+by removing a lot of boilerplate code from the end user.
+
+Using `airgoodies` helps in keeping your `dags` smaller and more maintainable.
\ No newline at end of file
diff --git a/docs/src/introduction.md b/docs/src/introduction.md
new file mode 100644
index 0000000..e10b99d
--- /dev/null
+++ b/docs/src/introduction.md
@@ -0,0 +1 @@
+# Introduction
diff --git a/release.sh b/release.sh
index 3f0a2ba..f69f65b 100755
--- a/release.sh
+++ b/release.sh
@@ -1,4 +1,4 @@
-#/bin/bash
+#!/bin/bash
# Script that builds the wheel of the project and creates a release
#
diff --git a/requirements.txt b/requirements.txt
index 8a6a907..9d4109f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,5 @@
pymongo==4.5.0
apache-airflow==2.7.2
apache-airflow-providers-amazon==8.7.1
-pandas==2.1.1
\ No newline at end of file
+pandas==2.1.1
+pyyaml==6.0.1
\ No newline at end of file
diff --git a/setup.py b/setup.py
index 335421b..4bfdc39 100644
--- a/setup.py
+++ b/setup.py
@@ -8,7 +8,7 @@ def get_readme():
setup(
name='airgoodies',
- version='0.0.3',
+ version='0.0.4',
description='Various goodies for Apache Airflow',
long_description=get_readme(),
long_description_content_type='text/markdown',
@@ -19,12 +19,18 @@ def get_readme():
'airgoodies.mongo',
'airgoodies.xcom',
'airgoodies.common',
- 'airgoodies.aws.s3'
+ 'airgoodies.aws.s3',
+ 'airgoodies.task',
+ 'airgoodies.task.config',
+ 'airgoodies.task.method',
+ 'airgoodies.command',
+ 'airgoodies.util'
],
install_requires=[
'pymongo==4.5.0',
'apache-airflow==2.7.2',
'apache-airflow-providers-amazon==8.7.1',
- 'pandas==2.1.1'
+ 'pandas==2.1.1',
+ 'pyyaml==6.0.1'
]
)
diff --git a/tests/test_imports.py b/tests/test_imports.py
deleted file mode 100644
index b2249c1..0000000
--- a/tests/test_imports.py
+++ /dev/null
@@ -1,16 +0,0 @@
-from unittest import TestCase, main
-
-"""
-@author: Stavros Grigoriou
-@since: 0.0.1
-"""
-
-
-class ImportTest(TestCase):
-
- def test_imports_should_succeed(self):
- self.assertEqual(True, True)
-
-
-if __name__ == "__main__":
- main()