feat(tasks): adds 'drop_columns' transform method
* adds config options inside tasks
stav121 committed Feb 7, 2024
1 parent 78df0c4 commit b76db73
Showing 15 changed files with 97 additions and 26 deletions.
9 changes: 8 additions & 1 deletion .github/workflows/mdbook.yaml
@@ -1,33 +1,40 @@
on:
push:
branches: [ "main" ]
branches: [ "develop" ]

jobs:

deploy:
runs-on: ubuntu-latest
permissions:
contents: write
pages: write
id-token: write
steps:

- uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Install latest mdbook
run: |
tag=$(curl 'https://api.github.com/repos/rust-lang/mdbook/releases/latest' | jq -r '.tag_name')
url="https://github.com/rust-lang/mdbook/releases/download/${tag}/mdbook-${tag}-x86_64-unknown-linux-gnu.tar.gz"
mkdir mdbook
curl -sSL $url | tar -xz --directory=./mdbook
echo `pwd`/mdbook >> $GITHUB_PATH
- name: Build Book
run: cd docs && mdbook build

- name: Setup Pages
uses: actions/configure-pages@v4

- name: Upload artifact
uses: actions/upload-pages-artifact@v2
with:
path: 'docs/book'

- name: Deploy to GitHub Pages
id: deployment
uses: actions/deploy-pages@v3
5 changes: 3 additions & 2 deletions README.md
@@ -14,12 +14,13 @@ Current version matrix:

| Airgoodies Version | Apache Airflow Version | Python Version | Project tag |
|--------------------------------------------------------------------------------------------|------------------------|----------------|---------------------------------------------------------------------------------------------|
| [0.0.5](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.5) | 2.7.2 | 3.11 | [v0.0.5](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.5) |
| [0.0.4](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.4) | 2.7.2 | 3.11 | [v0.0.4](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.4) |
| [0.0.3](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.3) | 2.7.2 | 3.11 | [v0.0.3](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.3) |
| [0.0.2](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.2) | 2.7.2 | 3.11 | [v0.0.2](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.2) |
| [0.0.1-alpha](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.1-alpha) | 2.7.2 | 3.11 | [v0.0.1-alpha](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.1-alpha) |

Provided goodies for version [0.0.4](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.4):
Provided goodies for version [0.0.5](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.5):

| Module | Description | Dependency Versions |
|--------------------|-------------------------------------------------|----------------------------------------------------------|
@@ -34,7 +35,7 @@ Add the following requirement in your `requirements.txt`

```
# requirements.txt
airgoodies==0.0.4
airgoodies==0.0.5
```

### Example usage
20 changes: 12 additions & 8 deletions airgoodies/aws/s3/wrapper.py
@@ -126,7 +126,8 @@ def load_as_dataframe(self,
def load_and_transform(self,
key: str,
bucket_name: str | None = None,
transform_method: Callable[[DataFrame], DataFrame] | None = None,
transform_method: Callable[[DataFrame, dict], DataFrame] | None = None,
transform_method_config: dict | None = None,
sep: str | None = None) \
-> DataFrame:
"""
@@ -147,14 +148,15 @@ def load_and_transform(self,
if transform_method is None:
return result
else:
return transform_method(result)
return transform_method(result, transform_method_config)

def load_to_mongo(self,
key: str,
connection: MongoConnection,
load_table_name: str,
bucket_name: str | None = None,
transform_method: Callable[[DataFrame], DataFrame] | None = None,
bucket_name: str = None,
transform_method: Callable[[DataFrame, dict], DataFrame] | None = None,
transform_method_config: dict | None = None,
sep: str | None = None) \
-> str:
"""
@@ -179,10 +181,12 @@ def load_to_mongo(self,
bucket_name=bucket_name,
sep=sep)
else:
data = self.load_and_transform(key=key,
bucket_name=bucket_name,
transform_method=transform_method,
sep=sep)
data = self.load_and_transform(
key=key,
bucket_name=bucket_name,
transform_method=transform_method,
transform_method_config=transform_method_config,
sep=sep)

connection.get_db().get_collection(name=load_table_name).insert_many(
loads(data.to_json(orient='records')))
2 changes: 1 addition & 1 deletion airgoodies/command/command.py
@@ -1,5 +1,5 @@
"""
@author: Stavros Grigoriou
@author: Stavros Grigoriou <[email protected]>
@since: 0.0.4
"""

5 changes: 3 additions & 2 deletions airgoodies/task/aws_tasks.py
@@ -44,7 +44,7 @@ def load_from_s3_to_mongo_table(ti: TaskInstance, **kwargs) -> None:
s3_wrapper=s3_wrapper,
task_id=ti.task_id)

transform_method: Callable[[DataFrame], DataFrame] | None = None
transform_method: Callable[[DataFrame, dict], DataFrame] | None = None
if task_configuration.get_config(conf=AwsS3ToMongoTableOptions.
AIRGOODIES_TRANSFORM_METHOD) is not None:
# Load the transform method
@@ -77,4 +77,5 @@ def load_from_s3_to_mongo_table(ti: TaskInstance, **kwargs) -> None:
key=input_file,
connection=mongo_conn,
transform_method=transform_method,
load_table_name=out_table_name) # TODO: add to config
transform_method_config=task_configuration.get_config_dict(),
load_table_name=out_table_name)
11 changes: 11 additions & 0 deletions airgoodies/task/config/task_configuration.py
@@ -42,6 +42,17 @@ def __init__(self,
self._logger.info(f'Loaded task config: {self._task_config}')
self._logger.info(f'Config: {self._task_config["config"]}')

def get_config_dict(self) -> dict | None:
"""
Retrieve the entire task configuration.
:return: the configuration dictionary or None
"""

if 'config' in self._task_config:
return self._task_config['config']
return None

def get_config(self, conf: str) -> str | None:
"""
Retrieve the requested configuration from the `config` section.
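
The behaviour of the new `get_config_dict` accessor can be sketched in isolation. The class `TaskConfigurationSketch` below is a hypothetical stand-in, not the real `TaskConfiguration` (whose constructor is not visible in this hunk); it only mirrors the `_task_config` lookup shown above, and the sample task dictionaries are assumptions.

```python
from typing import Optional


class TaskConfigurationSketch:
    """Hypothetical stand-in that mirrors only the `_task_config` lookup."""

    def __init__(self, task_config: dict) -> None:
        self._task_config = task_config

    def get_config_dict(self) -> Optional[dict]:
        # Return the whole `config` section, or None when the task has none.
        if 'config' in self._task_config:
            return self._task_config['config']
        return None


# Illustrative task definitions, shaped like the YAML examples in the docs.
with_config = TaskConfigurationSketch({
    'airgoodies_task': 'load_from_s3_to_mongo_table',
    'config': {
        'airgoodies_transform_method': 'drop_columns',
        'drop_columns_val': ['column_1'],
    },
})
without_config = TaskConfigurationSketch(
    {'airgoodies_task': 'load_from_s3_to_mongo_table'})

columns = with_config.get_config_dict()['drop_columns_val']
missing = without_config.get_config_dict()
```

Because the accessor can return `None`, a transform method that receives this value may want to check for `None` before testing for a key with `in`.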
28 changes: 25 additions & 3 deletions airgoodies/task/method/transform.py
@@ -6,15 +6,15 @@
"""


def _print_table_contents(data: DataFrame) -> DataFrame:
def _print_table_contents(data: DataFrame, config: dict = None) -> DataFrame:
"""
Print the contents of the provided datatable.
:param data: the input data
"""
from logging import Logger, getLogger

logger: Logger = getLogger('airflow.task')
logger: Logger = getLogger(name='airflow.task')

logger.info('Executing `print_table_contents` callable')

@@ -23,14 +23,36 @@ def _print_table_contents(data: DataFrame) -> DataFrame:
return data


def resolve(name: str) -> Callable[[DataFrame], DataFrame]:
def _drop_columns(data: DataFrame, config: dict | None = None) -> DataFrame:
    """
    Drop the selected column(s) from the imported data.
    :param data: the input data
    :param config: the task configuration; the columns to drop are read from `drop_columns_val`
    :return: the data without the dropped columns
    """
    from logging import Logger, getLogger

    logger: Logger = getLogger(name='airflow.task')

    # The configuration may be None when no `config` dict is passed in.
    if config is not None and 'drop_columns_val' in config:
        columns: list[str] = config['drop_columns_val']
        logger.info(
            f'Executing `drop_columns` callable for columns <{columns}>')
        data = data.drop(columns=columns)

    return data


def resolve(name: str) -> Callable[[DataFrame, dict], DataFrame]:
"""
Resolve the provided airgoodies transform method callable if it exists.
:param name: the name of the airgoodies transform method to resolve
:return: the callable of the method
"""
if name == 'print_table_contents':
return _print_table_contents
elif name == 'drop_columns':
return _drop_columns
else:
raise Exception(
f'airgoodies_transform_method with name <{name}> not found')
2 changes: 2 additions & 0 deletions docs/src/SUMMARY.md
@@ -8,5 +8,7 @@
- [Task generation using YAML files](features/yaml.md)
- [Predefined tasks](features/tasks/airgoodies.md)
- [load_from_s3_to_mongo_table](features/tasks/s3/load_from_s3_to_mongo_table.md)
- [print_table_contents](features/tasks/s3/transform/print_table_columns.md)
- [drop_columns](features/tasks/s3/transform/drop_columns.md)
- [Airgoodies by example](examples/overview.md)
- [Airflow (Docker) + S3 + Mongo](examples/setup/airflow_docker_s3_mongo_setup.md)
4 changes: 2 additions & 2 deletions docs/src/examples/setup/airflow_docker_s3_mongo_setup.md
@@ -90,7 +90,7 @@ For that, we open our `requirements.txt` and insert the following:

```requirements.txt
# Add the requirement for `airgoodies`
airgoodies==0.0.4
airgoodies==0.0.5
```

### Starting the instance
@@ -290,7 +290,7 @@ The result should look like this:
### Conclusion

This example demonstrates only a small part of what `airgoodies` can do, and it was created using
the `airgoodies` version [v0.0.4](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.3)
the `airgoodies` version [v0.0.5](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.5)

### Author

10 changes: 5 additions & 5 deletions docs/src/features/tasks/s3/load_from_s3_to_mongo_table.md
@@ -7,11 +7,11 @@ directly from an S3 bucket into a MongoDB table, offering the ability to perform

#### Transform method options

| option | values |
|-----------------------------|-----------------------------------------------------------------------------------------------------------------------------|
| airgoodies_transform_method | print_table_contents |
| custom_transform_method | `path.to.method`: the method must have the signature `Callable[[pandas.DataFrame], pandas.DataFrame]` (View examples below) |
| output_table_name | the name of the MongoDB collection to save the result into, default is `{dag_id}_output_table` |
| option | values |
|-----------------------------|-----------------------------------------------------------------------------------------------------------------------------------|
| airgoodies_transform_method | [print_table_contents](transform/print_table_columns.md)<br/>[drop_columns](transform/drop_columns.md)                            |
| custom_transform_method | `path.to.method`: the method must have the signature `Callable[[pandas.DataFrame, dict], pandas.DataFrame]` (View examples below) |
| output_table_name | the name of the MongoDB collection to save the result into, default is `{dag_id}_output_table` |
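
A custom transform with the two-argument signature could look like the sketch below. The function name `keep_rows_above_threshold`, the `amount` column and the `min_amount` config key are hypothetical and only illustrate how the `dict` argument might be read; none of them are part of `airgoodies`.

```python
from typing import Optional

import pandas as pd


def keep_rows_above_threshold(data: pd.DataFrame,
                              config: Optional[dict] = None) -> pd.DataFrame:
    """Hypothetical custom transform: keep rows whose `amount` exceeds a threshold."""
    # `min_amount` is an illustrative key, not an airgoodies option.
    # Fall back to 0 when no config (or no such key) is provided.
    threshold = (config or {}).get('min_amount', 0)
    return data[data['amount'] > threshold].reset_index(drop=True)


frame = pd.DataFrame({'id': [1, 2, 3], 'amount': [5, 50, 500]})
result = keep_rows_above_threshold(frame, {'min_amount': 10})
```

As far as this diff shows, the second argument is the task's `config` section, so a custom method can read its own keys from the same place where `custom_transform_method` is declared.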

#### Example YAML syntax

19 changes: 19 additions & 0 deletions docs/src/features/tasks/s3/transform/drop_columns.md
@@ -0,0 +1,19 @@
# drop_columns

This transform method drops the selected columns from the data before it is saved to the output table.

# Configuration options

| config           | description             | type  | example              |
|------------------|-------------------------|-------|----------------------|
| drop_columns_val | List of columns to drop | [str] | [column_1, column_2] |

# Example

```yaml
my_task:
  airgoodies_task: load_from_s3_to_mongo_table
  config:
    airgoodies_transform_method: drop_columns
    drop_columns_val: [ column_1, column_2 ]
```
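
The effect of this configuration on the loaded data can be sketched with plain pandas. The sample DataFrame and the third column `column_3` are assumptions made for illustration; the `config` dictionary stands in for the task's `config` section above.

```python
import pandas as pd

# Stand-in for the `config` section of the YAML task definition.
config = {
    'airgoodies_transform_method': 'drop_columns',
    'drop_columns_val': ['column_1', 'column_2'],
}

# Illustrative input, as it might look after loading the file from S3.
data = pd.DataFrame({
    'column_1': [1, 2],
    'column_2': ['a', 'b'],
    'column_3': [0.5, 1.5],
})

# Drop the configured columns only when the key is present.
if 'drop_columns_val' in config:
    data = data.drop(columns=config['drop_columns_val'])

remaining = list(data.columns)
```

With its default `errors='raise'`, `DataFrame.drop` raises a `KeyError` when a listed column is missing from the data, so the names in `drop_columns_val` need to match the file's columns exactly.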
3 changes: 3 additions & 0 deletions docs/src/features/tasks/s3/transform/print_table_columns.md
@@ -0,0 +1,3 @@
# print_table_contents

This option prints the contents of the DataFrame as they will be saved in the database.
1 change: 1 addition & 0 deletions docs/src/installation/installation.md
@@ -6,6 +6,7 @@ Please view the matrix below to choose which one best suits you:

| Airgoodies Version | Apache Airflow Version | Python Version | Project tag |
|--------------------------------------------------------------------------------------------|------------------------|----------------|---------------------------------------------------------------------------------------------|
| [0.0.5](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.5) | 2.7.2 | 3.11 | [v0.0.5](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.5) |
| [0.0.4](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.4) | 2.7.2 | 3.11 | [v0.0.4](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.4) |
| [0.0.3](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.3) | 2.7.2 | 3.11 | [v0.0.3](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.3) |
| [0.0.2](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.2) | 2.7.2 | 3.11 | [v0.0.2](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.2) |
2 changes: 1 addition & 1 deletion docs/src/installation/pypi-installation.md
@@ -5,7 +5,7 @@

```text
# requirements.txt
airgoodies==0.0.4
airgoodies==0.0.5
```

For all the available versions please check [here](https://pypi.org/project/airgoodies/)
2 changes: 1 addition & 1 deletion setup.py
@@ -8,7 +8,7 @@ def get_readme():

setup(
name='airgoodies',
version='0.0.4',
version='0.0.5',
description='Various goodies for Apache Airflow',
long_description=get_readme(),
long_description_content_type='text/markdown',
