Skip to content

Commit

Permalink
skipped records of 503 errors after failed backoff
Browse files Browse the repository at this point in the history
  • Loading branch information
jlloyd-widen committed Sep 16, 2024
1 parent 7919c6d commit 448c0b4
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "tap-salesforce-connect"
version = "0.1.3"
version = "0.1.4"
description = "`tap-salesforce-connect` is a Singer tap for SalesforceConnect, built with the Meltano Singer SDK."
readme = "README.md"
authors = ["Josh Lloyd"]
Expand Down
36 changes: 36 additions & 0 deletions tap_salesforce_connect/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

import json
import sys
import typing as t
from pathlib import Path
from typing import Any, Callable, Generator, Iterable

import requests
from singer_sdk import metrics
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.streams import RESTStream

Expand Down Expand Up @@ -125,3 +127,37 @@ def get_wait_time_based_on_response(self, exception):
def backoff_wait_generator(self) -> Generator[float, None, None]:
"""Return a generator of wait times between retries."""
return self.backoff_runtime(value=self.get_wait_time_based_on_response)

def request_records(self, context: dict | None) -> t.Iterable[dict]:
"""Request records from REST endpoint(s), returning response records.
If pagination is detected, pages will be recursed automatically.
Args:
context: Stream partition or context dictionary.
Yields:
An item for every record in the response.
"""
paginator = self.get_new_paginator()
decorated_request = self.request_decorator(self._request)

with metrics.http_request_counter(self.name, self.path) as request_counter:
request_counter.context = context

while not paginator.finished:
prepared_request = self.prepare_request(
context,
next_page_token=paginator.current_value,
)
resp = decorated_request(prepared_request, context)
request_counter.increment()
self.update_sync_costs(prepared_request, resp, context)

if resp.status_code in [503]:
msg = self.response_error_message(resp)
self.logger.info(f"Skipping records due to {msg}.")
paginator._finished = True
else:
yield from self.parse_response(resp)
paginator.advance(resp)

0 comments on commit 448c0b4

Please sign in to comment.