Add/generic vector ingests #220

Merged · 39 commits · Aug 14, 2024

Commits
b93d79f
Add generic vector ingest
Aug 6, 2024
474f42b
Resolve arm role bug
Aug 6, 2024
bdceef1
fix smart open bug
Aug 6, 2024
991e764
logging few things
Aug 6, 2024
02faaac
passing params
Aug 7, 2024
3ac1f40
Try alternate download
Aug 7, 2024
ff307d6
Add test tables
Aug 7, 2024
8406751
add extra flags
Aug 8, 2024
3e062a3
Add comments
Aug 8, 2024
86884e8
Refactor layer naming and add IAM role assumption
Aug 9, 2024
f93a640
merge with new changes in dev
Aug 12, 2024
5624a5a
Add generic vector ingest
Aug 12, 2024
66921c5
revert back changes
Aug 12, 2024
a9665f9
Add generic vector ingest ecs
Aug 12, 2024
c685426
Add dag for generic vector ingest
Aug 12, 2024
487fe15
Modify to take generic vector ingest
Aug 12, 2024
2b6ef7e
modify family in task definition
Aug 13, 2024
d86bcd0
Fix log group
Aug 13, 2024
2acf30a
debug
Aug 13, 2024
0f2dc13
Add vector_ecs_conf
Aug 13, 2024
30ee61c
change to vector subnet
Aug 13, 2024
62e3b86
handle empty collection field
Aug 13, 2024
205db09
revert to vector ecs conf
Aug 14, 2024
ede6200
add vector vpc conf
Aug 14, 2024
0e7cdd8
Add generic ingest to branching choices
Aug 14, 2024
893c7b4
Add to dag flow
Aug 14, 2024
9f70321
Adjust spaces
Aug 14, 2024
9258aec
Adjust space
Aug 14, 2024
1944a7a
remove space
Aug 14, 2024
a6039be
Update with dag info
Aug 14, 2024
2a01479
Update readme
Aug 14, 2024
f28150b
address suggestions
Aug 14, 2024
3d5b32d
Add space
Aug 14, 2024
6230a33
update readme
Aug 14, 2024
8ee6129
Update content
Aug 14, 2024
d22fef5
Add pipeline info
Aug 14, 2024
3d6dcd6
deleted the other readme
Aug 14, 2024
0005a32
Modify readme
Aug 14, 2024
8bcfbad
Modify readme
Aug 14, 2024
11 changes: 10 additions & 1 deletion dags/veda_data_pipeline/groups/discover_group.py
@@ -71,6 +71,8 @@ def vector_raster_choice(ti):
dynamic_group_id = ti.task_id.split(".")[0]

if payload.get("vector"):
return f"{dynamic_group_id}.parallel_run_process_generic_vectors"
if payload.get("vector_eis"):
return f"{dynamic_group_id}.parallel_run_process_vectors"
return f"{dynamic_group_id}.parallel_run_process_rasters"

@@ -101,10 +103,17 @@ def subdag_discover(event={}):
python_callable=get_files_to_process,
)

run_process_generic_vector = TriggerMultiDagRunOperator(
task_id="parallel_run_process_generic_vectors",
trigger_dag_id="veda_generic_ingest_vector",
python_callable=get_files_to_process,
)

# extra no-op, needed to run in dynamic mapping context
end_discover = EmptyOperator(task_id="end_discover", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,)

discover_from_s3 >> raster_vector_branching >> [run_process_raster, run_process_vector]
    discover_from_s3 >> raster_vector_branching >> [run_process_raster, run_process_vector, run_process_generic_vector]
run_process_raster >> end_discover
run_process_vector >> end_discover
run_process_generic_vector >> end_discover

108 changes: 108 additions & 0 deletions dags/veda_data_pipeline/veda_process_generic_vector_pipeline.py
@@ -0,0 +1,108 @@
import pendulum
from airflow import DAG
from airflow.models.variable import Variable
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

from datetime import timedelta

dag_doc_md = """
### Build and submit STAC
#### Purpose
This DAG is designed to be triggered by `veda_discover`, but it can also be triggered manually or through the Airflow API.

#### Notes
- This DAG can run with a configuration like the following: <br>
```json
{
"collection": "geoglam",
"prefix": "geoglam/",
"bucket": "veda-data-store-staging",
"filename_regex": "^(.*).tif$",
"discovery": "s3",
"datetime_range": "month",
"upload": false,
"cogify": false,
"discovered": 33,
"payload": "s3://veda-uah-sit-mwaa-853558080719/events/geoglam/s3_discover_output_6c46b57a-7474-41fe-977a-19d164531cdc.json"
}
```
- [Supports linking to external content](https://github.com/NASA-IMPACT/veda-data-pipelines)
"""

template_dag_run_conf = {
    "collection": "<collection_name>",
    "prefix": "<prefix>/",
    "bucket": "<bucket>",
    "filename_regex": "<filename_regex>",
    "discovery": "<s3|cmr>",
    "datetime_range": "<month|day>",
    "upload": "<false|true>",
    "cogify": "<false|true>",
    "payload": "<s3_uri_event_payload>",
}
dag_args = {
"start_date": pendulum.today("UTC").add(days=-1),
"schedule_interval": None,
"catchup": False,
"doc_md": dag_doc_md,
}

with DAG(dag_id="veda_generic_ingest_vector", params=template_dag_run_conf, **dag_args) as dag:
start = DummyOperator(task_id="Start", dag=dag)
end = DummyOperator(task_id="End", trigger_rule=TriggerRule.ONE_SUCCESS, dag=dag)

mwaa_stack_conf = Variable.get(
"MWAA_STACK_CONF", default_var={}, deserialize_json=True
)
vector_ecs_conf = Variable.get("VECTOR_ECS_CONF", deserialize_json=True)

generic_ingest_vector = EcsRunTaskOperator(
task_id="generic_ingest_vector",
trigger_rule=TriggerRule.NONE_FAILED,
cluster=f"{mwaa_stack_conf.get('PREFIX')}-cluster",
task_definition=f"{mwaa_stack_conf.get('PREFIX')}-generic-vector-tasks",
launch_type="FARGATE",
do_xcom_push=True,
execution_timeout=timedelta(minutes=120),
overrides={
"containerOverrides": [
{
"name": f"{mwaa_stack_conf.get('PREFIX')}-veda-generic_vector_ingest",
"command": [
"/var/lang/bin/python",
"handler.py",
"--payload",
"{}".format("{{ task_instance.dag_run.conf }}"),
],
"environment": [
{
"name": "EXTERNAL_ROLE_ARN",
"value": Variable.get(
"ASSUME_ROLE_READ_ARN", default_var=""
),
},
{
"name": "AWS_REGION",
"value": mwaa_stack_conf.get("AWS_REGION"),
},
{
"name": "VECTOR_SECRET_NAME",
"value": Variable.get("VECTOR_SECRET_NAME"),
},
],
},
],
},
network_configuration={
"awsvpcConfiguration": {
"securityGroups": vector_ecs_conf.get("VECTOR_SECURITY_GROUP") + mwaa_stack_conf.get("SECURITYGROUPS"),
"subnets": vector_ecs_conf.get("VECTOR_SUBNETS"),
},
},
awslogs_group=mwaa_stack_conf.get("LOG_GROUP_NAME"),
awslogs_stream_prefix=f"ecs/{mwaa_stack_conf.get('PREFIX')}-veda-generic-vector_ingest", # prefix with container name
)

start >> generic_ingest_vector >> end
2 changes: 1 addition & 1 deletion dags/veda_data_pipeline/veda_process_vector_pipeline.py
@@ -80,7 +80,7 @@
{
"name": "EXTERNAL_ROLE_ARN",
"value": Variable.get(
"ASSUME_ROLE_READ_ARN", default_var=None
"ASSUME_ROLE_READ_ARN", default_var=""
),
},
{
10 changes: 10 additions & 0 deletions docker_tasks/generic_vector_ingest/Dockerfile
@@ -0,0 +1,10 @@
FROM --platform=linux/amd64 ghcr.io/lambgeo/lambda-gdal:3.6-python3.9
RUN yum update -y

WORKDIR /app
ENTRYPOINT []
RUN pip install --upgrade pip
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

COPY handler.py handler.py
204 changes: 204 additions & 0 deletions docker_tasks/generic_vector_ingest/handler.py
@@ -0,0 +1,204 @@
import base64
from argparse import ArgumentParser
import boto3
import os
import ast
import subprocess
import json
import smart_open
from urllib.parse import urlparse

def download_file(file_uri: str):
"""Downloads file from s3

Args:
file_uri (str): s3 URL of the file to be downloaded

Returns:
target_filepath (str): filepath of the downloaded file
"""
role_arn = os.environ.get("EXTERNAL_ROLE_ARN")
kwargs = assume_role(role_arn=role_arn) if role_arn else {}

s3 = boto3.client("s3", **kwargs)
url_parse = urlparse(file_uri)

bucket = url_parse.netloc
path = url_parse.path[1:]
filename = url_parse.path.split("/")[-1]
target_filepath = os.path.join("/tmp", filename)

s3.download_file(bucket, path, target_filepath)

s3.close()
return target_filepath

def assume_role(role_arn, session_name="veda-data-pipelines_vector-ingest"):
"""Assumes an AWS IAM role and returns temporary credentials.

Args:
role_arn (str): The ARN of the role to assume.
session_name (str): A name for the assumed session.

Returns:
dict: Temporary AWS credentials.
"""
sts = boto3.client("sts")
credentials = sts.assume_role(
RoleArn=role_arn,
RoleSessionName=session_name,
)
creds = credentials["Credentials"]
return {
"aws_access_key_id": creds["AccessKeyId"],
"aws_secret_access_key": creds.get("SecretAccessKey"),
"aws_session_token": creds.get("SessionToken"),
}


def get_connection_string(secret: dict, as_uri: bool = False) -> str:
if as_uri:
return f"postgresql://{secret['username']}:{secret['password']}@{secret['host']}:5432/{secret['dbname']}"
else:
#return f"PG:host=localhost port=5432 dbname=postgis user=username password=password"
return f"PG:host={secret['host']} dbname={secret['dbname']} user={secret['username']} password={secret['password']}"


def get_secret(secret_name: str) -> dict:
"""Retrieve secrets from AWS Secrets Manager

Args:
secret_name (str): name of aws secrets manager secret containing database connection secrets

Returns:
secrets (dict): decrypted secrets in dict
"""

# Create a Secrets Manager client
session = boto3.session.Session(region_name=os.environ.get("AWS_REGION"))
client = session.client(service_name="secretsmanager")

    # Exceptions raised by the 'GetSecretValue' call are propagated to the caller.
    # See https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html

get_secret_value_response = client.get_secret_value(SecretId=secret_name)

# Decrypts secret using the associated KMS key.
# Depending on whether the secret is a string or binary, one of these fields will be populated.
if "SecretString" in get_secret_value_response:
return json.loads(get_secret_value_response["SecretString"])
else:
return json.loads(base64.b64decode(get_secret_value_response["SecretBinary"]))



def load_to_featuresdb(
    filename: str,
    layer_name: str,
    x_possible: str = "longitude",
    y_possible: str = "latitude",
    source_projection: str = "EPSG:4326",
    target_projection: str = "EPSG:4326",
    extra_flags: list = ["-overwrite", "-progress"],
):
secret_name = os.environ.get("VECTOR_SECRET_NAME")
con_secrets = get_secret(secret_name)
connection = get_connection_string(con_secrets)

print(f"running ogr2ogr import for collection/file: {layer_name}")
options = [
"ogr2ogr",
"-f",
"PostgreSQL",
connection,
filename,
"-oo",
f"X_POSSIBLE_NAMES={x_possible}",
"-oo",
f"Y_POSSIBLE_NAMES={y_possible}",
"-nln",
layer_name,
"-s_srs",
source_projection,
"-t_srs",
target_projection,
*extra_flags
]
out = subprocess.run(
options,
check=False,
capture_output=True,
)

if out.stderr:
error_description = f"Error: {out.stderr}"
print(error_description)
return {"status": "failure", "reason": error_description}

return {"status": "success"}

def handler():
print("------Vector ingestion for Features API started------")
parser = ArgumentParser(
prog="vector_ingest",
description="Ingest Vector",
epilog="Running the code as ECS task",
)
parser.add_argument(
"--payload", dest="payload", help="event passed to stac_handler function"
)
args = parser.parse_args()

# Extracting the payload passed from upstream task/dag or conf
payload_event = ast.literal_eval(args.payload)
s3_event = payload_event.pop("payload")

# Extracting configs for ingestion
x_possible = payload_event["x_possible"]
y_possible = payload_event["y_possible"]
source_projection = payload_event["source_projection"]
target_projection = payload_event["target_projection"]
extra_flags = payload_event["extra_flags"]

layer_name = payload_event["collection"]
collection_not_provided = layer_name == ""


# Read the json to extract the discovered file paths
with smart_open.open(s3_event, "r") as _file:
s3_event_read = _file.read()

event_received = json.loads(s3_event_read)
s3_objects = event_received["objects"]
status = list()

for s3_object in s3_objects:
href = s3_object["assets"]["default"]["href"]
filename = href.split("/")[-1].split(".")[0]

# Use id template when collection is not provided in the conf
if collection_not_provided:
layer_name = payload_event["id_template"].format(filename)

downloaded_filepath = download_file(href)
print(f"[ COLLECTION ]: {layer_name}, [ DOWNLOAD FILEPATH ]: {downloaded_filepath}")

        coll_status = load_to_featuresdb(
            downloaded_filepath,
            layer_name,
            x_possible,
            y_possible,
            source_projection,
            target_projection,
            extra_flags,
        )
status.append(coll_status)

# Delete file after ingest
os.remove(downloaded_filepath)

if coll_status["status"] != "success":
# Bubble exception so Airflow shows it as a failure
raise Exception(coll_status["reason"])

print("------Overall Status------\n", f"Done for {len(status)} discovered files\n",status)


if __name__ == "__main__":
handler()
7 changes: 7 additions & 0 deletions docker_tasks/generic_vector_ingest/requirements.txt
@@ -0,0 +1,7 @@
smart-open==6.3.0
psycopg2-binary==2.9.9
requests==2.30.0
boto3==1.26.129
GeoAlchemy2==0.14.2
geopandas==0.14.0
SQLAlchemy==2.0.23
6 changes: 6 additions & 0 deletions infrastructure/main.tf
@@ -32,6 +32,12 @@ module "mwaa" {
docker_file_path = "${path.module}/../docker_tasks/vector_ingest/Dockerfile"
ecs_container_folder_path = "${path.module}/../docker_tasks/vector_ingest"
ecr_repo_name = "${var.prefix}-veda-vector_ingest"
},
{
handler_file_path = "${path.module}/../docker_tasks/generic_vector_ingest/handler.py"
docker_file_path = "${path.module}/../docker_tasks/generic_vector_ingest/Dockerfile"
ecs_container_folder_path = "${path.module}/../docker_tasks/generic_vector_ingest"
ecr_repo_name = "${var.prefix}-veda-generic_vector_ingest"
}
]
}