-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #135 from TogetherCrew/wip-134
[Analyzer] Adding a place to start the task discord analyzer ETL
- Loading branch information
Showing
1 changed file
with
153 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
# type: ignore | ||
# remove the above when all tasks were filled | ||
import logging | ||
from datetime import datetime | ||
|
||
from airflow import DAG | ||
from airflow.decorators import task | ||
|
||
# Discord analyzer ETL: fetch the platforms to process, run the raw-data and
# raw-members ETL once per platform, then start the analyzer per platform.
with DAG(
    dag_id="discord_analyzer_etl",
    start_date=datetime(2024, 5, 1),
    schedule_interval="0 2 * * *",  # cron: every day at 02:00
    catchup=False,
) as dag:

    @task
    def fetch_discord_platforms(**kwargs) -> list[dict[str, str | datetime | bool]]:
        """
        fetch discord platforms

        Parameters
        -----------
        **kwargs
            the task context; `kwargs["dag_run"].conf` is read for the
            optional `recompute_platform` entry

        Returns
        ---------
        platform_modules : list[dict[str, str | datetime | bool]]
            a list of data for each platform
            each platform's module information would have the information below
            ```
            {
                'platform_id' : str,
                'period': datetime,
                'guild_id' : str,
                'recompute': bool,  # default is False
            }
            ```

        Notes
        ------
        Not implemented yet: the body currently returns nothing.
        """
        # the platform that needs to be recomputed
        platform_id_recompute = kwargs["dag_run"].conf.get(  # noqa: F841
            "recompute_platform", None
        )

        # for default we're setting the recompute for all platforms to False
        # if an id for `recompute_platform` was given
        # then just run the ETL job for that platform with `recompute = True`
        # meaning the return would be a list with just one platform information

        # TODO
        pass

    @task
    def discord_etl_raw_data(
        platform_info: dict[str, str | datetime | bool]
    ) -> dict[str, str | list[dict] | bool]:
        """
        process one platform's data

        Parameters
        -----------
        platform_info : dict[str, str | datetime | bool]
            the information for extracting raw data
            the data should be something like this
            ```
            {
                'platform_id' : str,
                'guild_id' : str,
                'period' : datetime,
                'recompute': bool,
            }
            ```
            if recompute was false, then fetch from the previously saved data date

        Returns
        --------
        platform_processed : dict[str, str | bool]
            the platform whose data was processed
            ```
            {
                platform_id: str,
                recompute : bool,
            }
            ```
            NOTE(review): the return annotation also admits `list[dict]`
            values, which the structure above does not use - confirm which
            one is intended.

        Notes
        ------
        Not implemented yet: the body currently returns nothing.
        """
        # TODO: EXTRACT
        # If recompute is False, then just extract from the latest saved document
        # within rawmemberactivities collection using their date
        # else, just extract from the `period`

        # TODO: TRANSFORM

        # TODO: LOAD
        # if recompute is True, then replace the whole previously saved data in
        # database with the new ones
        # else, just save the new ones

        pass

    @task
    def discord_etl_raw_members(
        platform_info: dict[str, str | datetime | bool]
    ) -> None:
        """
        extract raw members data for a platform

        Parameters
        -----------
        platform_info : dict[str, str | datetime | bool]
            the information for extracting raw members data
            the data should be something like this
            ```
            {
                'platform_id' : str,
                'guild_id' : str,
                'period' : datetime,
                'recompute': bool,
            }
            ```

        Notes
        ------
        Not implemented yet: the body currently does nothing.
        """
        # TODO: EXTRACT
        # if recompute was false, then will fetch from the previously saved data date
        # else, then will fetch all platform's members data

        # TODO: TRANSFORM

        # TODO: LOAD
        pass

    @task
    def analyze_discord(platform_processed: dict[str, str | bool]) -> None:
        """
        start the analyzer to process data

        Parameters
        ------------
        platform_processed : dict[str, str | bool]
            the platform data to analyze
            value structure
            ```
            {
                platform_id: str,
                recompute : bool,
            }
            ```

        Notes
        ------
        Not implemented yet: the body currently only logs its input.
        """
        # lazy %-style arguments: the message is only built if it is emitted
        logging.info("platform_processed: %s", platform_processed)
        # TODO
        pass

    platform_modules = fetch_discord_platforms()

    # one mapped task instance per platform returned by the fetch task
    raw_data_etl = discord_etl_raw_data.expand(platform_info=platform_modules)
    raw_members_etl = discord_etl_raw_members.expand(platform_info=platform_modules)

    # `raw_data_etl` is the output of a mapped task, so the analyzer has to be
    # mapped over it as well; calling it directly would hand it the whole
    # collection of results instead of the single platform dict it expects
    analyze_discord_runs = analyze_discord.expand(platform_processed=raw_data_etl)

    # the analyzer must also wait for the raw members ETL to finish
    raw_members_etl >> analyze_discord_runs