From 66c27c338d3d59816e7004521763ea73f36ea832 Mon Sep 17 00:00:00 2001
From: Alexander Shea
Date: Fri, 5 Apr 2024 10:51:29 -0600
Subject: [PATCH 1/2] add session creation from supplied access id and secret key

---
 target_s3_parquet/sinks.py | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/target_s3_parquet/sinks.py b/target_s3_parquet/sinks.py
index 99cec49..e38ae31 100644
--- a/target_s3_parquet/sinks.py
+++ b/target_s3_parquet/sinks.py
@@ -3,6 +3,7 @@
 from typing import Dict, List, Optional
 
 import awswrangler as wr
+from boto3.session import Session
 from pandas import DataFrame
 from singer_sdk import Target
 from singer_sdk.sinks import BatchSink
@@ -35,7 +36,16 @@ def __init__(
     ) -> None:
         super().__init__(target, stream_name, schema, key_properties)
 
+        self._session = Session() if self._is_using_hmac() else Session(
+            aws_access_key_id=self.config.get("aws_access_key_id"),
+            aws_secret_access_key=self.config.get("aws_secret_access_key"),
+        )
+
         self._glue_schema = self._get_glue_schema()
+
+    def _is_using_hmac(self) -> bool:
+        return isinstance(self.config.get("aws_access_key_id"), str) and \
+            isinstance(self.config.get("aws_secret_access_key"), str)
 
 
     def _get_glue_schema(self):
@@ -92,6 +102,7 @@ def process_batch(self, context: dict) -> None:
             partition_cols=["_sdc_started_at"],
             schema_evolution=True,
             dtype=dtype,
+            boto3_session=self._session,
         )
 
         self.logger.info(f"Uploaded {len(context['records'])}")

From 7c02ec4c17c01fe1507ce57148318947fc155f71 Mon Sep 17 00:00:00 2001
From: Alexander Shea
Date: Fri, 5 Apr 2024 11:10:09 -0600
Subject: [PATCH 2/2] switch to updating the default session

---
 target_s3_parquet/sinks.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/target_s3_parquet/sinks.py b/target_s3_parquet/sinks.py
index e38ae31..23fc9c0 100644
--- a/target_s3_parquet/sinks.py
+++ b/target_s3_parquet/sinks.py
@@ -3,7 +3,7 @@
 from typing import Dict, List, Optional
 
 import awswrangler as wr
-from boto3.session import Session
+from boto3 import setup_default_session
 from pandas import DataFrame
 from singer_sdk import Target
 from singer_sdk.sinks import BatchSink
@@ -36,10 +36,11 @@ def __init__(
     ) -> None:
         super().__init__(target, stream_name, schema, key_properties)
 
-        self._session = Session() if self._is_using_hmac() else Session(
-            aws_access_key_id=self.config.get("aws_access_key_id"),
-            aws_secret_access_key=self.config.get("aws_secret_access_key"),
-        )
+        if self._is_using_hmac():
+            setup_default_session(
+                aws_access_key_id=self.config.get("aws_access_key_id"),
+                aws_secret_access_key=self.config.get("aws_secret_access_key"),
+            )
 
         self._glue_schema = self._get_glue_schema()
 
@@ -102,7 +103,6 @@ def process_batch(self, context: dict) -> None:
             partition_cols=["_sdc_started_at"],
             schema_evolution=True,
             dtype=dtype,
-            boto3_session=self._session,
         )
 
         self.logger.info(f"Uploaded {len(context['records'])}")
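
Note (not part of the patches above): a minimal sketch of the mechanism PATCH 2/2 relies on. boto3.setup_default_session() rebuilds the module-level default session, and awswrangler falls back to that default session whenever no explicit boto3_session argument is supplied, which is why the boto3_session=self._session argument can be dropped from wr.s3.to_parquet(). The credential values below are placeholders standing in for config["aws_access_key_id"] and config["aws_secret_access_key"].

    import boto3
    from boto3 import setup_default_session

    # Rebuild the module-level default session with explicit HMAC credentials
    # (placeholder values, for illustration only).
    setup_default_session(
        aws_access_key_id="AKIAEXAMPLE",
        aws_secret_access_key="example-secret-key",
    )

    # The default session now carries the supplied credentials; libraries that
    # resolve a missing boto3_session to boto3.DEFAULT_SESSION (as awswrangler
    # does) pick them up without being passed a session explicitly.
    credentials = boto3.DEFAULT_SESSION.get_credentials()
    print(credentials.access_key)  # -> AKIAEXAMPLE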