Skip to content

Commit

Permalink
fix(data-warehouse): use account id for stripe integration (#21226)
Browse files Browse the repository at this point in the history
* use account id

* comment
  • Loading branch information
EDsCODE authored Mar 28, 2024
1 parent 3b67989 commit 467fe48
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 4 deletions.
4 changes: 4 additions & 0 deletions posthog/temporal/data_imports/external_data_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,14 @@ async def run_external_data_job(inputs: ExternalDataJobInputs) -> TSchemaTables:
from posthog.temporal.data_imports.pipelines.stripe.helpers import stripe_source

stripe_secret_key = model.pipeline.job_inputs.get("stripe_secret_key", None)
account_id = model.pipeline.job_inputs.get("stripe_account_id", None)
# Cludge: account_id should be checked here too but can deal with nulls
# until we require re update of account_ids in stripe so they're all store
if not stripe_secret_key:
raise ValueError(f"Stripe secret key not found for job {model.id}")
source = stripe_source(
api_key=stripe_secret_key,
account_id=account_id,
endpoints=tuple(endpoints),
team_id=inputs.team_id,
job_id=inputs.run_id,
Expand Down
7 changes: 6 additions & 1 deletion posthog/temporal/data_imports/pipelines/stripe/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def transform_date(date: Union[str, DateTime, int]) -> int:

async def stripe_get_data(
api_key: str,
account_id: str,
resource: str,
start_date: Optional[Any] = None,
end_date: Optional[Any] = None,
Expand All @@ -42,6 +43,7 @@ async def stripe_get_data(

resource_dict = await sync_to_async(_resource.list)(
api_key=api_key,
stripe_account=account_id,
created={"gte": start_date, "lt": end_date},
limit=100,
**kwargs,
Expand All @@ -53,6 +55,7 @@ async def stripe_get_data(

async def stripe_pagination(
api_key: str,
account_id: str,
endpoint: str,
team_id: int,
job_id: str,
Expand All @@ -75,6 +78,7 @@ async def stripe_pagination(

response = await stripe_get_data(
api_key,
account_id,
endpoint,
starting_after=starting_after,
)
Expand All @@ -97,7 +101,7 @@ async def stripe_pagination(

@dlt.source(max_table_nesting=0)
def stripe_source(
api_key: str, endpoints: Tuple[str, ...], team_id, job_id, starting_after: Optional[str] = None
api_key: str, account_id: str, endpoints: Tuple[str, ...], team_id, job_id, starting_after: Optional[str] = None
) -> Iterable[DltResource]:
for endpoint in endpoints:
yield dlt.resource(
Expand All @@ -106,6 +110,7 @@ def stripe_source(
write_disposition="append",
)(
api_key=api_key,
account_id=account_id,
endpoint=endpoint,
team_id=team_id,
job_id=job_id,
Expand Down
5 changes: 2 additions & 3 deletions posthog/warehouse/api/external_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response:
def _handle_stripe_source(self, request: Request, *args: Any, **kwargs: Any) -> ExternalDataSource:
payload = request.data["payload"]
client_secret = payload.get("client_secret")
account_id = payload.get("account_id")
prefix = request.data.get("prefix", None)
source_type = request.data["source_type"]

Expand All @@ -184,9 +185,7 @@ def _handle_stripe_source(self, request: Request, *args: Any, **kwargs: Any) ->
team=self.team,
status="Running",
source_type=source_type,
job_inputs={
"stripe_secret_key": client_secret,
},
job_inputs={"stripe_secret_key": client_secret, "stripe_account_id": account_id},
prefix=prefix,
)

Expand Down

0 comments on commit 467fe48

Please sign in to comment.