diff --git a/frontend/public/services/gleap.png b/frontend/public/services/gleap.png new file mode 100644 index 0000000000000..99eb242c9dc91 Binary files /dev/null and b/frontend/public/services/gleap.png differ diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index 54ccdd6a2189b..8d68e16f4bb54 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -2224,7 +2224,7 @@ const api = { async get(id: IntegrationType['id']): Promise { return await new ApiRequest().integration(id).get() }, - async create(data: Partial): Promise { + async create(data: Partial | FormData): Promise { return await new ApiRequest().integrations().create({ data }) }, async delete(integrationId: IntegrationType['id']): Promise { diff --git a/frontend/src/lib/integrations/integrationsLogic.ts b/frontend/src/lib/integrations/integrationsLogic.ts index 63e826bbff90b..78628e3e82716 100644 --- a/frontend/src/lib/integrations/integrationsLogic.ts +++ b/frontend/src/lib/integrations/integrationsLogic.ts @@ -4,6 +4,7 @@ import { loaders } from 'kea-loaders' import { router, urlToAction } from 'kea-router' import api from 'lib/api' import { fromParamsGivenUrl } from 'lib/utils' +import IconGoogleCloud from 'public/services/google-cloud.png' import IconHubspot from 'public/services/hubspot.png' import IconSalesforce from 'public/services/salesforce.png' import IconSlack from 'public/services/slack.png' @@ -18,6 +19,7 @@ const ICONS: Record = { slack: IconSlack, salesforce: IconSalesforce, hubspot: IconHubspot, + 'google-pubsub': IconGoogleCloud, } export const integrationsLogic = kea([ @@ -28,10 +30,15 @@ export const integrationsLogic = kea([ actions({ handleOauthCallback: (kind: IntegrationKind, searchParams: any) => ({ kind, searchParams }), + newGoogleCloudKey: (kind: string, key: File, callback?: (integration: IntegrationType) => void) => ({ + kind, + key, + callback, + }), deleteIntegration: (id: number) => ({ id }), }), - loaders(() => ({ + loaders(({ values }) => ({ integrations: [ null as IntegrationType[] | null, { @@ -48,6 +55,34 @@ export const integrationsLogic = kea([ } }) }, + newGoogleCloudKey: async ({ kind, key, callback }) => { + try { + const formData = new FormData() + formData.append('kind', kind) + formData.append('key', key) + const response = await api.integrations.create(formData) + const responseWithIcon = { ...response, icon_url: ICONS[kind] ?? ICONS['google-pubsub'] } + + // run onChange after updating the integrations loader + window.setTimeout(() => callback?.(responseWithIcon), 0) + + if ( + values.integrations?.find( + (x) => x.kind === kind && x.display_name === response.display_name + ) + ) { + lemonToast.success('Google Cloud key updated.') + return values.integrations.map((x) => + x.kind === kind && x.display_name === response.display_name ? responseWithIcon : x + ) + } + lemonToast.success('Google Cloud key created.') + return [...(values.integrations ?? []), responseWithIcon] + } catch (e) { + lemonToast.error('Failed to upload Google Cloud key.') + throw e + } + }, }, ], })), diff --git a/frontend/src/scenes/pipeline/hogfunctions/integrations/IntegrationChoice.tsx b/frontend/src/scenes/pipeline/hogfunctions/integrations/IntegrationChoice.tsx index af8d9c66bb128..1376f1897ebc9 100644 --- a/frontend/src/scenes/pipeline/hogfunctions/integrations/IntegrationChoice.tsx +++ b/frontend/src/scenes/pipeline/hogfunctions/integrations/IntegrationChoice.tsx @@ -1,6 +1,6 @@ import { IconExternal, IconX } from '@posthog/icons' import { LemonButton, LemonMenu, LemonSkeleton } from '@posthog/lemon-ui' -import { useValues } from 'kea' +import { useActions, useValues } from 'kea' import api from 'lib/api' import { integrationsLogic } from 'lib/integrations/integrationsLogic' import { IntegrationView } from 'lib/integrations/IntegrationView' @@ -21,6 +21,7 @@ export function IntegrationChoice({ redirectUrl, }: IntegrationConfigureProps): JSX.Element | null { const { integrationsLoading, integrations } = useValues(integrationsLogic) + const { newGoogleCloudKey } = useActions(integrationsLogic) const kind = integration const integrationsOfKind = integrations?.filter((x) => x.kind === kind) const integrationKind = integrationsOfKind?.find((integration) => integration.id === value) @@ -33,6 +34,22 @@ export function IntegrationChoice({ return } + const kindName = kind == 'google-pubsub' ? 'Google Cloud Pub/Sub' : capitalizeFirstLetter(kind) + + function uploadKey(kind: string): void { + const input = document.createElement('input') + input.type = 'file' + input.accept = '.json' + input.onchange = async (e) => { + const file = (e.target as HTMLInputElement).files?.[0] + if (!file) { + return + } + newGoogleCloudKey(kind, file, (integration) => onChange?.(integration.id)) + } + input.click() + } + const button = ( uploadKey(kind), + label: 'Upload Google Cloud .json key file', + }, + ], + } + : { + items: [ + { + to: api.integrations.authorizeUrl({ + kind, + next: redirectUrl, + }), + disableClientSideRouting: true, + label: integrationsOfKind?.length + ? `Connect to a different ${kind} integration` + : `Connect to ${kind}`, + }, + ], + }, { items: [ { @@ -83,7 +109,7 @@ export function IntegrationChoice({ {integrationKind ? ( Change ) : ( - Choose {capitalizeFirstLetter(kind)} connection + Choose {kindName} connection )} ) diff --git a/frontend/src/types.ts b/frontend/src/types.ts index 624012468fe9e..1000b66e54f06 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -3571,7 +3571,7 @@ export enum EventDefinitionType { EventPostHog = 'event_posthog', } -export type IntegrationKind = 'slack' | 'salesforce' | 'hubspot' +export type IntegrationKind = 'slack' | 'salesforce' | 'hubspot' | 'google-pubsub' export interface IntegrationType { id: number diff --git a/latest_migrations.manifest b/latest_migrations.manifest index 1b9ca11ec4e13..1f49620172660 100644 --- a/latest_migrations.manifest +++ b/latest_migrations.manifest @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name ee: 0016_rolemembership_organization_member otp_static: 0002_throttling otp_totp: 0002_auto_20190420_0723 -posthog: 0467_add_web_vitals_allowed_metrics +posthog: 0468_integration_google_pubsub sessions: 0001_initial social_django: 0010_uid_db_index two_factor: 0007_auto_20201201_1019 diff --git a/posthog/api/dashboards/dashboard.py b/posthog/api/dashboards/dashboard.py index 136824cfdf8b5..d7bf59576b139 100644 --- a/posthog/api/dashboards/dashboard.py +++ b/posthog/api/dashboards/dashboard.py @@ -513,7 +513,10 @@ def move_tile(self, request: Request, *args: Any, **kwargs: Any) -> Response: parser_classes=[DashboardTemplateCreationJSONSchemaParser], ) def create_from_template_json(self, request: Request, *args: Any, **kwargs: Any) -> Response: - dashboard = Dashboard.objects.create(team_id=self.team_id) + dashboard = Dashboard.objects.create( + team_id=self.team_id, + created_by=cast(User, request.user), + ) try: dashboard_template = DashboardTemplate(**request.data["template"]) diff --git a/posthog/api/decide.py b/posthog/api/decide.py index c5bdd0205afe4..a89f2661ce0b2 100644 --- a/posthog/api/decide.py +++ b/posthog/api/decide.py @@ -140,6 +140,17 @@ def get_decide(request: HttpRequest): team = user.teams.get(id=project_id) if team: + if team.id in settings.DECIDE_SHORT_CIRCUITED_TEAM_IDS: + return cors_response( + request, + generate_exception_response( + "decide", + f"Team with ID {team.id} cannot access the /decide endpoint." + f"Please contact us at hey@posthog.com", + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + ), + ) + token = cast(str, token) # we know it's not None if we found a team structlog.contextvars.bind_contextvars(team_id=team.id) diff --git a/posthog/api/integration.py b/posthog/api/integration.py index 6c343633ff7da..8b269de3b8280 100644 --- a/posthog/api/integration.py +++ b/posthog/api/integration.py @@ -1,3 +1,5 @@ +import json + from typing import Any from django.http import HttpResponse @@ -10,7 +12,7 @@ from posthog.api.routing import TeamAndOrgViewSetMixin from posthog.api.shared import UserBasicSerializer -from posthog.models.integration import Integration, OauthIntegration, SlackIntegration +from posthog.models.integration import Integration, OauthIntegration, SlackIntegration, GoogleCloudIntegration class IntegrationSerializer(serializers.ModelSerializer): @@ -27,7 +29,17 @@ def create(self, validated_data: Any) -> Any: request = self.context["request"] team_id = self.context["team_id"] - if validated_data["kind"] in OauthIntegration.supported_kinds: + if validated_data["kind"] in GoogleCloudIntegration.supported_kinds: + key_file = request.FILES.get("key") + if not key_file: + raise ValidationError("Key file not provided") + key_info = json.loads(key_file.read().decode("utf-8")) + instance = GoogleCloudIntegration.integration_from_key( + validated_data["kind"], key_info, team_id, request.user + ) + return instance + + elif validated_data["kind"] in OauthIntegration.supported_kinds: try: instance = OauthIntegration.integration_from_oauth_response( validated_data["kind"], team_id, request.user, validated_data["config"] diff --git a/posthog/api/plugin.py b/posthog/api/plugin.py index bb19663d88589..930fc1a6c67fe 100644 --- a/posthog/api/plugin.py +++ b/posthog/api/plugin.py @@ -914,6 +914,8 @@ def migrate(self, request: request.Request, **kwargs): raise ValidationError("No migration available for this plugin") hog_function_data = migrater.migrate(obj) + hog_function_data["template_id"] = hog_function_data["id"] + del hog_function_data["id"] if obj.enabled: hog_function_data["enabled"] = True diff --git a/posthog/api/test/dashboards/test_dashboard.py b/posthog/api/test/dashboards/test_dashboard.py index a23edc4ccc9c8..359cff26345af 100644 --- a/posthog/api/test/dashboards/test_dashboard.py +++ b/posthog/api/test/dashboards/test_dashboard.py @@ -1162,6 +1162,9 @@ def test_create_from_template_json(self, mock_capture) -> None: self.assertEqual(dashboard["name"], valid_template["template_name"], dashboard) self.assertEqual(dashboard["description"], valid_template["dashboard_description"]) + self.assertEqual( + dashboard["created_by"], dashboard["created_by"] | {"first_name": "", "email": "user1@posthog.com"} + ) self.assertEqual(len(dashboard["tiles"]), 1) diff --git a/posthog/api/test/test_decide.py b/posthog/api/test/test_decide.py index 4d680c27e0e56..2bd47011c93c8 100644 --- a/posthog/api/test/test_decide.py +++ b/posthog/api/test/test_decide.py @@ -29,6 +29,7 @@ Plugin, PluginConfig, PluginSourceFile, + Project, ) from posthog.models.cohort.cohort import Cohort from posthog.models.feature_flag.feature_flag import FeatureFlagHashKeyOverride @@ -2647,6 +2648,40 @@ def test_missing_token(self, *args): response = self._post_decide({"distinct_id": "example_id", "api_key": None, "project_id": self.team.id}) self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + def test_short_circuited_team(self, *args): + short_circuited_team_token = "short_circuited_team_token" + + _, short_circuited_team = Project.objects.create_with_team( + organization=self.organization, + team_fields={ + "api_token": short_circuited_team_token, + "test_account_filters": [ + { + "key": "email", + "value": "@posthog.com", + "operator": "not_icontains", + "type": "person", + } + ], + "has_completed_onboarding_for": {"product_analytics": True}, + }, + initiating_user=self.user, + ) + with self.settings(DECIDE_SHORT_CIRCUITED_TEAM_IDS=[short_circuited_team.id]): + response = self._post_decide( + { + "distinct_id": "example_id", + "api_key": short_circuited_team_token, + "project_id": short_circuited_team.id, + } + ) + self.assertEqual(response.status_code, status.HTTP_422_UNPROCESSABLE_ENTITY) + response_data = response.json() + self.assertEqual( + response_data["detail"], + f"Team with ID {short_circuited_team.id} cannot access the /decide endpoint.Please contact us at hey@posthog.com", + ) + def test_invalid_payload_on_decide_endpoint(self, *args): invalid_payloads = [ base64.b64encode(b"1-1").decode("utf-8"), diff --git a/posthog/cdp/templates/__init__.py b/posthog/cdp/templates/__init__.py index 4edf9ca937ca8..aa26040e42da6 100644 --- a/posthog/cdp/templates/__init__.py +++ b/posthog/cdp/templates/__init__.py @@ -17,6 +17,8 @@ from .avo.template_avo import template as avo from .loops.template_loops import template as loops from .rudderstack.template_rudderstack import template as rudderstack +from .gleap.template_gleap import template as gleap +from .google_pubsub.template_google_pubsub import template as google_pubsub, TemplateGooglePubSubMigrator HOG_FUNCTION_TEMPLATES = [ @@ -38,6 +40,8 @@ loops, rudderstack, avo, + gleap, + google_pubsub, ] @@ -47,6 +51,7 @@ TemplateCustomerioMigrator.plugin_url: TemplateCustomerioMigrator, TemplateIntercomMigrator.plugin_url: TemplateIntercomMigrator, TemplateSendGridMigrator.plugin_url: TemplateSendGridMigrator, + TemplateGooglePubSubMigrator.plugin_url: TemplateGooglePubSubMigrator, } __all__ = ["HOG_FUNCTION_TEMPLATES", "HOG_FUNCTION_TEMPLATES_BY_ID"] diff --git a/posthog/cdp/templates/gleap/template_gleap.py b/posthog/cdp/templates/gleap/template_gleap.py new file mode 100644 index 0000000000000..a177ce42ce90c --- /dev/null +++ b/posthog/cdp/templates/gleap/template_gleap.py @@ -0,0 +1,91 @@ +from posthog.cdp.templates.hog_function_template import HogFunctionTemplate + +template: HogFunctionTemplate = HogFunctionTemplate( + status="beta", + id="template-gleap", + name="Add contacts to Gleap", + description="Updates a contact in Gleap", + icon_url="/static/services/gleap.png", + hog=""" +let action := inputs.action +let name := event.name + +if (empty(inputs.userId)) { + print('No User ID set. Skipping...') + return +} + +let attributes := inputs.include_all_properties ? person.properties : {} + +attributes['userId'] := inputs.userId + +for (let key, value in inputs.attributes) { + if (not empty(value)) { + attributes[key] := value + } +} + +let res := fetch(f'https://api.gleap.io/admin/identify', { + 'method': 'POST', + 'headers': { + 'User-Agent': 'PostHog Gleap.io App', + 'Api-Token': inputs.apiKey, + 'Content-Type': 'application/json' + }, + 'body': attributes +}) + +if (res.status >= 400) { + print('Error from gleap.io api:', res.status, res.body) +} + +""".strip(), + inputs_schema=[ + { + "key": "apiKey", + "type": "string", + "label": "Gleap.io API Key", + "secret": True, + "required": True, + }, + { + "key": "userId", + "type": "string", + "label": "User ID", + "description": "You can choose to fill this from an `email` property or an `id` property. If the value is empty nothing will be sent. See here for more information: https://docs.gleap.io/server/rest-api", + "default": "{person.uuid}", + "secret": False, + "required": True, + }, + { + "key": "include_all_properties", + "type": "boolean", + "label": "Include all properties as attributes", + "description": "If set, all person properties will be included as attributes. Individual attributes can be overridden below.", + "default": False, + "secret": False, + "required": True, + }, + { + "key": "attributes", + "type": "dictionary", + "label": "Attribute mapping", + "description": "Map of Gleap.io attributes and their values. You can use the filters section to filter out unwanted events.", + "default": { + "email": "{person.properties.email}", + "name": "{person.properties.name}", + "phone": "{person.properties.phone}", + }, + "secret": False, + "required": False, + }, + ], + filters={ + "events": [ + {"id": "$identify", "name": "$identify", "type": "events", "order": 0}, + {"id": "$set", "name": "$set", "type": "events", "order": 1}, + ], + "actions": [], + "filter_test_accounts": True, + }, +) diff --git a/posthog/cdp/templates/gleap/test_template_gleap.py b/posthog/cdp/templates/gleap/test_template_gleap.py new file mode 100644 index 0000000000000..85769fad13fc1 --- /dev/null +++ b/posthog/cdp/templates/gleap/test_template_gleap.py @@ -0,0 +1,82 @@ +from inline_snapshot import snapshot +from posthog.cdp.templates.helpers import BaseHogFunctionTemplateTest +from posthog.cdp.templates.gleap.template_gleap import ( + template as template_gleap, +) + + +def create_inputs(**kwargs): + inputs = { + "apiKey": "uB6Jymn60NN5EEIWgiUzZx13geVlEx26", + "include_all_properties": False, + "userId": "edad9282-25d0-4cf1-af0e-415535ee1161", + "attributes": {"name": "example", "email": "example@posthog.com"}, + } + inputs.update(kwargs) + + return inputs + + +class TestTemplateGleap(BaseHogFunctionTemplateTest): + template = template_gleap + + def test_function_works(self): + self.run_function( + inputs=create_inputs(), + globals={ + "event": {"name": "$identify"}, + }, + ) + + assert self.get_mock_fetch_calls()[0] == snapshot( + ( + "https://api.gleap.io/admin/identify", + { + "method": "POST", + "headers": { + "User-Agent": "PostHog Gleap.io App", + "Api-Token": "uB6Jymn60NN5EEIWgiUzZx13geVlEx26", + "Content-Type": "application/json", + }, + "body": { + "userId": "edad9282-25d0-4cf1-af0e-415535ee1161", + "name": "example", + "email": "example@posthog.com", + }, + }, + ) + ) + + def test_body_includes_all_properties_if_set(self): + self.run_function( + inputs=create_inputs(include_all_properties=False), + globals={ + "person": {"properties": {"account_status": "paid"}}, + }, + ) + + assert self.get_mock_fetch_calls()[0][1]["body"] == snapshot( + {"userId": "edad9282-25d0-4cf1-af0e-415535ee1161", "name": "example", "email": "example@posthog.com"} + ) + + self.run_function( + inputs=create_inputs(include_all_properties=True), + globals={ + "person": {"properties": {"account_status": "paid"}}, + }, + ) + + assert self.get_mock_fetch_calls()[0][1]["body"] == snapshot( + { + "userId": "edad9282-25d0-4cf1-af0e-415535ee1161", + "account_status": "paid", + "name": "example", + "email": "example@posthog.com", + } + ) + + def test_function_requires_identifier(self): + self.run_function(inputs=create_inputs(userId="")) + + assert not self.get_mock_fetch_calls() + assert self.get_mock_print_calls() == snapshot([("No User ID set. Skipping...",)]) diff --git a/posthog/cdp/templates/google_pubsub/template_google_pubsub.py b/posthog/cdp/templates/google_pubsub/template_google_pubsub.py new file mode 100644 index 0000000000000..ceb853c752c5a --- /dev/null +++ b/posthog/cdp/templates/google_pubsub/template_google_pubsub.py @@ -0,0 +1,126 @@ +import dataclasses +import json +from copy import deepcopy + +from posthog.cdp.templates.hog_function_template import HogFunctionTemplate, HogFunctionTemplateMigrator +from posthog.models.integration import GoogleCloudIntegration + +template: HogFunctionTemplate = HogFunctionTemplate( + status="beta", + id="template-google-pubsub", + name="Google Pub/Sub", + description="Send data to a Google Pub/Sub topic", + icon_url="/static/services/google-cloud.png", + hog=""" +let headers := () -> { + 'Authorization': f'Bearer {inputs.auth.access_token}', + 'Content-Type': 'application/json' +} +let message := () -> { + 'messageId': event.uuid, + 'data': base64Encode(jsonStringify(inputs.payload)), + 'attributes': inputs.attributes +} +let res := fetch(f'https://pubsub.googleapis.com/v1/{inputs.topicId}:publish', { + 'method': 'POST', + 'headers': headers(), + 'body': jsonStringify({ 'messages': [message()] }) +}) + +if (res.status >= 200 and res.status < 300) { + print('Event sent successfully!') +} else { + throw Error('Error sending event', res) +} +""".strip(), + inputs_schema=[ + { + "key": "auth", + "type": "integration", + "integration": "google-pubsub", + "label": "Google Cloud service account", + "secret": False, + "required": True, + }, + { + "key": "topicId", + "type": "string", + "label": "Topic name", + "secret": False, + "required": True, + }, + { + "key": "payload", + "type": "json", + "label": "Message Payload", + "default": {"event": "{event}", "person": "{person}"}, + "secret": False, + "required": False, + }, + { + "key": "attributes", + "type": "json", + "label": "Attributes", + "default": {}, + "secret": False, + "required": False, + }, + ], +) + + +class TemplateGooglePubSubMigrator(HogFunctionTemplateMigrator): + plugin_url = "https://github.com/PostHog/pubsub-plugin" + + @classmethod + def migrate(cls, obj): + hf = deepcopy(dataclasses.asdict(template)) + + exportEventsToIgnore = obj.config.get("exportEventsToIgnore", "") + topicId = obj.config.get("topicId", "") + + from posthog.models.plugin import PluginAttachment + + attachment: PluginAttachment | None = PluginAttachment.objects.filter( + plugin_config=obj, key="googleCloudKeyJson" + ).first() + if not attachment: + raise Exception("Google Cloud Key JSON not found") + + keyFile = json.loads(attachment.contents.decode("UTF-8")) # type: ignore + integration = GoogleCloudIntegration.integration_from_key("google-pubsub", keyFile, obj.team.pk) + + hf["filters"] = {} + if exportEventsToIgnore: + events = exportEventsToIgnore.split(",") + if len(events) > 0: + event_names = ", ".join(["'{}'".format(event.strip()) for event in events]) + query = f"event not in ({event_names})" + hf["filters"]["events"] = [ + { + "id": None, + "name": "All events", + "type": "events", + "order": 0, + "properties": [{"key": query, "type": "hogql"}], + } + ] + + hf["inputs"] = { + "topicId": {"value": topicId}, + "payload": { + "value": { + "event": "{event.name}", + "distinct_id": "{event.distinct_id}", + "timestamp": "{event.timestamp}", + "uuid": "{event.uuid}", + "properties": "{event.properties}", + "person_id": "{person.id}", + "person_properties": "{person.properties}", + } + }, + "auth": {"value": integration.id}, + "attributes": {"value": {}}, + } + + return hf diff --git a/posthog/cdp/templates/google_pubsub/test_template_google_pubsub.py b/posthog/cdp/templates/google_pubsub/test_template_google_pubsub.py new file mode 100644 index 0000000000000..077cdeae5d348 --- /dev/null +++ b/posthog/cdp/templates/google_pubsub/test_template_google_pubsub.py @@ -0,0 +1,204 @@ +from datetime import datetime +from unittest.mock import patch + +from inline_snapshot import snapshot + +from posthog.cdp.templates.google_pubsub.template_google_pubsub import TemplateGooglePubSubMigrator +from posthog.cdp.templates.helpers import BaseHogFunctionTemplateTest +from posthog.cdp.templates.hubspot.template_hubspot import template as template_hubspot +from posthog.models import PluginConfig, PluginAttachment, Plugin, Integration +from posthog.test.base import BaseTest + + +class TestTemplateGooglePubSub(BaseHogFunctionTemplateTest): + template = template_hubspot + + def _inputs(self, **kwargs): + inputs = { + "oauth": {"access_token": "TOKEN"}, + "email": "example@posthog.com", + "properties": { + "company": "PostHog", + }, + } + inputs.update(kwargs) + return inputs + + def test_function_works(self): + self.mock_fetch_response = lambda *args: {"status": 200, "body": {"status": "success"}} # type: ignore + + res = self.run_function(inputs=self._inputs()) + + assert res.result is None + + assert self.get_mock_fetch_calls() == [ + ( + "https://api.hubapi.com/crm/v3/objects/contacts", + { + "method": "POST", + "headers": {"Authorization": "Bearer TOKEN", "Content-Type": "application/json"}, + "body": {"properties": {"company": "PostHog", "email": "example@posthog.com"}}, + }, + ) + ] + assert self.get_mock_print_calls() == [("Contact created successfully!",)] + + def test_exits_if_no_email(self): + for email in [None, ""]: + self.mock_print.reset_mock() + res = self.run_function(inputs=self._inputs(email=email)) + + assert res.result is None + assert self.get_mock_fetch_calls() == [] + assert self.get_mock_print_calls() == [("`email` input is empty. Not creating a contact.",)] + + def test_handles_updates(self): + call_count = 0 + + # First call respond with 409, second one 200 and increment call_count + def mock_fetch(*args): + nonlocal call_count + call_count += 1 + return ( + {"status": 409, "body": {"message": "Contact already exists. Existing ID: 12345"}} + if call_count == 1 + else {"status": 200, "body": {"status": "success"}} + ) + + self.mock_fetch_response = mock_fetch # type: ignore + + res = self.run_function(inputs=self._inputs()) + + assert res.result is None + + assert len(self.get_mock_fetch_calls()) == 2 + + assert self.get_mock_fetch_calls()[0] == ( + "https://api.hubapi.com/crm/v3/objects/contacts", + { + "method": "POST", + "headers": {"Authorization": "Bearer TOKEN", "Content-Type": "application/json"}, + "body": {"properties": {"company": "PostHog", "email": "example@posthog.com"}}, + }, + ) + + assert self.get_mock_fetch_calls()[1] == ( + "https://api.hubapi.com/crm/v3/objects/contacts/12345", + { + "method": "PATCH", + "headers": {"Authorization": "Bearer TOKEN", "Content-Type": "application/json"}, + "body": {"properties": {"company": "PostHog", "email": "example@posthog.com"}}, + }, + ) + + +class TestTemplateMigration(BaseTest): + def get_plugin_config(self, config: dict): + _config = { + "topicId": "TOPIC_ID", + "exportEventsToIgnore": "", + } + _config.update(config) + return PluginConfig(enabled=True, order=0, config=_config) + + @patch("google.oauth2.service_account.Credentials.from_service_account_info") + def test_integration(self, mock_credentials): + mock_credentials.return_value.project_id = "posthog-616" + mock_credentials.return_value.service_account_email = "posthog@" + mock_credentials.return_value.token = "ACCESS_TOKEN" + mock_credentials.return_value.expiry = datetime.fromtimestamp(1704110400 + 3600) + mock_credentials.return_value.refresh = lambda _: None + + plugin = Plugin() + plugin.save() + obj = self.get_plugin_config({}) + obj.plugin = plugin + obj.team = self.team + obj.save() + PluginAttachment.objects.create( + plugin_config=obj, contents=b'{"cloud": "key"}', key="googleCloudKeyJson", file_size=10 + ) + + template = TemplateGooglePubSubMigrator.migrate(obj) + template["inputs"]["auth"]["value"] = 1 # mock the ID + assert template["inputs"] == snapshot( + { + "auth": {"value": 1}, + "topicId": {"value": "TOPIC_ID"}, + "payload": { + "value": { + "event": "{event.name}", + "distinct_id": "{event.distinct_id}", + "timestamp": "{event.timestamp}", + "uuid": "{event.uuid}", + "properties": "{event.properties}", + "person_properties": "{person.properties}", + "person_id": "{person.id}", + } + }, + "attributes": {"value": {}}, + } + ) + assert template["filters"] == {} + + integration = Integration.objects.last() + assert integration is not None + assert integration.kind == "google-pubsub" + assert integration.sensitive_config == {"cloud": "key"} + assert integration.config.get("access_token") == "ACCESS_TOKEN" + + @patch("google.oauth2.service_account.Credentials.from_service_account_info") + def test_ignore_events(self, mock_credentials): + mock_credentials.return_value.project_id = "posthog-616" + mock_credentials.return_value.service_account_email = "posthog@" + mock_credentials.return_value.token = "ACCESS_TOKEN" + mock_credentials.return_value.expiry = datetime.fromtimestamp(1704110400 + 3600) + mock_credentials.return_value.refresh = lambda _: None + + plugin = Plugin() + plugin.save() + obj = self.get_plugin_config( + { + "exportEventsToIgnore": "event1, event2", + } + ) + obj.plugin = plugin + obj.team = self.team + obj.save() + PluginAttachment.objects.create( + plugin_config=obj, contents=b'{"cloud": "key"}', key="googleCloudKeyJson", file_size=10 + ) + + template = TemplateGooglePubSubMigrator.migrate(obj) + template["inputs"]["auth"]["value"] = 1 # mock the ID + assert template["inputs"] == snapshot( + { + "auth": {"value": 1}, + "topicId": {"value": "TOPIC_ID"}, + "payload": { + "value": { + "event": "{event.name}", + "distinct_id": "{event.distinct_id}", + "timestamp": "{event.timestamp}", + "uuid": "{event.uuid}", + "properties": "{event.properties}", + "person_id": "{person.id}", + "person_properties": "{person.properties}", + } + }, + "attributes": {"value": {}}, + } + ) + assert template["filters"] == snapshot( + { + "events": [ + { + "id": None, + "name": "All events", + "type": "events", + "order": 0, + "properties": [{"key": "event not in ('event1', 'event2')", "type": "hogql"}], + } + ] + } + ) diff --git a/posthog/hogql/database/schema/sessions_v1.py b/posthog/hogql/database/schema/sessions_v1.py index c95076f603fe7..b4bc82724611e 100644 --- a/posthog/hogql/database/schema/sessions_v1.py +++ b/posthog/hogql/database/schema/sessions_v1.py @@ -87,6 +87,9 @@ name="duration" ), # alias of $session_duration, deprecated but included for backwards compatibility "$is_bounce": BooleanDatabaseField(name="$is_bounce"), + # some aliases for people reverting from v2 to v1 + "$end_current_url": StringDatabaseField(name="$end_current_url"), + "$end_pathname": StringDatabaseField(name="$end_pathname"), } @@ -240,6 +243,10 @@ def arg_max_merge_field(field_name: str) -> ast.Call: gad_source=aggregate_fields["$entry_gad_source"], ) + # aliases for people reverting from v2 to v1 + aggregate_fields["$end_current_url"] = aggregate_fields["$exit_current_url"] + aggregate_fields["$end_pathname"] = aggregate_fields["$exit_pathname"] + select_fields: list[ast.Expr] = [] group_by_fields: list[ast.Expr] = [ast.Field(chain=[table_name, "session_id"])] @@ -282,6 +289,9 @@ def to_printed_hogql(self): def avoid_asterisk_fields(self) -> list[str]: return [ "duration", # alias of $session_duration, deprecated but included for backwards compatibility + # aliases for people reverting from v2 to v1 + "$end_current_url", + "$end_pathname", ] @@ -318,6 +328,9 @@ def get_lazy_session_table_properties_v1(search: Optional[str]): "$urls", "duration", "$num_uniq_urls", + # aliases for people reverting from v2 to v1 + "$end_current_url", + "$end_pathname", } # some fields should have a specific property type which isn't derivable from the type of database field diff --git a/posthog/hogql/database/schema/sessions_v2.py b/posthog/hogql/database/schema/sessions_v2.py index 6f3ce13974811..04497072e08a2 100644 --- a/posthog/hogql/database/schema/sessions_v2.py +++ b/posthog/hogql/database/schema/sessions_v2.py @@ -93,6 +93,9 @@ "$is_bounce": BooleanDatabaseField(name="$is_bounce"), "$last_external_click_url": StringDatabaseField(name="$last_external_click_url"), "$page_screen_autocapture_count_up_to": DatabaseField(name="$$page_screen_autocapture_count_up_to"), + # some aliases for people upgrading from v1 to v2 + "$exit_current_url": StringDatabaseField(name="$exit_current_url"), + "$exit_pathname": StringDatabaseField(name="$exit_pathname"), } @@ -306,6 +309,9 @@ def arg_max_merge_field(field_name: str) -> ast.Call: gclid=aggregate_fields["$entry_gclid"], gad_source=aggregate_fields["$entry_gad_source"], ) + # some aliases for people upgrading from v1 to v2 + aggregate_fields["$exit_current_url"] = aggregate_fields["$end_current_url"] + aggregate_fields["$exit_pathname"] = aggregate_fields["$end_pathname"] select_fields: list[ast.Expr] = [] group_by_fields: list[ast.Expr] = [ast.Field(chain=[table_name, "session_id_v7"])] @@ -351,6 +357,9 @@ def avoid_asterisk_fields(self) -> list[str]: "session_id_v7", # HogQL insights currently don't support returning uint128s due to json serialisation "id", # prefer to use session_id "duration", # alias of $session_duration, deprecated but included for backwards compatibility + # aliases for people upgrading from v1 to v2 + "$exit_current_url", + "$exit_pathname", ] @@ -396,6 +405,9 @@ def get_lazy_session_table_properties_v2(search: Optional[str]): "duration", "$num_uniq_urls", "$page_screen_autocapture_count_up_to", + # aliases for people upgrading from v1 to v2 + "$exit_current_url", + "$exit_pathname", } # some fields should have a specific property type which isn't derivable from the type of database field diff --git a/posthog/hogql/database/schema/test/test_sessions_v1.py b/posthog/hogql/database/schema/test/test_sessions_v1.py index 507c5704978bb..eefd04197deab 100644 --- a/posthog/hogql/database/schema/test/test_sessions_v1.py +++ b/posthog/hogql/database/schema/test/test_sessions_v1.py @@ -286,6 +286,41 @@ def test_bounce_rate(self, bounceRatePageViewMode): response.results or [], ) + def test_can_use_v1_and_v2_fields(self): + session_id = "session_test_can_use_v1_and_v2_fields" + + _create_event( + event="$pageview", + team=self.team, + distinct_id="d1", + properties={ + "$current_url": "https://example.com/pathname", + "$pathname": "/pathname", + "$session_id": session_id, + }, + ) + + response = self.__execute( + parse_select( + """ + select + $session_duration, + duration, + $end_current_url, + $exit_current_url, + $end_pathname, + $exit_pathname + from sessions + where session_id = {session_id} + """, + placeholders={"session_id": ast.Constant(value=session_id)}, + ), + ) + + assert response.results == [ + (0, 0, "https://example.com/pathname", "https://example.com/pathname", "/pathname", "/pathname") + ] + class TestGetLazySessionProperties(ClickhouseTestMixin, APIBaseTest): def test_all(self): diff --git a/posthog/hogql/database/schema/test/test_sessions_v2.py b/posthog/hogql/database/schema/test/test_sessions_v2.py index 59964b1a478bf..b5c2d71249317 100644 --- a/posthog/hogql/database/schema/test/test_sessions_v2.py +++ b/posthog/hogql/database/schema/test/test_sessions_v2.py @@ -541,6 +541,41 @@ def test_last_external_click_url(self): [row1] = response.results or [] self.assertEqual(row1, ("https://example.com/2",)) + def test_can_use_v1_and_v2_fields(self): + session_id = str(uuid7()) + + _create_event( + event="$pageview", + team=self.team, + distinct_id="d1", + properties={ + "$current_url": "https://example.com/pathname", + "$pathname": "/pathname", + "$session_id": session_id, + }, + ) + + response = self.__execute( + parse_select( + """ + select + $session_duration, + duration, + $end_current_url, + $exit_current_url, + $end_pathname, + $exit_pathname + from sessions + where session_id = {session_id} + """, + placeholders={"session_id": ast.Constant(value=session_id)}, + ), + ) + + assert response.results == [ + (0, 0, "https://example.com/pathname", "https://example.com/pathname", "/pathname", "/pathname") + ] + class TestGetLazySessionProperties(ClickhouseTestMixin, APIBaseTest): def test_all(self): diff --git a/posthog/hogql/database/test/__snapshots__/test_database.ambr b/posthog/hogql/database/test/__snapshots__/test_database.ambr index 8345a36c4208c..a504cab7c1bea 100644 --- a/posthog/hogql/database/test/__snapshots__/test_database.ambr +++ b/posthog/hogql/database/test/__snapshots__/test_database.ambr @@ -401,7 +401,9 @@ "duration", "$is_bounce", "$last_external_click_url", - "$page_screen_autocapture_count_up_to" + "$page_screen_autocapture_count_up_to", + "$exit_current_url", + "$exit_pathname" ], "hogql_value": "session", "id": null, @@ -930,7 +932,9 @@ "duration", "$is_bounce", "$last_external_click_url", - "$page_screen_autocapture_count_up_to" + "$page_screen_autocapture_count_up_to", + "$exit_current_url", + "$exit_pathname" ], "hogql_value": "session", "id": null, @@ -1404,6 +1408,26 @@ "schema_valid": true, "table": null, "type": "string" + }, + "$exit_current_url": { + "chain": null, + "fields": null, + "hogql_value": "`$exit_current_url`", + "id": null, + "name": "$exit_current_url", + "schema_valid": true, + "table": null, + "type": "string" + }, + "$exit_pathname": { + "chain": null, + "fields": null, + "hogql_value": "`$exit_pathname`", + "id": null, + "name": "$exit_pathname", + "schema_valid": true, + "table": null, + "type": "string" } }, "id": "sessions", @@ -1913,7 +1937,9 @@ "duration", "$is_bounce", "$last_external_click_url", - "$page_screen_autocapture_count_up_to" + "$page_screen_autocapture_count_up_to", + "$exit_current_url", + "$exit_pathname" ], "hogql_value": "session", "id": null, @@ -2442,7 +2468,9 @@ "duration", "$is_bounce", "$last_external_click_url", - "$page_screen_autocapture_count_up_to" + "$page_screen_autocapture_count_up_to", + "$exit_current_url", + "$exit_pathname" ], "hogql_value": "session", "id": null, @@ -2916,6 +2944,26 @@ "schema_valid": true, "table": null, "type": "string" + }, + "$exit_current_url": { + "chain": null, + "fields": null, + "hogql_value": "`$exit_current_url`", + "id": null, + "name": "$exit_current_url", + "schema_valid": true, + "table": null, + "type": "string" + }, + "$exit_pathname": { + "chain": null, + "fields": null, + "hogql_value": "`$exit_pathname`", + "id": null, + "name": "$exit_pathname", + "schema_valid": true, + "table": null, + "type": "string" } }, "id": "sessions", diff --git a/posthog/migrations/0468_integration_google_pubsub.py b/posthog/migrations/0468_integration_google_pubsub.py new file mode 100644 index 0000000000000..1ace80ef4ab2d --- /dev/null +++ b/posthog/migrations/0468_integration_google_pubsub.py @@ -0,0 +1,25 @@ +# Generated by Django 4.2.15 on 2024-09-10 10:43 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("posthog", "0467_add_web_vitals_allowed_metrics"), + ] + + operations = [ + migrations.AlterField( + model_name="integration", + name="kind", + field=models.CharField( + choices=[ + ("slack", "Slack"), + ("salesforce", "Salesforce"), + ("hubspot", "Hubspot"), + ("google-pubsub", "Google Pubsub"), + ], + max_length=20, + ), + ), + ] diff --git a/posthog/models/integration.py b/posthog/models/integration.py index c613bd5aa70c8..2cbc24f30a430 100644 --- a/posthog/models/integration.py +++ b/posthog/models/integration.py @@ -8,8 +8,11 @@ from django.db import models import requests +from rest_framework.exceptions import ValidationError from rest_framework.request import Request from slack_sdk import WebClient +from google.oauth2 import service_account +from google.auth.transport.requests import Request as GoogleRequest from django.conf import settings from posthog.cache_utils import cache_for @@ -39,11 +42,12 @@ class IntegrationKind(models.TextChoices): SLACK = "slack" SALESFORCE = "salesforce" HUBSPOT = "hubspot" + GOOGLE_PUBSUB = "google-pubsub" team = models.ForeignKey("Team", on_delete=models.CASCADE) # The integration type identifier - kind = models.CharField(max_length=10, choices=IntegrationKind.choices) + kind = models.CharField(max_length=20, choices=IntegrationKind.choices) # The ID of the integration in the external system integration_id = models.TextField(null=True, blank=True) # Any config that COULD be passed to the frontend @@ -69,6 +73,8 @@ def display_name(self) -> str: if self.kind in OauthIntegration.supported_kinds: oauth_config = OauthIntegration.oauth_config_for_kind(self.kind) return dot_get(self.config, oauth_config.name_path, self.integration_id) + if self.kind in GoogleCloudIntegration.supported_kinds: + return self.integration_id or "unknown ID" return f"ID: {self.integration_id}" @@ -382,3 +388,81 @@ def slack_config(cls): ) return config + + +class GoogleCloudIntegration: + supported_kinds = ["google-pubsub"] + integration: Integration + + def __init__(self, integration: Integration) -> None: + self.integration = integration + + @classmethod + def integration_from_key( + cls, kind: str, key_info: dict, team_id: int, created_by: Optional[User] = None + ) -> Integration: + if kind == "google-pubsub": + scope = "https://www.googleapis.com/auth/pubsub" + else: + raise NotImplementedError(f"Google Cloud integration kind {kind} not implemented") + + try: + credentials = service_account.Credentials.from_service_account_info(key_info, scopes=[scope]) + credentials.refresh(GoogleRequest()) + except Exception: + raise ValidationError(f"Failed to authenticate with provided service account key") + + integration, created = Integration.objects.update_or_create( + team_id=team_id, + kind=kind, + integration_id=credentials.service_account_email, + defaults={ + "config": { + "expires_in": credentials.expiry.timestamp() - int(time.time()), + "refreshed_at": int(time.time()), + "access_token": credentials.token, + }, + "sensitive_config": key_info, + "created_by": created_by, + }, + ) + + if integration.errors: + integration.errors = "" + integration.save() + + return integration + + def access_token_expired(self, time_threshold: Optional[timedelta] = None) -> bool: + expires_in = self.integration.config.get("expires_in") + refreshed_at = self.integration.config.get("refreshed_at") + if not expires_in or not refreshed_at: + return False + + # To be really safe we refresh if its half way through the expiry + time_threshold = time_threshold or timedelta(seconds=expires_in / 2) + + return time.time() > refreshed_at + expires_in - time_threshold.total_seconds() + + def refresh_access_token(self): + """ + Refresh the access token for the integration if necessary + """ + credentials = service_account.Credentials.from_service_account_info( + self.integration.sensitive_config, scopes=["https://www.googleapis.com/auth/pubsub"] + ) + + try: + credentials.refresh(GoogleRequest()) + except Exception: + raise ValidationError(f"Failed to authenticate with provided service account key") + + self.integration.config = { + "expires_in": credentials.expiry.timestamp() - int(time.time()), + "refreshed_at": int(time.time()), + "access_token": credentials.token, + } + self.integration.save() + reload_integrations_on_workers(self.integration.team_id, [self.integration.id]) + + logger.info(f"Refreshed access token for {self}") diff --git a/posthog/models/test/test_integration_model.py b/posthog/models/test/test_integration_model.py index cad8b798df03e..d4af7badfea46 100644 --- a/posthog/models/test/test_integration_model.py +++ b/posthog/models/test/test_integration_model.py @@ -6,7 +6,7 @@ from freezegun import freeze_time import pytest from posthog.models.instance_setting import set_instance_setting -from posthog.models.integration import Integration, OauthIntegration, SlackIntegration +from posthog.models.integration import Integration, OauthIntegration, SlackIntegration, GoogleCloudIntegration from posthog.test.base import BaseTest @@ -231,3 +231,88 @@ def test_refresh_access_token_handles_errors(self, mock_post, mock_reload): assert integration.errors == "TOKEN_REFRESH_FAILED" mock_reload.assert_not_called() + + +class TestGoogleCloudIntegrationModel(BaseTest): + mock_keyfile = { + "type": "service_account", + "project_id": "posthog-616", + "private_key_id": "df3e129a722a865cc3539b4e69507bad", + "private_key": "-----BEGIN PRIVATE KEY-----\nTHISISTHEKEY==\n-----END PRIVATE KEY-----\n", + "client_email": "hog-pubsub-test@posthog-301601.iam.gserviceaccount.com", + "client_id": "11223344556677889900", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", + "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/not-a-topic%40posthog-616.iam.gserviceaccount.com", + "universe_domain": "googleapis.com", + } + + def create_integration( + self, kind: str, config: Optional[dict] = None, sensitive_config: Optional[dict] = None + ) -> Integration: + _config = {"refreshed_at": int(time.time()), "expires_in": 3600} + _sensitive_config = self.mock_keyfile + _config.update(config or {}) + _sensitive_config.update(sensitive_config or {}) + + return Integration.objects.create(team=self.team, kind=kind, config=_config, sensitive_config=_sensitive_config) + + @patch("google.oauth2.service_account.Credentials.from_service_account_info") + def test_integration_from_key(self, mock_credentials): + mock_credentials.return_value.project_id = "posthog-616" + mock_credentials.return_value.service_account_email = "posthog@" + mock_credentials.return_value.token = "ACCESS_TOKEN" + mock_credentials.return_value.expiry = datetime.fromtimestamp(1704110400 + 3600) + mock_credentials.return_value.refresh = lambda _: None + + with freeze_time("2024-01-01T12:00:00Z"): + integration = GoogleCloudIntegration.integration_from_key( + "google-pubsub", + self.mock_keyfile, + self.team.id, + self.user, + ) + + assert integration.team == self.team + assert integration.created_by == self.user + + assert integration.config == { + "access_token": "ACCESS_TOKEN", + "refreshed_at": 1704110400, + "expires_in": 3600, + } + assert integration.sensitive_config == self.mock_keyfile + + @patch("google.oauth2.service_account.Credentials.from_service_account_info") + def test_integration_refresh_token(self, mock_credentials): + mock_credentials.return_value.project_id = "posthog-616" + mock_credentials.return_value.service_account_email = "posthog@" + mock_credentials.return_value.token = "ACCESS_TOKEN" + mock_credentials.return_value.expiry = datetime.fromtimestamp(1704110400 + 3600) + mock_credentials.return_value.refresh = lambda _: None + + with freeze_time("2024-01-01T12:00:00Z"): + integration = GoogleCloudIntegration.integration_from_key( + "google-pubsub", + self.mock_keyfile, + self.team.id, + self.user, + ) + + with freeze_time("2024-01-01T12:00:00Z"): + assert GoogleCloudIntegration(integration).access_token_expired() is False + + with freeze_time("2024-01-01T14:00:00Z"): + assert GoogleCloudIntegration(integration).access_token_expired() is True + + mock_credentials.return_value.expiry = datetime.fromtimestamp(1704110400 + 3600 * 3) + + GoogleCloudIntegration(integration).refresh_access_token() + assert GoogleCloudIntegration(integration).access_token_expired() is False + + assert integration.config == { + "access_token": "ACCESS_TOKEN", + "refreshed_at": 1704110400 + 3600 * 2, + "expires_in": 3600, + } diff --git a/posthog/settings/web.py b/posthog/settings/web.py index cad2cd9d22940..0aa9c1711c135 100644 --- a/posthog/settings/web.py +++ b/posthog/settings/web.py @@ -24,6 +24,11 @@ DECIDE_BUCKET_CAPACITY = get_from_env("DECIDE_BUCKET_CAPACITY", type_cast=int, default=500) DECIDE_BUCKET_REPLENISH_RATE = get_from_env("DECIDE_BUCKET_REPLENISH_RATE", type_cast=float, default=10.0) +# Prevent decide abuse + +# This is a list of team-ids that are prevented from using the /decide endpoint +# until they fix an issue with their feature flags causing instability in posthog. +DECIDE_SHORT_CIRCUITED_TEAM_IDS = [15611] # Decide db settings DECIDE_SKIP_POSTGRES_FLAGS = get_from_env("DECIDE_SKIP_POSTGRES_FLAGS", False, type_cast=str_to_bool) diff --git a/posthog/tasks/integrations.py b/posthog/tasks/integrations.py index e0f543874bfe8..1ebef062cf0cd 100644 --- a/posthog/tasks/integrations.py +++ b/posthog/tasks/integrations.py @@ -1,5 +1,6 @@ from celery import shared_task +from posthog.models.integration import GoogleCloudIntegration from posthog.tasks.utils import CeleryQueue @@ -15,6 +16,14 @@ def refresh_integrations() -> int: if oauth_integration.access_token_expired(): refresh_integration.delay(integration.id) + gcloud_integrations = Integration.objects.filter(kind__in=GoogleCloudIntegration.supported_kinds).all() + + for integration in gcloud_integrations: + gcloud_integration = GoogleCloudIntegration(integration) + + if gcloud_integration.access_token_expired(): + refresh_integration.delay(integration.id) + return 0 @@ -27,5 +36,8 @@ def refresh_integration(id: int) -> int: if integration.kind in OauthIntegration.supported_kinds: oauth_integration = OauthIntegration(integration) oauth_integration.refresh_access_token() + elif integration.kind in GoogleCloudIntegration.supported_kinds: + gcloud_integration = GoogleCloudIntegration(integration) + gcloud_integration.refresh_access_token() return 0 diff --git a/rust/capture/src/limiters/redis.rs b/rust/capture/src/limiters/redis.rs index 1a59ddd3b0b54..0142e160d8191 100644 --- a/rust/capture/src/limiters/redis.rs +++ b/rust/capture/src/limiters/redis.rs @@ -102,7 +102,11 @@ impl RedisLimiter { match RedisLimiter::fetch_limited(&redis, &key).await { Ok(set) => { let set = HashSet::from_iter(set.iter().cloned()); - gauge!("capture_billing_limits_loaded_tokens",).set(set.len() as f64); + gauge!( + "capture_billing_limits_loaded_tokens", + "cache_key" => key.clone(), + ) + .set(set.len() as f64); let mut limited_lock = limited.write().await; *limited_lock = set; diff --git a/rust/property-defs-rs/src/app_context.rs b/rust/property-defs-rs/src/app_context.rs index cfd86d7839193..f6d2d07d6de17 100644 --- a/rust/property-defs-rs/src/app_context.rs +++ b/rust/property-defs-rs/src/app_context.rs @@ -110,7 +110,6 @@ impl AppContext { name, update.team_id ) - .bind(name) .fetch_optional(&self.pool) .await?; diff --git a/rust/property-defs-rs/src/main.rs b/rust/property-defs-rs/src/main.rs index 846b307ee6876..21604910369a3 100644 --- a/rust/property-defs-rs/src/main.rs +++ b/rust/property-defs-rs/src/main.rs @@ -10,9 +10,9 @@ use property_defs_rs::{ message_to_event, metrics_consts::{ BATCH_ACQUIRE_TIME, CACHE_CONSUMED, COMPACTED_UPDATES, EVENTS_RECEIVED, FORCED_SMALL_BATCH, - PERMIT_WAIT_TIME, RECV_DEQUEUED, SKIPPED_DUE_TO_TEAM_FILTER, TRANSACTION_LIMIT_SATURATION, - UPDATES_FILTERED_BY_CACHE, UPDATES_PER_EVENT, UPDATES_SEEN, UPDATE_ISSUE_TIME, - WORKER_BLOCKED, + ISSUE_FAILED, PERMIT_WAIT_TIME, RECV_DEQUEUED, SKIPPED_DUE_TO_TEAM_FILTER, + TRANSACTION_LIMIT_SATURATION, UPDATES_FILTERED_BY_CACHE, UPDATES_PER_EVENT, UPDATES_SEEN, + UPDATE_ISSUE_TIME, WORKER_BLOCKED, }, types::Update, }; @@ -29,7 +29,7 @@ use tokio::{ }, task::JoinHandle, }; -use tracing::{info, warn}; +use tracing::{error, info, warn}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; common_alloc::used!(); @@ -226,7 +226,8 @@ async fn main() -> Result<(), Box> { let _permit = permit; let issue_time = common_metrics::timing_guard(UPDATE_ISSUE_TIME, &[]); if let Err(e) = context.issue(batch, cache_utilization).await { - warn!("Issue failed: {:?}", e); + metrics::counter!(ISSUE_FAILED).increment(1); + error!("Issue failed: {:?}", e); } issue_time.fin(); }); diff --git a/rust/property-defs-rs/src/metrics_consts.rs b/rust/property-defs-rs/src/metrics_consts.rs index d8c3c83a6d17f..722be95afa612 100644 --- a/rust/property-defs-rs/src/metrics_consts.rs +++ b/rust/property-defs-rs/src/metrics_consts.rs @@ -21,3 +21,4 @@ pub const GROUP_TYPE_RESOLVE_TIME: &str = "prop_defs_group_type_resolve_time_ms" pub const UPDATES_SKIPPED: &str = "prop_defs_skipped_updates"; pub const GROUP_TYPE_READS: &str = "prop_defs_group_type_reads"; pub const SKIPPED_DUE_TO_TEAM_FILTER: &str = "prop_defs_skipped_due_to_team_filter"; +pub const ISSUE_FAILED: &str = "prop_defs_issue_failed"; diff --git a/rust/property-defs-rs/src/types.rs b/rust/property-defs-rs/src/types.rs index 2acae28d79bfe..bd39129650357 100644 --- a/rust/property-defs-rs/src/types.rs +++ b/rust/property-defs-rs/src/types.rs @@ -439,10 +439,14 @@ impl PropertyDefinition { { let group_type_index = match &self.group_type_index { Some(GroupType::Resolved(_, i)) => Some(*i as i16), - _ => { + Some(GroupType::Unresolved(_)) => { warn!("Group type not resolved for property definition, skipping"); None } + _ => { + // We don't have a group type, so we don't have a group type index + None + } }; sqlx::query!(