Skip to content

Commit

Permalink
chore: etl-346 look back window of 15 minutes for incremental streams…
Browse files Browse the repository at this point in the history
… salesforce (#199)
  • Loading branch information
am6010 authored Jul 30, 2024
1 parent 7a832aa commit 6bf97a6
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 2 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-salesforce/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk~=0.2", "vcrpy==4.1.1", "pandas"]
MAIN_REQUIREMENTS = ["airbyte-cdk==0.67", "vcrpy==4.1.1", "pandas"]

TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock~=3.6", "requests_mock", "connector-acceptance-test", "pytest-timeout"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ def transform_empty_string_to_none(instance: Any, schema: Any):
class IncrementalRestSalesforceStream(RestSalesforceStream, ABC):
state_checkpoint_interval = 500
STREAM_SLICE_STEP = 30
LOOKBACK_WINDOW_IN_MINUTES = 15

def __init__(self, replication_key: str, start_date: Optional[str], **kwargs):
super().__init__(**kwargs)
Expand All @@ -600,7 +601,10 @@ def stream_slices(
start, end = (None, None)
now = pendulum.now(tz="UTC")
initial_date = pendulum.parse((stream_state or {}).get(self.cursor_field, self.start_date), tz="UTC")

if self.primary_key:
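# Start the sync LOOKBACK_WINDOW_IN_MINUTES earlier than the saved cursor so that records
# written late around the previous cursor value are not missed. The window is only applied
# when the stream has a primary key, presumably so re-read records can be deduplicated.
# https://trailhead.salesforce.com/trailblazer-community/feed/0D54V00007T48TASAZ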
initial_date = initial_date.subtract(minutes=self.LOOKBACK_WINDOW_IN_MINUTES)
slice_number = 1
while not end == now:
start = initial_date.add(days=(slice_number - 1) * self.STREAM_SLICE_STEP)
Expand Down

0 comments on commit 6bf97a6

Please sign in to comment.