Skip to content

Commit

Permalink
refactor: save data files under env
Browse files (browse the repository at this point in the history)
  • Loading branch information
cbini committed Nov 18, 2024
1 parent 01b9bff commit d25addf
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions src/teamster/core/io_managers/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def dump_to_path(self, context: OutputContext, obj: Any, path: UPath) -> None:
bucket_obj: Bucket = self.bucket_obj

records, schema = obj
+ local_path = "env" / path

if self.test:
import json
Expand All @@ -121,10 +122,10 @@ def dump_to_path(self, context: OutputContext, obj: Any, path: UPath) -> None:
test_path.parent.mkdir(parents=True, exist_ok=True)
json.dump(obj=records, fp=test_path.open("w"))

- context.log.info(f"Writing records to {path}")
- path.parent.mkdir(parents=True, exist_ok=True)
+ context.log.info(f"Writing records to {local_path}")
+ local_path.parent.mkdir(parents=True, exist_ok=True)

- with path.open(mode="wb") as fo:
+ with local_path.open(mode="wb") as fo:
fastavro.writer(
fo=fo,
schema=fastavro.parse_schema(schema),
Expand All @@ -138,7 +139,7 @@ def dump_to_path(self, context: OutputContext, obj: Any, path: UPath) -> None:
backoff(
fn=bucket_obj.blob(blob_name=str(path)).upload_from_filename,
retry_on=(TooManyRequests, Forbidden, ServiceUnavailable),
- kwargs={"filename": path},
+ kwargs={"filename": local_path},
)


Expand Down

0 comments on commit d25addf

Please sign in to comment.