dev to main BLOCKED by issue #252 #245

Open: wants to merge 58 commits into base: main
Changes from 27 commits
c1d67da
WIP: dynamic ingest mapping for continuous tracing
ividito Jul 19, 2024
55744c6
feat: use taskflow for concurrent task runs
ividito Jul 31, 2024
ee98cf7
clean up debug code
ividito Jul 31, 2024
6e151a9
Bugfix vector ingest
ividito Aug 1, 2024
82c724f
merge dev
smohiudd Aug 20, 2024
54b885d
add generic vector pipeline
smohiudd Aug 20, 2024
a7b51e1
remove process generic vector pipeline
smohiudd Aug 20, 2024
318d39a
Update dags/veda_data_pipeline/veda_generic_vector_pipeline.py
ividito Aug 27, 2024
0ab867b
Restructure dataset get_files output, add retries to some tasks to av…
ividito Sep 9, 2024
f1411fe
Changes to make vector ingest work in shared VPC environment
ividito Oct 4, 2024
4db9b2c
feat:port event-driven vector automation to terraform
ividito Oct 17, 2024
ebb9993
fix: feature flag for vector automation lambda
ividito Oct 17, 2024
4eca40e
fix: adjust log group count
ividito Oct 17, 2024
0005892
fix tf vector automation vars
smohiudd Oct 18, 2024
38bff94
update vars in event bridge tf
smohiudd Oct 18, 2024
90aed8f
Merge pull request #239 from NASA-IMPACT/fix/vector-automation
smohiudd Oct 18, 2024
021f4ff
add new line
smohiudd Oct 18, 2024
8ec0cd4
Merge pull request #240 from NASA-IMPACT/fix/vector-automation
smohiudd Oct 18, 2024
abba102
fix tag filter
smohiudd Oct 18, 2024
006e989
Merge pull request #241 from NASA-IMPACT/fix/vector-automation
smohiudd Oct 18, 2024
f181ca7
update vector subnet ref
smohiudd Oct 18, 2024
39401a0
Merge pull request #242 from NASA-IMPACT/fix/vector-automation
smohiudd Oct 18, 2024
857e2fa
fix vector subnet ids
smohiudd Oct 18, 2024
9b43b89
Merge pull request #243 from NASA-IMPACT/fix/vector-automation
smohiudd Oct 18, 2024
e3870e7
conditional vector subnets
smohiudd Oct 21, 2024
266be89
feat: disable default api gateway endpoint for workflows api
botanical Oct 21, 2024
65f0a8c
Merge pull request #246 from NASA-IMPACT/fix/vector-ingest-subnets
smohiudd Oct 21, 2024
d4a57e3
feat: add variable to configure disabling default apigw endpoint
botanical Oct 21, 2024
b9a9323
Merge pull request #197 from NASA-IMPACT/fix/asset-handling
smohiudd Oct 22, 2024
d77d946
Update infrastructure/terraform.tfvars.tmpl
botanical Oct 23, 2024
7af892a
Update infrastructure/terraform.tfvars.tmpl
botanical Oct 23, 2024
6d3e29a
Using pythonoperators instead of ECS operator
amarouane-ABDELHAK Oct 24, 2024
61a797d
Merge pull request #247 from NASA-IMPACT/jt/issue-452-disable-default…
botanical Oct 24, 2024
686c4c8
Switching to pythonOperator
amarouane-ABDELHAK Oct 24, 2024
c190564
Merge pull request #249 from NASA-IMPACT/feature/use-pythonoperators
amarouane-ABDELHAK Oct 25, 2024
305dd43
test: temporarily use test buckets
anayeaye Oct 25, 2024
a619098
Add S3 event bridge
amarouane-ABDELHAK Oct 29, 2024
4c9753f
Add S3 event bridge
amarouane-ABDELHAK Oct 29, 2024
59e1938
Add S3 event bridge
amarouane-ABDELHAK Oct 29, 2024
685e7db
Add env deployment example
amarouane-ABDELHAK Oct 30, 2024
dd0ea9e
Merge pull request #251 from NASA-IMPACT/feat/use-sm2a-in-eventbrige-…
amarouane-ABDELHAK Oct 30, 2024
8a3c584
only execute deploy action on push to dev branch
anayeaye Nov 6, 2024
4a277d3
Merge pull request #254 from NASA-IMPACT/ci/remove-staging-action
anayeaye Nov 6, 2024
db73561
ci: do not automatically deploy mwaa to any environment
anayeaye Nov 6, 2024
c450be2
Merge pull request #255 from NASA-IMPACT/ci/remove-all-mwaa-deploy-ac…
anayeaye Nov 8, 2024
b258d41
Adding condition on s3 event bridge
amarouane-ABDELHAK Nov 13, 2024
eb5e32c
Adding condition on s3 event bridge
amarouane-ABDELHAK Nov 14, 2024
4909f43
Fix some missing variables
amarouane-ABDELHAK Nov 14, 2024
87a08e2
Remove .env file
amarouane-ABDELHAK Nov 15, 2024
cf19bd3
Remove .env file
amarouane-ABDELHAK Nov 15, 2024
1335a92
Remove the airflow config to be created by CICD
amarouane-ABDELHAK Nov 15, 2024
dbcd631
print all transfer exceptions
anayeaye Nov 21, 2024
32b2acb
Merge pull request #257 from NASA-IMPACT/feat/use-sm2a-in-eventbrige-…
amarouane-ABDELHAK Nov 22, 2024
7872364
Merge branch 'dev' into fix/reveal-transfer-exception
anayeaye Nov 22, 2024
4374f34
print client err on failed transfer
anayeaye Nov 22, 2024
091bc80
raise transfer exception
anayeaye Nov 22, 2024
4f14c96
update secret variable type to string and add assume role arns to air…
anayeaye Nov 22, 2024
ef7be91
Merge pull request #260 from NASA-IMPACT/fix/reveal-transfer-exception
anayeaye Nov 22, 2024
103 changes: 37 additions & 66 deletions dags/veda_data_pipeline/groups/discover_group.py
@@ -1,28 +1,33 @@
from datetime import timedelta
import time
import uuid

from airflow.models.variable import Variable
from airflow.models.xcom import LazyXComAccess
from airflow.operators.dummy_operator import DummyOperator as EmptyOperator
from airflow.decorators import task_group
from airflow.decorators import task_group, task
from airflow.models.baseoperator import chain
from airflow.operators.python import BranchPythonOperator, PythonOperator, ShortCircuitOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow_multi_dagrun.operators import TriggerMultiDagRunOperator
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
from veda_data_pipeline.utils.s3_discovery import (
s3_discovery_handler, EmptyFileListError
)
from veda_data_pipeline.groups.processing_tasks import build_stac_kwargs, submit_to_stac_ingestor_task

group_kwgs = {"group_id": "Discover", "tooltip": "Discover"}

group_kwgs = {"group_id": "Discover", "tooltip": "Discover"}

def discover_from_s3_task(ti, event={}, **kwargs):
@task(retries=1, retry_delay=timedelta(minutes=1))
def discover_from_s3_task(ti=None, event={}, **kwargs):
"""Discover grouped assets/files from S3 in batches of 2800. Produce a list of such files stored on S3 to process.
This task is used as part of the discover_group subdag and outputs data to EVENT_BUCKET.
"""
config = {
**event,
**ti.dag_run.conf,
}
# TODO test that this context var is available in taskflow
last_successful_execution = kwargs.get("prev_start_date_success")
if event.get("schedule") and last_successful_execution:
config["last_successful_execution"] = last_successful_execution.isoformat()
@@ -41,79 +46,45 @@ def discover_from_s3_task(ti, event={}, **kwargs):
)
except EmptyFileListError as ex:
print(f"Received an exception {ex}")
return []

# TODO test continued short circuit operator behavior (no files -> skip remaining tasks)
return {}

def get_files_to_process(ti):
@task
def get_files_to_process(payload, ti=None):
"""Get files from S3 produced by the discovery task.
Used as part of both the parallel_run_process_rasters and parallel_run_process_vectors tasks.
"""
dynamic_group_id = ti.task_id.split(".")[0]
payload = ti.xcom_pull(task_ids=f"{dynamic_group_id}.discover_from_s3")
if isinstance(payload, LazyXComAccess):
if isinstance(payload, LazyXComAccess): # if used as part of a dynamic task mapping
payloads_xcom = payload[0].pop("payload", [])
payload = payload[0]
else:
payloads_xcom = payload.pop("payload", [])
dag_run_id = ti.dag_run.run_id
for indx, payload_xcom in enumerate(payloads_xcom):
time.sleep(2)
yield {
return [{
"run_id": f"{dag_run_id}_{uuid.uuid4()}_{indx}",
**payload,
"payload": payload_xcom,
}


def vector_raster_choice(ti):
"""Choose whether to process rasters or vectors based on the payload."""
payload = ti.dag_run.conf
dynamic_group_id = ti.task_id.split(".")[0]

if payload.get("vector"):
return f"{dynamic_group_id}.parallel_run_process_generic_vectors"
if payload.get("vector_eis"):
return f"{dynamic_group_id}.parallel_run_process_vectors"
return f"{dynamic_group_id}.parallel_run_process_rasters"
} for indx, payload_xcom in enumerate(payloads_xcom)]

@task_group
def subdag_discover(event={}):
discover_from_s3 = ShortCircuitOperator(
task_id="discover_from_s3",
python_callable=discover_from_s3_task,
op_kwargs={"text": "Discover from S3", "event": event},
trigger_rule=TriggerRule.NONE_FAILED,
provide_context=True,
)

raster_vector_branching = BranchPythonOperator(
task_id="raster_vector_branching",
python_callable=vector_raster_choice,
)

run_process_raster = TriggerMultiDagRunOperator(
task_id="parallel_run_process_rasters",
trigger_dag_id="veda_ingest_raster",
python_callable=get_files_to_process,
)

run_process_vector = TriggerMultiDagRunOperator(
task_id="parallel_run_process_vectors",
trigger_dag_id="veda_ingest_vector",
python_callable=get_files_to_process,
)

run_process_generic_vector = TriggerMultiDagRunOperator(
task_id="parallel_run_process_generic_vectors",
trigger_dag_id="veda_generic_ingest_vector",
python_callable=get_files_to_process,
)
@task
def get_dataset_files_to_process(payload, ti=None):
"""Get files from S3 produced by the dataset task.
This is different from the get_files_to_process task as it produces a combined structure from repeated mappings.
"""
dag_run_id = ti.dag_run.run_id

# extra no-op, needed to run in dynamic mapping context
end_discover = EmptyOperator(task_id="end_discover", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,)

discover_from_s3 >> raster_vector_branching >> [run_process_raster, run_process_vector, run_process_generic_vector]
run_process_raster >> end_discover
run_process_vector >> end_discover
run_process_generic_vector >> end_discover

result = []
for x in payload:
if isinstance(x, LazyXComAccess): # if used as part of a dynamic task mapping
payloads_xcom = x[0].pop("payload", [])
payload_0 = x[0]
else:
payloads_xcom = x.pop("payload", [])
payload_0 = x
for indx, payload_xcom in enumerate(payloads_xcom):
result.append({
"run_id": f"{dag_run_id}_{uuid.uuid4()}_{indx}",
**payload_0,
"payload": payload_xcom,
})
return result
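
The fan-out that get_files_to_process and get_dataset_files_to_process perform can be sketched without Airflow. This is only an illustration under stated assumptions: fan_out and fan_out_many are hypothetical names that do not appear in the PR, the LazyXComAccess branch is left out, and the sample keys and S3 paths are made up. Unlike the tasks in the diff, the sketch copies the discovery output before popping "payload", so the caller's dict is left untouched.

```python
import uuid


def fan_out(discovery_output, dag_run_id):
    """Build one run config per entry in discovery_output["payload"].

    Hypothetical helper mirroring get_files_to_process; not part of the PR.
    """
    shared = dict(discovery_output)  # shallow copy: do not mutate the input
    payload_items = shared.pop("payload", [])  # {} from discovery -> no runs
    return [
        {
            # run_id mirrors the diff's format: "<dag_run_id>_<uuid4>_<index>"
            "run_id": f"{dag_run_id}_{uuid.uuid4()}_{indx}",
            **shared,
            "payload": payload_item,
        }
        for indx, payload_item in enumerate(payload_items)
    ]


def fan_out_many(discovery_outputs, dag_run_id):
    """Combine the fan-out of several discovery outputs into one flat list.

    Hypothetical helper mirroring get_dataset_files_to_process; the index in
    each run_id restarts for every discovery output, as in the diff.
    """
    result = []
    for output in discovery_outputs:
        result.extend(fan_out(output, dag_run_id))
    return result


# Made-up discovery output: shared settings plus a list of payload locations.
discovered = {
    "collection": "demo-collection",
    "payload": ["s3://example-bucket/a.json", "s3://example-bucket/b.json"],
}
runs = fan_out(discovered, "manual__2024-10-01")
for run in runs:
    print(run["run_id"], run["payload"])
```

Returning a list rather than yielding matches the change in the diff: a task's return value goes to XCom, so the full list of run configs is what a downstream mapped task would receive. An empty discovery result ({}) produces an empty list here, meaning no downstream runs.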
95 changes: 0 additions & 95 deletions dags/veda_data_pipeline/groups/processing_group.py

This file was deleted.
