From 448c0b4dfbe8360f19468307633c3cd9eeddb552 Mon Sep 17 00:00:00 2001 From: Josh lloyd Date: Mon, 16 Sep 2024 11:57:42 -0600 Subject: [PATCH] skipped records of 503 errors after failed backoff --- pyproject.toml | 2 +- tap_salesforce_connect/client.py | 36 ++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index c2c738d..014a401 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "tap-salesforce-connect" -version = "0.1.3" +version = "0.1.4" description = "`tap-salesforce-connect` is a Singer tap for SalesforceConnect, built with the Meltano Singer SDK." readme = "README.md" authors = ["Josh Lloyd"] diff --git a/tap_salesforce_connect/client.py b/tap_salesforce_connect/client.py index 346f3eb..dba5711 100644 --- a/tap_salesforce_connect/client.py +++ b/tap_salesforce_connect/client.py @@ -4,10 +4,12 @@ import json import sys +import typing as t from pathlib import Path from typing import Any, Callable, Generator, Iterable import requests +from singer_sdk import metrics from singer_sdk.helpers.jsonpath import extract_jsonpath from singer_sdk.streams import RESTStream @@ -125,3 +127,37 @@ def get_wait_time_based_on_response(self, exception): def backoff_wait_generator(self) -> Generator[float, None, None]: """Return a generator of wait times between retries.""" return self.backoff_runtime(value=self.get_wait_time_based_on_response) + + def request_records(self, context: dict | None) -> t.Iterable[dict]: + """Request records from REST endpoint(s), returning response records. + + If pagination is detected, pages will be recursed automatically. + + Args: + context: Stream partition or context dictionary. + + Yields: + An item for every record in the response. + """ + paginator = self.get_new_paginator() + decorated_request = self.request_decorator(self._request) + + with metrics.http_request_counter(self.name, self.path) as request_counter: + request_counter.context = context + + while not paginator.finished: + prepared_request = self.prepare_request( + context, + next_page_token=paginator.current_value, + ) + resp = decorated_request(prepared_request, context) + request_counter.increment() + self.update_sync_costs(prepared_request, resp, context) + + if resp.status_code in [503]: + msg = self.response_error_message(resp) + self.logger.info(f"Skipping records due to {msg}.") + paginator._finished = True + else: + yield from self.parse_response(resp) + paginator.advance(resp)