diff --git a/airbyte-integrations/connectors/source-salesforce/setup.py b/airbyte-integrations/connectors/source-salesforce/setup.py index 7e47fce763a0..617835137a3b 100644 --- a/airbyte-integrations/connectors/source-salesforce/setup.py +++ b/airbyte-integrations/connectors/source-salesforce/setup.py @@ -5,7 +5,7 @@ from setuptools import find_packages, setup -MAIN_REQUIREMENTS = ["airbyte-cdk~=0.2", "vcrpy==4.1.1", "pandas"] +MAIN_REQUIREMENTS = ["airbyte-cdk==0.67", "vcrpy==4.1.1", "pandas"] TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock~=3.6", "requests_mock", "connector-acceptance-test", "pytest-timeout"] diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 90873025d48e..ff0ca0aef9e5 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -581,6 +581,7 @@ def transform_empty_string_to_none(instance: Any, schema: Any): class IncrementalRestSalesforceStream(RestSalesforceStream, ABC): state_checkpoint_interval = 500 STREAM_SLICE_STEP = 30 + LOOKBACK_WINDOW_IN_MINUTES = 15 def __init__(self, replication_key: str, start_date: Optional[str], **kwargs): super().__init__(**kwargs) @@ -600,7 +601,10 @@ def stream_slices( start, end = (None, None) now = pendulum.now(tz="UTC") initial_date = pendulum.parse((stream_state or {}).get(self.cursor_field, self.start_date), tz="UTC") - + if self.primary_key: + # If the stream has a primary key, we can use a lookback window to avoid missing records. + # https://trailhead.salesforce.com/trailblazer-community/feed/0D54V00007T48TASAZ + initial_date = initial_date.subtract(minutes=self.LOOKBACK_WINDOW_IN_MINUTES) slice_number = 1 while not end == now: start = initial_date.add(days=(slice_number - 1) * self.STREAM_SLICE_STEP)