feat(data-warehouse): add the ability to connect to MySQL as a datawa… (#23921)

Co-authored-by: Jacob Spizziri <[email protected]>
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
3 people authored and thmsobrmlr committed Jul 24, 2024
1 parent c29b204 commit ffb7446
Showing 20 changed files with 337 additions and 47 deletions.
Binary file added frontend/public/services/mysql.png
124 changes: 124 additions & 0 deletions frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx
@@ -198,6 +198,130 @@ export const SOURCE_DETAILS: Record<ExternalDataSourceType, SourceConfig> = {
},
],
},
MySQL: {
name: 'MySQL',
caption: (
<>
Enter your MySQL/MariaDB credentials to automatically pull your MySQL data into the PostHog Data
warehouse.
</>
),
fields: [
{
name: 'host',
label: 'Host',
type: 'text',
required: true,
placeholder: 'localhost',
},
{
name: 'port',
label: 'Port',
type: 'number',
required: true,
placeholder: '3306',
},
{
name: 'dbname',
label: 'Database',
type: 'text',
required: true,
placeholder: 'mysql',
},
{
name: 'user',
label: 'User',
type: 'text',
required: true,
placeholder: 'mysql',
},
{
name: 'password',
label: 'Password',
type: 'password',
required: true,
placeholder: '',
},
{
name: 'schema',
label: 'Schema',
type: 'text',
required: true,
placeholder: 'public',
},
{
name: 'ssh-tunnel',
label: 'Use SSH tunnel?',
type: 'switch-group',
default: false,
fields: [
{
name: 'host',
label: 'Tunnel host',
type: 'text',
required: true,
placeholder: 'localhost',
},
{
name: 'port',
label: 'Tunnel port',
type: 'number',
required: true,
placeholder: '22',
},
{
type: 'select',
name: 'auth_type',
label: 'Authentication type',
required: true,
defaultValue: 'password',
options: [
{
label: 'Password',
value: 'password',
fields: [
{
name: 'username',
label: 'Tunnel username',
type: 'text',
required: true,
placeholder: 'User1',
},
{
name: 'password',
label: 'Tunnel password',
type: 'password',
required: true,
placeholder: '',
},
],
},
{
label: 'Key pair',
value: 'keypair',
fields: [
{
name: 'private_key',
label: 'Tunnel private key',
type: 'textarea',
required: true,
placeholder: '',
},
{
name: 'passphrase',
label: 'Tunnel passphrase',
type: 'password',
required: false,
placeholder: '',
},
],
},
],
},
],
},
],
},
Snowflake: {
name: 'Snowflake',
caption: (
@@ -10,6 +10,7 @@ import Iconazure from 'public/services/azure.png'
import IconCloudflare from 'public/services/cloudflare.png'
import IconGoogleCloudStorage from 'public/services/google-cloud-storage.png'
import IconHubspot from 'public/services/hubspot.png'
import IconMySQL from 'public/services/mysql.png'
import IconPostgres from 'public/services/postgres.png'
import IconSnowflake from 'public/services/snowflake.png'
import IconStripe from 'public/services/stripe.png'
@@ -187,6 +188,7 @@ export function RenderDataWarehouseSourceIcon({
Hubspot: IconHubspot,
Zendesk: IconZendesk,
Postgres: IconPostgres,
MySQL: IconMySQL,
Snowflake: IconSnowflake,
aws: IconAwsS3,
'google-cloud': IconGoogleCloudStorage,
2 changes: 1 addition & 1 deletion frontend/src/types.ts
@@ -3828,7 +3828,7 @@ export enum DataWarehouseSettingsTab {
SelfManaged = 'self-managed',
}

-export const externalDataSources = ['Stripe', 'Hubspot', 'Postgres', 'Zendesk', 'Snowflake'] as const
+export const externalDataSources = ['Stripe', 'Hubspot', 'Postgres', 'MySQL', 'Zendesk', 'Snowflake'] as const

export type ExternalDataSourceType = (typeof externalDataSources)[number]

2 changes: 1 addition & 1 deletion latest_migrations.manifest
@@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0016_rolemembership_organization_member
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
-posthog: 0447_alter_integration_kind
+posthog: 0448_add_mysql_externaldatasource_source_type
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
27 changes: 27 additions & 0 deletions posthog/migrations/0448_add_mysql_externaldatasource_source_type.py
@@ -0,0 +1,27 @@
# Generated by Django 4.2.11 on 2024-06-05 17:12

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("posthog", "0447_alter_integration_kind"),
]

operations = [
migrations.AlterField(
model_name="externaldatasource",
name="source_type",
field=models.CharField(
choices=[
("Stripe", "Stripe"),
("Hubspot", "Hubspot"),
("Postgres", "Postgres"),
("Zendesk", "Zendesk"),
("Snowflake", "Snowflake"),
("MySQL", "MySQL"),
],
max_length=128,
),
),
]
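
For orientation: the `choices` added above correspond to the `Type` enum on the `ExternalDataSource` model (imported elsewhere in this diff from `posthog.warehouse.models.external_data_source`) that the rest of the commit branches on as `ExternalDataSource.Type.MYSQL`. The enum itself is defined outside this commit; a minimal, hypothetical sketch of the relationship:

```python
# Hypothetical reconstruction for illustration only -- the real
# ExternalDataSource model defines more fields and behavior than shown.
from django.db import models

class ExternalDataSource(models.Model):
    class Type(models.TextChoices):
        STRIPE = "Stripe", "Stripe"
        HUBSPOT = "Hubspot", "Hubspot"
        POSTGRES = "Postgres", "Postgres"
        ZENDESK = "Zendesk", "Zendesk"
        SNOWFLAKE = "Snowflake", "Snowflake"
        MYSQL = "MySQL", "MySQL"

    # Matches the migration's AlterField above.
    source_type = models.CharField(max_length=128, choices=Type.choices)
```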
3 changes: 3 additions & 0 deletions posthog/temporal/data_imports/pipelines/schemas.py
@@ -21,6 +21,7 @@
),
ExternalDataSource.Type.POSTGRES: (),
ExternalDataSource.Type.SNOWFLAKE: (),
ExternalDataSource.Type.MYSQL: (),
}

PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING = {
@@ -29,6 +30,7 @@
ExternalDataSource.Type.ZENDESK: ZENDESK_INCREMENTAL_ENDPOINTS,
ExternalDataSource.Type.POSTGRES: (),
ExternalDataSource.Type.SNOWFLAKE: (),
ExternalDataSource.Type.MYSQL: (),
}

PIPELINE_TYPE_INCREMENTAL_FIELDS_MAPPING: dict[ExternalDataSource.Type, dict[str, list[IncrementalField]]] = {
@@ -37,4 +39,5 @@
ExternalDataSource.Type.ZENDESK: ZENDESK_INCREMENTAL_FIELDS,
ExternalDataSource.Type.POSTGRES: {},
ExternalDataSource.Type.SNOWFLAKE: {},
ExternalDataSource.Type.MYSQL: {},
}
17 changes: 12 additions & 5 deletions posthog/temporal/data_imports/pipelines/sql_database/__init__.py
@@ -16,6 +16,7 @@
from urllib.parse import quote

from posthog.warehouse.types import IncrementalFieldType
from posthog.warehouse.models.external_data_source import ExternalDataSource
from sqlalchemy.sql import text

from .helpers import (
@@ -35,7 +36,8 @@ def incremental_type_to_initial_value(field_type: IncrementalFieldType) -> Any:
return date(1970, 1, 1)


-def postgres_source(
+def sql_source_for_type(
+    source_type: ExternalDataSource.Type,
host: str,
port: int,
user: str,
Expand All @@ -53,17 +55,22 @@ def postgres_source(
database = quote(database)
sslmode = quote(sslmode)

-    credentials = ConnectionStringCredentials(
-        f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode={sslmode}"
-    )

if incremental_field is not None and incremental_field_type is not None:
incremental: dlt.sources.incremental | None = dlt.sources.incremental(
cursor_path=incremental_field, initial_value=incremental_type_to_initial_value(incremental_field_type)
)
else:
incremental = None

+    if source_type == ExternalDataSource.Type.POSTGRES:
+        credentials = ConnectionStringCredentials(
+            f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode={sslmode}"
+        )
+    elif source_type == ExternalDataSource.Type.MYSQL:
+        credentials = ConnectionStringCredentials(f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}")
+    else:
+        raise Exception("Unsupported source_type")

db_source = sql_database(credentials, schema=schema, table_names=table_names, incremental=incremental)

return db_source
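
In short, the renamed helper now selects the SQLAlchemy connection string by source type: `postgresql://` (with `sslmode`) for Postgres, `mysql+pymysql://` for MySQL, and anything else raises. A hedged usage sketch; the keyword names mirror the test assertions further down, and the optional incremental-field arguments are omitted:

```python
# Sketch, not a canonical call site. Values are placeholders; `sslmode`
# is only interpolated into the Postgres URL and ignored for MySQL.
from posthog.temporal.data_imports.pipelines.sql_database import sql_source_for_type
from posthog.warehouse.models.external_data_source import ExternalDataSource

source = sql_source_for_type(
    source_type=ExternalDataSource.Type.MYSQL,  # routes to mysql+pymysql://
    host="localhost",
    port=3306,
    user="mysql",
    password="example-password",
    database="mysql",
    sslmode="prefer",
    schema="public",
    table_names=["orders"],
)
```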
@@ -13,7 +13,7 @@
from posthog.warehouse.models import sync_old_schemas_with_new_schemas, ExternalDataSource, aget_schema_by_id
from posthog.warehouse.models.external_data_schema import (
ExternalDataSchema,
-    get_postgres_schemas,
+    get_sql_schemas_for_source_type,
get_snowflake_schemas,
)
from posthog.temporal.common.logger import bind_temporal_worker_logger
@@ -46,7 +46,7 @@ async def create_external_data_job_model_activity(inputs: CreateExternalDataJobM

source = await sync_to_async(ExternalDataSource.objects.get)(team_id=inputs.team_id, id=inputs.source_id)

-    if source.source_type == ExternalDataSource.Type.POSTGRES:
+    if source.source_type in [ExternalDataSource.Type.POSTGRES, ExternalDataSource.Type.MYSQL]:
host = source.job_inputs.get("host")
port = source.job_inputs.get("port")
user = source.job_inputs.get("user")
@@ -74,8 +74,8 @@ async def create_external_data_job_model_activity(inputs: CreateExternalDataJobM
private_key=ssh_tunnel_auth_type_private_key,
)

-        schemas_to_sync = await sync_to_async(get_postgres_schemas)(
-            host, port, database, user, password, db_schema, ssh_tunnel
+        schemas_to_sync = await sync_to_async(get_sql_schemas_for_source_type)(
+            source.source_type, host, port, database, user, password, db_schema, ssh_tunnel
)
elif source.source_type == ExternalDataSource.Type.SNOWFLAKE:
account_id = source.job_inputs.get("account_id")
10 changes: 6 additions & 4 deletions posthog/temporal/data_imports/workflow_activities/import_data.py
@@ -102,8 +102,8 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
schema=schema,
reset_pipeline=reset_pipeline,
)
-    elif model.pipeline.source_type == ExternalDataSource.Type.POSTGRES:
-        from posthog.temporal.data_imports.pipelines.sql_database import postgres_source
+    elif model.pipeline.source_type in [ExternalDataSource.Type.POSTGRES, ExternalDataSource.Type.MYSQL]:
+        from posthog.temporal.data_imports.pipelines.sql_database import sql_source_for_type

host = model.pipeline.job_inputs.get("host")
port = model.pipeline.job_inputs.get("port")
@@ -137,7 +137,8 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
if tunnel is None:
raise Exception("Can't open tunnel to SSH server")

-            source = postgres_source(
+            source = sql_source_for_type(
+                source_type=model.pipeline.source_type,
host=tunnel.local_bind_host,
port=tunnel.local_bind_port,
user=user,
@@ -163,7 +164,8 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
reset_pipeline=reset_pipeline,
)

-        source = postgres_source(
+        source = sql_source_for_type(
+            source_type=model.pipeline.source_type,
host=host,
port=port,
user=user,
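
Note how the tunnelled path above connects through `tunnel.local_bind_host`/`tunnel.local_bind_port` instead of the configured host. A stand-in sketch of that pattern using the open-source `sshtunnel` package (PostHog's own `SSHTunnel` wrapper is not part of this commit; hostnames and credentials below are hypothetical):

```python
# Stand-in sketch with the `sshtunnel` package, not PostHog's SSHTunnel
# wrapper. Illustrates the same idea: open the tunnel, then point the
# SQL source at the tunnel's local end.
from sshtunnel import SSHTunnelForwarder

from posthog.temporal.data_imports.pipelines.sql_database import sql_source_for_type
from posthog.warehouse.models.external_data_source import ExternalDataSource

with SSHTunnelForwarder(
    ("bastion.example.com", 22),                # hypothetical SSH host/port
    ssh_username="tunnel_user",
    ssh_password="example-password",
    remote_bind_address=("db.internal", 3306),  # the actual MySQL server
) as tunnel:
    source = sql_source_for_type(
        source_type=ExternalDataSource.Type.MYSQL,
        host=tunnel.local_bind_host,   # connect via the tunnel's local end
        port=tunnel.local_bind_port,
        user="mysql",
        password="example-password",
        database="mysql",
        sslmode="prefer",
        schema="public",
        table_names=["orders"],
    )
```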
15 changes: 9 additions & 6 deletions posthog/temporal/tests/batch_exports/test_import_data.py
@@ -70,12 +70,13 @@ async def test_postgres_source_without_ssh_tunnel(activity_environment, team, **
activity_inputs = await _setup(team, job_inputs)

with (
mock.patch("posthog.temporal.data_imports.pipelines.sql_database.postgres_source") as postgres_source,
mock.patch("posthog.temporal.data_imports.pipelines.sql_database.sql_source_for_type") as sql_source_for_type,
mock.patch("posthog.temporal.data_imports.workflow_activities.import_data._run"),
):
await activity_environment.run(import_data_activity, activity_inputs)

-    postgres_source.assert_called_once_with(
+    sql_source_for_type.assert_called_once_with(
+        source_type=ExternalDataSource.Type.POSTGRES,
host="host.com",
port="5432",
user="Username",
@@ -107,12 +108,13 @@ async def test_postgres_source_with_ssh_tunnel_disabled(activity_environment, te
activity_inputs = await _setup(team, job_inputs)

with (
mock.patch("posthog.temporal.data_imports.pipelines.sql_database.postgres_source") as postgres_source,
mock.patch("posthog.temporal.data_imports.pipelines.sql_database.sql_source_for_type") as sql_source_for_type,
mock.patch("posthog.temporal.data_imports.workflow_activities.import_data._run"),
):
await activity_environment.run(import_data_activity, activity_inputs)

-    postgres_source.assert_called_once_with(
+    sql_source_for_type.assert_called_once_with(
+        source_type=ExternalDataSource.Type.POSTGRES,
host="host.com",
port="5432",
user="Username",
@@ -160,13 +162,14 @@ def __exit__(self, exc_type, exc_value, exc_traceback):
return MockedTunnel()

with (
mock.patch("posthog.temporal.data_imports.pipelines.sql_database.postgres_source") as postgres_source,
mock.patch("posthog.temporal.data_imports.pipelines.sql_database.sql_source_for_type") as sql_source_for_type,
mock.patch("posthog.temporal.data_imports.workflow_activities.import_data._run"),
mock.patch.object(SSHTunnel, "get_tunnel", mock_get_tunnel),
):
await activity_environment.run(import_data_activity, activity_inputs)

-    postgres_source.assert_called_once_with(
+    sql_source_for_type.assert_called_once_with(
+        source_type=ExternalDataSource.Type.POSTGRES,
host="other-host.com",
port=55550,
user="Username",