diff --git a/recipes/VideoBots.py b/recipes/VideoBots.py index 2e47aaae1..591ef397a 100644 --- a/recipes/VideoBots.py +++ b/recipes/VideoBots.py @@ -8,7 +8,7 @@ from furl import furl from pydantic import BaseModel, Field -from bots.models import BotIntegration, Platform +from bots.models import BotIntegration, Platform, SavedRun, PublishedRun from bots.models import Workflow from celeryapp.tasks import send_integration_attempt_email from daras_ai.image_input import ( @@ -1088,10 +1088,6 @@ def render_integrations_tab(self): ) return - # if we come from an integration redirect, we connect the integrations - if "connect_ids" in self.request.query_params: - self.integrations_on_connect(current_run, published_run) - # see which integrations are available to the user for the current published run assert published_run, "At this point, published_run should be available" integrations_q = Q(published_run=published_run) | Q( @@ -1165,7 +1161,13 @@ def render_integrations_add(self, label: str, run_title: str): gui.caption(choice.label) if pressed_platform: - on_connect = self.current_app_url(RecipeTabs.integrations) + current_run, published_run = ( + self.get_current_sr(), + self.get_current_published_run(), + ) + current_run_id, published_run_id = ( + current_run.run_id if current_run else None + ), (published_run.published_run_id if published_run else None) match pressed_platform: case Platform.WEB: bi = BotIntegration.objects.create( @@ -1173,16 +1175,13 @@ def render_integrations_add(self, label: str, run_title: str): billing_account_uid=self.request.user.uid, platform=Platform.WEB, ) - redirect_url = str( - furl(on_connect).add(query_params=dict(connect_ids=str(bi.id))) - / bi.api_integration_id() - ) + redirect_url = connect(bi, current_run, published_run) case Platform.WHATSAPP: - redirect_url = wa_connect_url(on_connect) + redirect_url = wa_connect_url(current_run_id, published_run_id) case Platform.SLACK: - redirect_url = slack_connect_url(on_connect) + redirect_url = slack_connect_url(current_run_id, published_run_id) case Platform.FACEBOOK: - redirect_url = fb_connect_url(on_connect) + redirect_url = fb_connect_url(current_run_id, published_run_id) case _: raise ValueError(f"Unsupported platform: {pressed_platform}") @@ -1416,41 +1415,34 @@ def render_integrations_settings( bi.save() gui.rerun() - def integrations_on_connect(self, current_run, published_run): - from app_users.models import AppUser - from daras_ai_v2.slack_bot import send_confirmation_msg - bi = None - for bid in self.request.query_params.getlist("connect_ids"): - try: - bi = BotIntegration.objects.get(id=bid) - except BotIntegration.DoesNotExist: - continue - if bi.saved_run is not None: - with gui.center(): - gui.write( - f"⚠️ {bi.get_display_name()} is already connected to a different published run by {AppUser.objects.filter(uid=bi.billing_account_uid).first().display_name}. Please disconnect it first." - ) - return +def connect( + bi: BotIntegration, current_run: SavedRun, published_run: PublishedRun | None +) -> RedirectException: + """ + Connect the bot integration to the provided saved and published runs. + Returns a redirect exception to the integrations page for that bot integration. + """ - bi.streaming_enabled = True - bi.saved_run = current_run - if published_run and published_run.saved_run_id == current_run.id: - bi.published_run = published_run - else: - bi.published_run = None - if bi.platform == Platform.SLACK: - bi.slack_create_personal_channels = False - send_confirmation_msg(bi) - bi.save() - - if bi: - path_params = dict(integration_id=bi.api_integration_id()) - else: - path_params = dict() - raise gui.RedirectException( - self.current_app_url(RecipeTabs.integrations, path_params=path_params) - ) + from daras_ai_v2.slack_bot import send_confirmation_msg + + print(f"Connecting {bi} to {current_run} and {published_run}") + + bi.streaming_enabled = True + bi.saved_run = current_run + if published_run and published_run.saved_run.id == current_run.id: + bi.published_run = published_run + else: + bi.published_run = None + if bi.platform == Platform.SLACK: + bi.slack_create_personal_channels = False + send_confirmation_msg(bi) + bi.save() + + path_params = dict(integration_id=bi.api_integration_id()) + return RedirectException( + VideoBotsPage.current_app_url(RecipeTabs.integrations, path_params=path_params) + ) def messages_as_prompt(query_msgs: list[dict]) -> str: diff --git a/routers/facebook_api.py b/routers/facebook_api.py index 4e396f466..7a34c484f 100644 --- a/routers/facebook_api.py +++ b/routers/facebook_api.py @@ -1,17 +1,19 @@ import requests from fastapi.responses import RedirectResponse from furl import furl +import json from starlette.background import BackgroundTasks from starlette.requests import Request from starlette.responses import HTMLResponse, Response -from bots.models import BotIntegration, Platform +from bots.models import BotIntegration, Platform, SavedRun, PublishedRun from daras_ai_v2 import settings, db from daras_ai_v2.bots import msg_handler from daras_ai_v2.exceptions import raise_for_status from daras_ai_v2.facebook_bots import WhatsappBot, FacebookBot from daras_ai_v2.fastapi_tricks import fastapi_request_json from daras_ai_v2.functional import map_parallel +from recipes.VideoBots import connect from routers.custom_api_router import CustomAPIRouter app = CustomAPIRouter() @@ -23,8 +25,16 @@ def fb_connect_whatsapp_redirect(request: Request): redirect_url = furl("/login", query_params={"next": request.url}) return RedirectResponse(str(redirect_url)) - on_completion = request.query_params.get("state") - retry_button = f'Retry' + connection_state = json.loads(request.query_params.get("state", "{}")) + current_run_id = connection_state.get("current_run_id", None) + published_run_id = connection_state.get("published_run_id", None) + retry_button = ( + f'Retry' + ) + current_run = SavedRun.objects.get(run_id=current_run_id) + published_run = PublishedRun.objects.filter( + published_run_id=published_run_id + ).first() code = request.query_params.get("code") if not code: @@ -64,7 +74,7 @@ def fb_connect_whatsapp_redirect(request: Request): # {'data': [{'verified_name': 'XXXX', 'code_verification_status': 'VERIFIED', 'display_phone_number': 'XXXX', 'quality_rating': 'UNKNOWN', 'platform_type': 'NOT_APPLICABLE', 'throughput': {'level': 'NOT_APPLICABLE'}, 'last_onboarded_time': '2024-02-22T20:42:16+0000', 'id': 'XXXX'}], 'paging': {'cursors': {'before': 'XXXX', 'after': 'XXXX'}}} phone_numbers = r.json()["data"] - integrations = [] + redirect = None for phone_number in phone_numbers: business_name = phone_number["verified_name"] display_phone_number = phone_number["display_phone_number"] @@ -98,7 +108,7 @@ def fb_connect_whatsapp_redirect(request: Request): ) r.raise_for_status() - # subscript our app to weebhooks for WABA + # subscribe our app to weebhooks for WABA r = requests.post( f"https://graph.facebook.com/v19.0/{waba_id}/subscribed_apps?access_token={user_access_token}", json={ @@ -110,10 +120,12 @@ def fb_connect_whatsapp_redirect(request: Request): ) r.raise_for_status() - integrations.append(bi) + redirect = connect(bi, current_run, published_run) - return return_to_app_url(integrations, on_completion) or HTMLResponse( - f"Sucessfully Connected to whatsapp! You may now close this page." + return ( + RedirectResponse(url=redirect.url, status_code=redirect.status_code) + if redirect + else HTMLResponse("No phone numbers found!" + retry_button) ) @@ -123,8 +135,16 @@ def fb_connect_redirect(request: Request): redirect_url = furl("/login", query_params={"next": request.url}) return RedirectResponse(str(redirect_url)) - on_completion = request.query_params.get("state") - retry_button = f'Retry' + connection_state = json.loads(request.query_params.get("state", "{}")) + current_run_id: str | None = connection_state.get("current_run_id", None) + published_run_id: str | None = connection_state.get("published_run_id", None) + retry_button = ( + f'Retry' + ) + current_run = SavedRun.objects.get(run_id=current_run_id) + published_run = PublishedRun.objects.filter( + published_run_id=published_run_id + ).first() code = request.query_params.get("code") if not code: @@ -146,14 +166,31 @@ def fb_connect_redirect(request: Request): status_code=400, ) + # only connect to pages that are not already connected + fb_pages = [ + fb_page + for fb_page in fb_pages + if BotIntegration.objects.filter( + fb_page_id=fb_page["id"] + ) # this will need to change if we ever add instagram + .exclude(saved_run__isnull=True) + .count() + == 0 + ] + map_parallel(_subscribe_to_page, fb_pages) integrations = BotIntegration.objects.reset_fb_pages_for_user( request.user.uid, fb_pages ) - page_names = ", ".join(map(str, integrations)) - return return_to_app_url(integrations, on_completion) or HTMLResponse( - f"Sucessfully Connected to {page_names}! You may now close this page." + redirect = None + for integration in integrations: + redirect = connect(integration, current_run, published_run) + + return ( + RedirectResponse(url=redirect.url, status_code=redirect.status_code) + if redirect + else HTMLResponse("No pages found!" + retry_button) ) @@ -221,19 +258,6 @@ def fb_webhook( return Response("OK") -def return_to_app_url( - integrations: list, - on_completion: str | None = None, -) -> bool | RedirectResponse: - if not on_completion or not integrations: - return False - return RedirectResponse( - furl(on_completion) - .add(query_params={"connect_ids": ",".join([str(i.id) for i in integrations])}) - .tostr() - ) - - wa_connect_redirect_url = ( furl( settings.APP_BASE_URL, @@ -242,7 +266,7 @@ def return_to_app_url( ).tostr() -def wa_connect_url(on_completion: str | None = None) -> str: +def wa_connect_url(current_run_id: str | None, published_run_id: str | None) -> str: return furl( "https://www.facebook.com/v18.0/dialog/oauth", query_params={ @@ -251,7 +275,9 @@ def wa_connect_url(on_completion: str | None = None) -> str: "redirect_uri": wa_connect_redirect_url, "response_type": "code", "config_id": settings.FB_WHATSAPP_CONFIG_ID, - "state": on_completion, + "state": json.dumps( + dict(current_run_id=current_run_id, published_run_id=published_run_id) + ), }, ).tostr() @@ -264,7 +290,7 @@ def wa_connect_url(on_completion: str | None = None) -> str: ).tostr() -def fb_connect_url(on_completion: str | None = None) -> str: +def fb_connect_url(current_run_id: str | None, published_run_id: str | None) -> str: return furl( "https://www.facebook.com/dialog/oauth", query_params={ @@ -279,12 +305,14 @@ def fb_connect_url(on_completion: str | None = None) -> str: "pages_show_list", ] ), - "state": on_completion, + "state": json.dumps( + dict(current_run_id=current_run_id, published_run_id=published_run_id) + ), }, ).tostr() -def ig_connect_url(on_completion: str | None = None) -> str: +def ig_connect_url(current_run_id: str | None, published_run_id: str | None) -> str: return furl( "https://www.facebook.com/dialog/oauth", query_params={ @@ -302,7 +330,9 @@ def ig_connect_url(on_completion: str | None = None) -> str: "pages_show_list", ] ), - "state": on_completion, + "state": json.dumps( + dict(current_run_id=current_run_id, published_run_id=published_run_id) + ), }, ).tostr() diff --git a/routers/slack_api.py b/routers/slack_api.py index 5e7176ed9..f5308d4ea 100644 --- a/routers/slack_api.py +++ b/routers/slack_api.py @@ -10,7 +10,14 @@ from starlette.background import BackgroundTasks from starlette.responses import RedirectResponse, HTMLResponse -from bots.models import BotIntegration, Platform, Conversation, Message +from bots.models import ( + BotIntegration, + Platform, + Conversation, + Message, + SavedRun, + PublishedRun, +) from bots.tasks import create_personal_channels_for_all_members from daras_ai_v2 import settings from daras_ai_v2.bots import msg_handler @@ -27,12 +34,12 @@ fetch_user_info, parse_slack_response, ) -from routers.facebook_api import return_to_app_url +from recipes.VideoBots import connect router = APIRouter() -def slack_connect_url(on_completion: str | None = None): +def slack_connect_url(current_run_id: str | None, published_run_id: str | None): return furl( "https://slack.com/oauth/v2/authorize", query_params=dict( @@ -65,7 +72,9 @@ def slack_connect_url(on_completion: str | None = None): "groups:write.invites", ] ), - state=on_completion, + state=json.dumps( + dict(current_run_id=current_run_id, published_run_id=published_run_id) + ), ), ) @@ -76,8 +85,16 @@ def slack_connect_redirect(request: Request): redirect_url = furl("/login", query_params={"next": request.url}) return RedirectResponse(str(redirect_url)) - on_completion = request.query_params.get("state") - retry_button = f'Retry' + connection_state = json.loads(request.query_params.get("state", "{}")) + current_run_id = connection_state.get("current_run_id", None) + published_run_id = connection_state.get("published_run_id", None) + retry_button = ( + f'Retry' + ) + current_run = SavedRun.objects.get(run_id=current_run_id) + published_run = PublishedRun.objects.filter( + published_run_id=published_run_id + ).first() code = request.query_params.get("code") if not code: @@ -140,12 +157,12 @@ def slack_connect_redirect(request: Request): BotIntegration.objects.filter(pk=bi.pk).update(**config) bi.refresh_from_db() + redirect = connect(bi, current_run, published_run) + if bi.slack_create_personal_channels: create_personal_channels_for_all_members.delay(bi.id) - return return_to_app_url([bi], on_completion) or HTMLResponse( - f"Sucessfully Connected to {slack_team_name} workspace on #{slack_channel_name}! You may now close this page." - ) + return RedirectResponse(url=redirect.url, status_code=redirect.status_code) @router.post("/__/slack/interaction/")