From 7fd81d5892d6d4aeccb415c028a227ab25ebf0d4 Mon Sep 17 00:00:00 2001 From: Prratek Ramchandani Date: Mon, 28 Feb 2022 17:19:45 -0500 Subject: [PATCH 1/2] support incremental replication --- pyproject.toml | 2 +- tap_instagram/streams.py | 335 +++++++++++++++++++++++++++++++++++---- tap_instagram/tap.py | 7 + 3 files changed, 315 insertions(+), 29 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 46c6171..fd4b034 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "tap-instagram" -version = "0.3.6" +version = "0.4.0" description = "`tap-instagram` is a Singer tap for Instagram, built with the Meltano SDK for Singer Taps." authors = ["Prratek Ramchandani"] keywords = [ diff --git a/tap_instagram/streams.py b/tap_instagram/streams.py index a43c166..357d602 100644 --- a/tap_instagram/streams.py +++ b/tap_instagram/streams.py @@ -6,6 +6,7 @@ import pendulum import requests from singer_sdk import typing as th # JSON Schema typing helpers +from singer_sdk.helpers.jsonpath import extract_jsonpath from tap_instagram.client import InstagramStream @@ -55,14 +56,14 @@ def get_child_context(self, record: dict, context: Optional[dict]) -> dict: return {"user_id": record["id"]} -class BaseMediaStream(InstagramStream): +class MediaStream(InstagramStream): """Define custom stream.""" + name = "media" + path = "/{user_id}/media" # user_id is populated using child context keys from UsersStream parent_stream_type = UsersStream primary_keys = ["id"] - # can't make this incremental since FB doesn't support filtering on the timestamp - # https://developers.facebook.com/docs/instagram-api/reference/ig-media#query-string-parameters - replication_key = None + replication_key = "timestamp" records_jsonpath = "$.data[*]" fields = [ "id", @@ -169,7 +170,7 @@ class BaseMediaStream(InstagramStream): ), th.Property( "timestamp", - th.StringType, + th.DateTimeType, description="ISO 8601 formatted creation date in UTC (default is UTC ±00:00)", ), th.Property( @@ -185,35 +186,42 @@ class BaseMediaStream(InstagramStream): ), ).to_dict() + def make_since_param(self, context: Optional[dict]) -> datetime: + state_ts = self.get_starting_timestamp(context) + return pendulum.instance(state_ts).subtract(days=self.config["media_insights_lookback_days"]) + def get_url_params( self, context: Optional[dict], next_page_token: Optional[Any] ) -> Dict[str, Any]: params = super().get_url_params(context, next_page_token) params["fields"] = ",".join(self.fields) + params["since"] = self.make_since_param(context) return params def get_child_context(self, record: dict, context: Optional[dict]) -> dict: return { - "user_id": context["user_id"], + # "user_id": context["user_id"], "media_id": record["id"], "media_type": record["media_type"], # media_product_type not present for carousel children media "media_product_type": record.get("media_product_type"), } - -class MediaStream(BaseMediaStream): - """Define custom stream.""" - - name = "media" - path = "/{user_id}/media" # user_id is populated using child context keys from UsersStream + def parse_response(self, response: requests.Response) -> Iterable[dict]: + for row in extract_jsonpath(self.records_jsonpath, input=response.json()): + if "timestamp" in row: + row["timestamp"] = pendulum.parse(row["timestamp"]).format("YYYY-MM-DD HH:mm:ss") + yield row -class StoriesStream(BaseMediaStream): +class StoriesStream(InstagramStream): """Define custom stream.""" name = "stories" path = "/{user_id}/stories" # user_id is populated using child context keys from UsersStream + parent_stream_type = UsersStream + primary_keys = ["id"] + records_jsonpath = "$.data[*]" fields = [ "id", "ig_id", @@ -231,9 +239,133 @@ class StoriesStream(BaseMediaStream): "username", "video_title", ] + # Optionally, you may also use `schema_filepath` in place of `schema`: + # schema_filepath = SCHEMAS_DIR / "users.json" + schema = th.PropertiesList( + th.Property( + "id", + th.StringType, + description="Media ID.", + ), + th.Property( + "ig_id", + th.StringType, + description="Instagram media ID.", + ), + th.Property( + "caption", + th.StringType, + description="Caption. Excludes album children. @ symbol excluded unless the app user can perform " + "admin-equivalent tasks on the Facebook Page connected to the Instagram account used to " + "create the caption.", + ), + th.Property( + "comments_count", + th.IntegerType, + description="Count of comments on the media. Excludes comments on album child media and the media's " + "caption. Includes replies on comments.", + ), + th.Property( + "is_comment_enabled", + th.BooleanType, + description="Indicates if comments are enabled or disabled. Excludes album children.", + ), + th.Property( + "like_count", + th.IntegerType, + description="Count of likes on the media. Excludes likes on album child media and likes on promoted posts " + "created from the media. Includes replies on comments.", + ), + th.Property( + "media_product_type", + th.StringType, + description="Surface where the media is published. Can be AD, FEED, IGTV, or STORY.", + ), + th.Property( + "media_type", + th.StringType, + description="Media type. Can be CAROUSEL_ALBUM, IMAGE, or VIDEO.", + ), + th.Property( + "media_url", + th.StringType, + description="Media URL. Will be omitted from responses if the media contains copyrighted material, " + "or has been flagged for a copyright violation.", + ), + th.Property( + "owner", + th.ObjectType( + th.Property( + "id", + th.StringType, + description="ID of Instagram user who created the media.", + ), + th.Property( + "username", + th.StringType, + description="Username of Instagram user who created the media.", + ), + ), + description="ID of Instagram user who created the media. Only returned if the app user making the query " + "also created the media, otherwise username field will be returned instead.", + ), + th.Property( + "permalink", + th.StringType, + description="Permanent URL to the media.", + ), + th.Property( + "shortcode", + th.StringType, + description="Shortcode to the media.", + ), + th.Property( + "thumbnail_url", + th.StringType, + description="Media thumbnail URL. Only available on VIDEO media.", + ), + th.Property( + "timestamp", + th.DateTimeType, + description="ISO 8601 formatted creation date in UTC (default is UTC ±00:00)", + ), + th.Property( + "username", + th.StringType, + description="Username of user who created the media.", + ), + th.Property( + "video_title", + th.StringType, + description="Instagram TV media title. Will not be returned if targeting an Instagram TV video created on " + "or after October 5, 2021.", + ), + ).to_dict() + + def get_url_params( + self, context: Optional[dict], next_page_token: Optional[Any] + ) -> Dict[str, Any]: + params = super().get_url_params(context, next_page_token) + params["fields"] = ",".join(self.fields) + return params + def get_child_context(self, record: dict, context: Optional[dict]) -> dict: + return { + # "user_id": context["user_id"], + "media_id": record["id"], + "media_type": record["media_type"], + # media_product_type not present for carousel children media + "media_product_type": record.get("media_product_type"), + } -class MediaChildrenStream(BaseMediaStream): + def parse_response(self, response: requests.Response) -> Iterable[dict]: + for row in extract_jsonpath(self.records_jsonpath, input=response.json()): + if "timestamp" in row: + row["timestamp"] = pendulum.parse(row["timestamp"]).format("YYYY-MM-DD HH:mm:ss") + yield row + + +class MediaChildrenStream(MediaStream): """Define custom stream.""" name = "media_children" @@ -257,11 +389,20 @@ class MediaChildrenStream(BaseMediaStream): "username", ] + def parse_response(self, response: requests.Response) -> Iterable[dict]: + for row in extract_jsonpath(self.records_jsonpath, input=response.json()): + if "timestamp" in row: + row["timestamp"] = pendulum.parse(row["timestamp"]).format("YYYY-MM-DD HH:mm:ss") + yield row + -class BaseMediaInsightsStream(InstagramStream): +class MediaInsightsStream(InstagramStream): """Define custom stream.""" + name = "media_insights" path = "/{media_id}/insights" + parent_stream_type = MediaStream + state_partitioning_keys = ["user_id"] primary_keys = "id" replication_key = None records_jsonpath = "$.data[*]" @@ -284,7 +425,7 @@ class BaseMediaInsightsStream(InstagramStream): ), th.Property( "end_time", - th.StringType, + th.DateTimeType, description="", ), th.Property( @@ -387,21 +528,23 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: item = { "context": key, "value": value, - "end_time": values["end_time"], + "end_time": pendulum.parse(values["end_time"]).format("YYYY-MM-DD HH:mm:ss"), } item.update(base_item) yield item else: values.update(base_item) + if "end_time" in values: + values["end_time"] = pendulum.parse(values["end_time"]).format("YYYY-MM-DD HH:mm:ss") yield values -class MediaInsightsStream(BaseMediaInsightsStream): - """Define custom stream.""" - - name = "media_insights" - parent_stream_type = MediaStream - state_partitioning_keys = ["user_id"] +# class MediaInsightsStream(BaseMediaInsightsStream): +# """Define custom stream.""" +# +# name = "media_insights" +# parent_stream_type = MediaStream +# state_partitioning_keys = ["user_id"] # Insights not available for children media objects @@ -412,12 +555,147 @@ class MediaInsightsStream(BaseMediaInsightsStream): # parent_stream_type = MediaChildrenStream -class StoryInsightsStream(BaseMediaInsightsStream): +class StoryInsightsStream(InstagramStream): """Define custom stream.""" name = "story_insights" + path = "/{media_id}/insights" parent_stream_type = StoriesStream state_partitioning_keys = ["user_id"] + primary_keys = "id" + replication_key = None + records_jsonpath = "$.data[*]" + + schema = th.PropertiesList( + th.Property( + "id", + th.StringType, + description="", + ), + th.Property( + "name", + th.StringType, + description="", + ), + th.Property( + "period", + th.StringType, + description="", + ), + th.Property( + "end_time", + th.DateTimeType, + description="", + ), + th.Property( + "context", + th.StringType, + description="", + ), + th.Property( + "value", + th.IntegerType, + description="", + ), + th.Property( + "title", + th.StringType, + description="", + ), + th.Property( + "description", + th.StringType, + description="", + ), + ).to_dict() + + @staticmethod + def _metrics_for_media_type(media_type: str, media_product_type: str): + # TODO: Define types for these function args + if media_type in ("IMAGE", "VIDEO"): + if media_product_type == "STORY": + return [ + "exits", + "impressions", + "reach", + "replies", + "taps_forward", + "taps_back", + ] + else: # media_product_type is "AD" or "FEED" + metrics = [ + "engagement", + "impressions", + "reach", + "saved", + ] + if media_type == "VIDEO": + metrics.append("video_views") + return metrics + elif media_type == "CAROUSEL_ALBUM": + return [ + "carousel_album_engagement", + "carousel_album_impressions", + "carousel_album_reach", + "carousel_album_saved", + "carousel_album_video_views", + ] + else: + raise ValueError( + f"media_type from parent record must be one of IMAGE, VIDEO, CAROUSEL_ALBUM, got: {media_type}" + ) + + def get_url_params( + self, context: Optional[dict], next_page_token: Optional[Any] + ) -> Dict[str, Any]: + params = super().get_url_params(context, next_page_token) + params["metric"] = self._metrics_for_media_type( + context["media_type"], context["media_product_type"] + ) + return params + + def validate_response(self, response: requests.Response) -> None: + if ( + response.json().get("error", {}).get("error_user_title") + == "Media posted before business account conversion" + ): + self.logger.warning(f"Skipping: {response.json()['error']}") + return + super().validate_response(response) + + def parse_response(self, response: requests.Response) -> Iterable[dict]: + resp_json = response.json() + # Handle the specific case where FB returns error because media was posted before business acct creation + # TODO: Refactor to raise a specific error in validate_response and handle that instead + if ( + resp_json.get("error", {}).get("error_user_title") + == "Media posted before business account conversion" + ): + return + for row in resp_json["data"]: + base_item = { + "name": row["name"], + "period": row["period"], + "title": row["title"], + "id": row["id"], + "description": row["description"], + } + if "values" in row: + for values in row["values"]: + if isinstance(values["value"], dict): + for key, value in values["value"].items(): + item = { + "context": key, + "value": value, + "end_time": pendulum.parse(values["end_time"]).format("YYYY-MM-DD HH:mm:ss"), + } + item.update(base_item) + yield item + else: + values.update(base_item) + if "end_time" in values: + values["end_time"] = pendulum.parse(values["end_time"]).format("YYYY-MM-DD HH:mm:ss") + yield values class UserInsightsStream(InstagramStream): @@ -453,7 +731,7 @@ class UserInsightsStream(InstagramStream): ), th.Property( "end_time", - th.StringType, + th.DateTimeType, description="", ), th.Property( @@ -498,7 +776,7 @@ def _fetch_time_based_pagination_range( Returns: DateTime objects for "since" and "until" """ try: - since = max(self.get_starting_timestamp(context), min_since) + since = min(max(self.get_starting_timestamp(context), min_since), max_until) window_end = min( self.get_replication_key_signpost(context), pendulum.instance(since).add(seconds=max_time_window.seconds), @@ -549,12 +827,14 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: item = { "context": key, "value": value, - "end_time": values["end_time"], + "end_time": pendulum.parse(values["end_time"]).format("YYYY-MM-DD HH:mm:ss"), } item.update(base_item) yield item else: values.update(base_item) + if "end_time" in values: + values["end_time"] = pendulum.parse(values["end_time"]).format("YYYY-MM-DD HH:mm:ss") yield values @@ -596,7 +876,6 @@ class UserInsightsDailyStream(UserInsightsStream): name = "user_insights_daily" metrics = [ "email_contacts", - # "follower_count", "get_directions_clicks", "impressions", "phone_call_clicks", diff --git a/tap_instagram/tap.py b/tap_instagram/tap.py index 66af3c8..aadabba 100644 --- a/tap_instagram/tap.py +++ b/tap_instagram/tap.py @@ -59,6 +59,13 @@ class TapInstagram(Tap): required=True, description="User IDs of the Instagram accounts to replicate", ), + th.Property( + "media_insights_lookback_days", + th.IntegerType, + default=60, + description="The tap fetches media insights for Media objects posted in the last `insights_lookback_days` " + "days - defaults to 14 days if not provided" + ), th.Property( "start_date", th.DateTimeType, From 0d38daee6d8dc8837fe1e5db4de659e39531a644 Mon Sep 17 00:00:00 2001 From: Prratek Ramchandani Date: Mon, 28 Feb 2022 17:28:01 -0500 Subject: [PATCH 2/2] fix tests and linting errors --- tap_instagram/streams.py | 43 ++++++++++++++++++++++++++++++---------- tap_instagram/tap.py | 2 +- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/tap_instagram/streams.py b/tap_instagram/streams.py index 357d602..1aec4a1 100644 --- a/tap_instagram/streams.py +++ b/tap_instagram/streams.py @@ -188,7 +188,12 @@ class MediaStream(InstagramStream): def make_since_param(self, context: Optional[dict]) -> datetime: state_ts = self.get_starting_timestamp(context) - return pendulum.instance(state_ts).subtract(days=self.config["media_insights_lookback_days"]) + if state_ts: + return pendulum.instance(state_ts).subtract( + days=self.config["media_insights_lookback_days"] + ) + else: + return state_ts def get_url_params( self, context: Optional[dict], next_page_token: Optional[Any] @@ -210,7 +215,9 @@ def get_child_context(self, record: dict, context: Optional[dict]) -> dict: def parse_response(self, response: requests.Response) -> Iterable[dict]: for row in extract_jsonpath(self.records_jsonpath, input=response.json()): if "timestamp" in row: - row["timestamp"] = pendulum.parse(row["timestamp"]).format("YYYY-MM-DD HH:mm:ss") + row["timestamp"] = pendulum.parse(row["timestamp"]).format( + "YYYY-MM-DD HH:mm:ss" + ) yield row @@ -361,7 +368,9 @@ def get_child_context(self, record: dict, context: Optional[dict]) -> dict: def parse_response(self, response: requests.Response) -> Iterable[dict]: for row in extract_jsonpath(self.records_jsonpath, input=response.json()): if "timestamp" in row: - row["timestamp"] = pendulum.parse(row["timestamp"]).format("YYYY-MM-DD HH:mm:ss") + row["timestamp"] = pendulum.parse(row["timestamp"]).format( + "YYYY-MM-DD HH:mm:ss" + ) yield row @@ -392,7 +401,9 @@ class MediaChildrenStream(MediaStream): def parse_response(self, response: requests.Response) -> Iterable[dict]: for row in extract_jsonpath(self.records_jsonpath, input=response.json()): if "timestamp" in row: - row["timestamp"] = pendulum.parse(row["timestamp"]).format("YYYY-MM-DD HH:mm:ss") + row["timestamp"] = pendulum.parse(row["timestamp"]).format( + "YYYY-MM-DD HH:mm:ss" + ) yield row @@ -528,14 +539,18 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: item = { "context": key, "value": value, - "end_time": pendulum.parse(values["end_time"]).format("YYYY-MM-DD HH:mm:ss"), + "end_time": pendulum.parse(values["end_time"]).format( + "YYYY-MM-DD HH:mm:ss" + ), } item.update(base_item) yield item else: values.update(base_item) if "end_time" in values: - values["end_time"] = pendulum.parse(values["end_time"]).format("YYYY-MM-DD HH:mm:ss") + values["end_time"] = pendulum.parse( + values["end_time"] + ).format("YYYY-MM-DD HH:mm:ss") yield values @@ -687,14 +702,18 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: item = { "context": key, "value": value, - "end_time": pendulum.parse(values["end_time"]).format("YYYY-MM-DD HH:mm:ss"), + "end_time": pendulum.parse(values["end_time"]).format( + "YYYY-MM-DD HH:mm:ss" + ), } item.update(base_item) yield item else: values.update(base_item) if "end_time" in values: - values["end_time"] = pendulum.parse(values["end_time"]).format("YYYY-MM-DD HH:mm:ss") + values["end_time"] = pendulum.parse( + values["end_time"] + ).format("YYYY-MM-DD HH:mm:ss") yield values @@ -827,14 +846,18 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: item = { "context": key, "value": value, - "end_time": pendulum.parse(values["end_time"]).format("YYYY-MM-DD HH:mm:ss"), + "end_time": pendulum.parse(values["end_time"]).format( + "YYYY-MM-DD HH:mm:ss" + ), } item.update(base_item) yield item else: values.update(base_item) if "end_time" in values: - values["end_time"] = pendulum.parse(values["end_time"]).format("YYYY-MM-DD HH:mm:ss") + values["end_time"] = pendulum.parse( + values["end_time"] + ).format("YYYY-MM-DD HH:mm:ss") yield values diff --git a/tap_instagram/tap.py b/tap_instagram/tap.py index aadabba..d524ae2 100644 --- a/tap_instagram/tap.py +++ b/tap_instagram/tap.py @@ -64,7 +64,7 @@ class TapInstagram(Tap): th.IntegerType, default=60, description="The tap fetches media insights for Media objects posted in the last `insights_lookback_days` " - "days - defaults to 14 days if not provided" + "days - defaults to 14 days if not provided", ), th.Property( "start_date",