Skip to content

Commit

Permalink
fix: assets
Browse files Browse the repository at this point in the history
  • Loading branch information
JasperHG90 committed Mar 23, 2024
1 parent d7c53ea commit 7a74ea4
Showing 1 changed file with 11 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,10 @@ def post_job_success(context: AssetExecutionContext, gcp_metrics: GcpMetricsReso
def air_quality_data(
context: AssetExecutionContext,
luchtmeetnet_api: LuchtMeetNetResource,
gcp_metrics: GcpMetricsResource,
) -> Output[pd.DataFrame]:
try:
df = get_air_quality_data_for_partition_key(
context.partition_key, context, luchtmeetnet_api
)
df_hash = hashlib.sha256(hash_pandas_object(df, index=True).values).hexdigest()
success = 1
return Output(df, data_version=DataVersion(df_hash))
except Exception as e:
success = 0
raise e
finally:
post_job_success(context, gcp_metrics, success)
df = get_air_quality_data_for_partition_key(context.partition_key, context, luchtmeetnet_api)
df_hash = hashlib.sha256(hash_pandas_object(df, index=True).values).hexdigest()
return Output(df, data_version=DataVersion(df_hash))


@asset(
Expand Down Expand Up @@ -123,16 +113,9 @@ def air_quality_data(
group_name="measurements",
)
def daily_air_quality_data(
context: AssetExecutionContext, ingested_data: pd.DataFrame, gcp_metrics: GcpMetricsResource
context: AssetExecutionContext, ingested_data: pd.DataFrame
) -> pd.DataFrame:
try:
success = 1
return ingested_data
except Exception as e:
success = 0
raise e
finally:
post_job_success(context, gcp_metrics, success)
return ingested_data


@asset(
Expand All @@ -153,20 +136,12 @@ def daily_air_quality_data(
def station_names(
context: AssetExecutionContext,
luchtmeetnet_api: LuchtMeetNetResource,
gcp_metrics: GcpMetricsResource,
) -> pd.DataFrame:
try:
success = 1
return pd.DataFrame(
luchtmeetnet_api.request(
os.path.join("stations", context.partition_key), context=context, paginate=False
)
return pd.DataFrame(
luchtmeetnet_api.request(
os.path.join("stations", context.partition_key), context=context, paginate=False
)
except Exception as e:
success = 0
raise e
finally:
post_job_success(context, gcp_metrics, success)
)


@asset(
Expand All @@ -186,13 +161,6 @@ def station_names(
group_name="stations",
)
def air_quality_station_names(
context: AssetExecutionContext, station_names: pd.DataFrame, gcp_metrics: GcpMetricsResource
context: AssetExecutionContext, station_names: pd.DataFrame
) -> pd.DataFrame:
try:
success = 1
return station_names
except Exception as e:
success = 0
raise e
finally:
post_job_success(context, gcp_metrics, success)
return station_names

0 comments on commit 7a74ea4

Please sign in to comment.