From d25addf5796974d97b7bbcadadd02d3dd1be6a6f Mon Sep 17 00:00:00 2001 From: Charlie Bini <5003326+cbini@users.noreply.github.com> Date: Mon, 18 Nov 2024 16:33:10 +0000 Subject: [PATCH] refactor: save data files under env --- src/teamster/core/io_managers/gcs.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/teamster/core/io_managers/gcs.py b/src/teamster/core/io_managers/gcs.py index 7d667f4d2..80edcfab9 100644 --- a/src/teamster/core/io_managers/gcs.py +++ b/src/teamster/core/io_managers/gcs.py @@ -112,6 +112,7 @@ def dump_to_path(self, context: OutputContext, obj: Any, path: UPath) -> None: bucket_obj: Bucket = self.bucket_obj records, schema = obj + local_path = "env" / path if self.test: import json @@ -121,10 +122,10 @@ def dump_to_path(self, context: OutputContext, obj: Any, path: UPath) -> None: test_path.parent.mkdir(parents=True, exist_ok=True) json.dump(obj=records, fp=test_path.open("w")) - context.log.info(f"Writing records to {path}") - path.parent.mkdir(parents=True, exist_ok=True) + context.log.info(f"Writing records to {local_path}") + local_path.parent.mkdir(parents=True, exist_ok=True) - with path.open(mode="wb") as fo: + with local_path.open(mode="wb") as fo: fastavro.writer( fo=fo, schema=fastavro.parse_schema(schema), @@ -138,7 +139,7 @@ def dump_to_path(self, context: OutputContext, obj: Any, path: UPath) -> None: backoff( fn=bucket_obj.blob(blob_name=str(path)).upload_from_filename, retry_on=(TooManyRequests, Forbidden, ServiceUnavailable), - kwargs={"filename": path}, + kwargs={"filename": local_path}, )