From df278418a8e8422a820edd3eedf538f6e25d8eec Mon Sep 17 00:00:00 2001
From: Pedro Silva
Date: Tue, 1 Aug 2023 21:35:42 +0100
Subject: [PATCH] feat(cli): Adds ability to upload recipes to DataHub's UI (#8317)

Co-authored-by: Indy Prentice
---
 .../source/ListIngestionSourcesResolver.java  |   6 +-
 .../src/main/resources/ingestion.graphql      |   5 +
 .../ListIngestionSourceResolverTest.java      |   6 +-
 docs/cli.md                                   |  26 ++-
 docs/ui-ingestion.md                          |  43 +++++
 metadata-ingestion/README.md                  |  13 ++
 metadata-ingestion/setup.py                   |   1 +
 .../src/datahub/cli/ingest_cli.py             | 153 ++++++++++++++++++
 .../datahub/configuration/config_loader.py    |   6 +-
 9 files changed, 249 insertions(+), 10 deletions(-)

diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourcesResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourcesResolver.java
index fa56f15bcf8d4..d019473606e58 100644
--- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourcesResolver.java
+++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourcesResolver.java
@@ -4,6 +4,7 @@
 import com.linkedin.common.urn.Urn;
 import com.linkedin.datahub.graphql.QueryContext;
 import com.linkedin.datahub.graphql.exception.AuthorizationException;
+import com.linkedin.datahub.graphql.generated.FacetFilterInput;
 import com.linkedin.datahub.graphql.generated.ListIngestionSourcesInput;
 import com.linkedin.datahub.graphql.generated.ListIngestionSourcesResult;
 import com.linkedin.datahub.graphql.resolvers.ingest.IngestionAuthUtils;
@@ -20,6 +21,7 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
@@ -51,6 +53,7 @@ public CompletableFuture<ListIngestionSourcesResult> get(final DataFetchingEnvir
       final Integer start = input.getStart() == null ? DEFAULT_START : input.getStart();
       final Integer count = input.getCount() == null ? DEFAULT_COUNT : input.getCount();
       final String query = input.getQuery() == null ? DEFAULT_QUERY : input.getQuery();
+      final List<FacetFilterInput> filters = input.getFilters() == null ? Collections.emptyList() : input.getFilters();

       return CompletableFuture.supplyAsync(() -> {
         try {
@@ -58,7 +61,8 @@ public CompletableFuture<ListIngestionSourcesResult> get(final DataFetchingEnvir
           final SearchResult gmsResult = _entityClient.search(
               Constants.INGESTION_SOURCE_ENTITY_NAME,
               query,
-              Collections.emptyMap(),
+              buildFilter(filters, Collections.emptyList()),
+              null,
               start,
               count,
               context.getAuthentication(),

diff --git a/datahub-graphql-core/src/main/resources/ingestion.graphql b/datahub-graphql-core/src/main/resources/ingestion.graphql
index 256b94ccdc244..69c8aff124583 100644
--- a/datahub-graphql-core/src/main/resources/ingestion.graphql
+++ b/datahub-graphql-core/src/main/resources/ingestion.graphql
@@ -428,6 +428,11 @@ input ListIngestionSourcesInput {
   An optional search query
   """
   query: String
+
+  """
+  Optional facet filters to apply to the result set
+  """
+  filters: [FacetFilterInput!]
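  # A hypothetical query using the new argument (sketch only; the "sourceType"
  # facet name is an assumption, not taken from this schema):
  #
  #   query {
  #     listIngestionSources(input: {
  #       start: 0,
  #       count: 20,
  #       filters: [{ field: "sourceType", values: ["mysql"] }]
  #     }) {
  #       total
  #       ingestionSources { urn name type }
  #     }
  #   }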
}
"""

diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourceResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourceResolverTest.java
index 3fa19cca0623c..8e2453ce06a39 100644
--- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourceResolverTest.java
+++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourceResolverTest.java
@@ -19,7 +19,6 @@
 import com.linkedin.metadata.search.SearchResult;
 import com.linkedin.r2.RemoteInvocationException;
 import graphql.schema.DataFetchingEnvironment;
-import java.util.Collections;
 import java.util.HashSet;
 import org.mockito.Mockito;
 import org.testng.annotations.Test;
@@ -30,7 +29,7 @@ public class ListIngestionSourceResolverTest {

-  private static final ListIngestionSourcesInput TEST_INPUT = new ListIngestionSourcesInput(0, 20, null);
+  private static final ListIngestionSourcesInput TEST_INPUT = new ListIngestionSourcesInput(0, 20, null, null);

   @Test
   public void testGetSuccess() throws Exception {
@@ -44,7 +43,8 @@ public void testGetSuccess() throws Exception {
     Mockito.when(mockClient.search(
         Mockito.eq(Constants.INGESTION_SOURCE_ENTITY_NAME),
         Mockito.eq(""),
-        Mockito.eq(Collections.emptyMap()),
+        Mockito.any(),
+        Mockito.any(),
         Mockito.eq(0),
         Mockito.eq(20),
         Mockito.any(Authentication.class),

diff --git a/docs/cli.md b/docs/cli.md
index 64b7e2d76bba2..eb8bb406b0107 100644
--- a/docs/cli.md
+++ b/docs/cli.md
@@ -92,13 +92,29 @@ Source specific crawlers are provided by plugins and might sometimes need additi
 Usage: datahub [datahub-options] ingest [command-options]

 Command Options:
-  -c / --config        Config file in .toml or .yaml format
-  -n / --dry-run       Perform a dry run of the ingestion, essentially skipping writing to sink
-  --preview            Perform limited ingestion from the source to the sink to get a quick preview
-  --preview-workunits  The number of workunits to produce for preview
-  --strict-warnings    If enabled, ingestion runs with warnings will yield a non-zero error code
+  -c / --config             Config file in .toml or .yaml format
+  -n / --dry-run            Perform a dry run of the ingestion, essentially skipping writing to sink
+  --preview                 Perform limited ingestion from the source to the sink to get a quick preview
+  --preview-workunits       The number of workunits to produce for preview
+  --strict-warnings         If enabled, ingestion runs with warnings will yield a non-zero error code
+  --test-source-connection  When set, ingestion will only test the source connection details from the recipe
 ```

#### ingest deploy

The `ingest deploy` command instructs the CLI to upload an ingestion recipe to DataHub, to be run by DataHub's [UI Ingestion](./ui-ingestion.md).
This command can also schedule the ingestion while uploading it, and can update existing sources.

To schedule a recipe called "test" to run at 5am every day, London time, with the recipe configured in a local `recipe.yaml` file:

```shell
datahub ingest deploy --name "test" --schedule "0 5 * * *" --time-zone "Europe/London" -c recipe.yaml
```

To update an existing recipe, use the `--urn` parameter to specify the id of the recipe to update.

**Note:** Updating a recipe replaces its existing options with the ones specified in the CLI command. That is, omitting a schedule in the update command will remove the schedule from the recipe being updated.
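For illustration, an update that re-deploys the "test" recipe above under its existing urn might look like the following sketch (the urn value is a hypothetical placeholder; use the urn reported by your DataHub instance):

```shell
datahub ingest deploy --urn "urn:li:dataHubIngestionSource:abc-123" --name "test" \
  --schedule "0 5 * * *" --time-zone "Europe/London" -c recipe.yaml
```

Because an update replaces all options, the schedule is passed again even though it is unchanged.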
### init

The init command is used to tell `datahub` where your DataHub instance is located. The CLI will point to localhost DataHub by default.

diff --git a/docs/ui-ingestion.md b/docs/ui-ingestion.md
index c3bedafc7da70..8cde561c38149 100644
--- a/docs/ui-ingestion.md
+++ b/docs/ui-ingestion.md
@@ -32,6 +32,9 @@ your first **Ingestion Source**.

### Creating an Ingestion Source

Before ingesting any metadata, you need to create a new Ingestion Source. Start by clicking **+ Create new source**.
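The deploy examples in this patch pass a local `recipe.yaml` via `-c`. A minimal sketch of such a file, assuming a MySQL source (the host and the credential environment variable names are illustrative), might look like:

```yaml
# recipe.yaml - a minimal sketch; host and credential env vars are illustrative
source:
  type: mysql
  config:
    host_port: "localhost:3306"
    username: "${MYSQL_USERNAME}"
    password: "${MYSQL_PASSWORD}"
    include_tables: true
    include_views: true
```

Recipes deployed to the server typically omit an explicit sink section, since the executor routes output to the DataHub instance itself.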

@@ -166,6 +169,46 @@ _Pinning the CLI version to version `0.8.23.2`_

Once you're happy with your changes, simply click 'Done' to save.

You can upload and even update recipes using the CLI, as described in the [CLI documentation for uploading ingestion recipes](./cli.md#ingest-deploy).
An example execution would look something like:

```bash
datahub ingest deploy --name "My Test Ingestion Source" --schedule "5 * * * *" --time-zone "UTC" -c recipe.yaml
```

This would create a new recipe with the name `My Test Ingestion Source`. Note that to update an existing recipe, its `urn` must be passed as a parameter.
DataHub supports multiple recipes with the same name, so the urn, rather than the name, is what uniquely identifies a recipe.

You can also create ingestion sources using [DataHub's GraphQL API](./api/graphql/overview.md), via the **createIngestionSource** mutation endpoint.

```graphql
mutation {
  createIngestionSource(input: {
    name: "My Test Ingestion Source",
    type: "mysql",
    description: "My ingestion source description",
    schedule: {interval: "*/5 * * * *", timezone: "UTC"},
    config: {
      recipe: "{\"source\":{\"type\":\"mysql\",\"config\":{\"include_tables\":true,\"database\":null,\"password\":\"${MYSQL_PASSWORD}\",\"profiling\":{\"enabled\":false},\"host_port\":null,\"include_views\":true,\"username\":\"${MYSQL_USERNAME}\"}},\"pipeline_name\":\"urn:li:dataHubIngestionSource:f38bd060-4ea8-459c-8f24-a773286a2927\"}",
      version: "0.8.18",
      executorId: "mytestexecutor",
    }
  })
}
```

To update sources, use the `updateIngestionSource` endpoint. It takes the same input as the create endpoint, plus the urn of the source to be updated.

**Note**: The double quotes inside the recipe string must be escaped, as shown above.

### Running an Ingestion Source

Once you've created your Ingestion Source, you can run it by clicking 'Execute'. Shortly after,

diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md
index 47bc3b542163a..cacb5011d0766 100644
--- a/metadata-ingestion/README.md
+++ b/metadata-ingestion/README.md
@@ -161,6 +161,19 @@ reporting:
   report_recipe: false
 ```

#### Deploying and scheduling ingestion in the UI

The `deploy` subcommand of the `ingest` command tree allows users to upload their recipes and schedule them on the server.

```shell
datahub ingest deploy -n <name> -c recipe.yaml
```

By default, no schedule is set unless one is explicitly configured with the `--schedule` parameter. The timezone is inferred from the system clock, and can be overridden with the `--time-zone` flag.

```shell
datahub ingest deploy -n test --schedule "0 * * * *" --time-zone "Europe/London" -c recipe.yaml
```

## Transformations

If you'd like to modify data before it reaches the ingestion sinks – for instance, adding additional owners or tags – you can write your own transformer module and integrate it with DataHub. Transformers require extending the recipe with a new section to describe the transformers that you want to run.
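For example, a recipe that applies the built-in `simple_add_dataset_tags` transformer to every ingested dataset would extend the recipe with a section like the following sketch (the tag urn is illustrative):

```yaml
transformers:
  - type: "simple_add_dataset_tags"
    config:
      tag_urns:
        - "urn:li:tag:NeedsDocumentation"
```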
diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 9940e8cdd0a97..4f156591eb756 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -415,6 +415,7 @@ def get_long_description():
     "types-termcolor>=1.0.0",
     "types-Deprecated",
     "types-protobuf>=4.21.0.1",
+    "types-tzlocal",
 }

diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py
index 72c15e92257aa..42c0ea1601c74 100644
--- a/metadata-ingestion/src/datahub/cli/ingest_cli.py
+++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py
@@ -4,11 +4,13 @@
 import logging
 import os
 import sys
+import textwrap
 from datetime import datetime
 from typing import Optional

 import click
 import click_spinner
+import tzlocal
 from click_default_group import DefaultGroup
 from tabulate import tabulate

@@ -21,6 +23,7 @@
     post_rollback_endpoint,
 )
 from datahub.configuration.config_loader import load_config_file
+from datahub.ingestion.graph.client import get_default_graph
 from datahub.ingestion.run.connection import ConnectionManager
 from datahub.ingestion.run.pipeline import Pipeline
 from datahub.telemetry import telemetry

@@ -198,6 +201,156 @@ async def run_ingestion_and_check_upgrade() -> int:
     # don't raise SystemExit if there's no error


@ingest.command()
@upgrade.check_upgrade
@telemetry.with_telemetry()
@click.option(
    "-n",
    "--name",
    type=str,
    help="Recipe name.",
    required=True,
)
@click.option(
    "-c",
    "--config",
    type=click.Path(dir_okay=False),
    help="Config file in .toml or .yaml format.",
    required=True,
)
@click.option(
    "--urn",
    type=str,
    help="Urn of the recipe to update.",
    required=False,
)
@click.option(
    "--executor-id",
    type=str,
    default="default",
    help="Executor id to route execution requests to. Do not use this unless you have configured a custom executor.",
    required=False,
)
@click.option(
    "--cli-version",
    type=str,
    help="Provide a custom CLI version to use for ingestion. By default, the server's default version is used.",
    required=False,
    default=None,
)
@click.option(
    "--schedule",
    type=str,
    help="Cron definition for the schedule. If none is provided, the ingestion recipe will not be scheduled.",
    required=False,
    default=None,
)
@click.option(
    "--time-zone",
    type=str,
    help=f"Timezone for the schedule. By default uses the timezone of the current system: {tzlocal.get_localzone_name()}.",
    required=False,
    default=tzlocal.get_localzone_name(),
)
def deploy(
    name: str,
    config: str,
    urn: str,
    executor_id: str,
    cli_version: str,
    schedule: str,
    time_zone: str,
) -> None:
    """
    Deploy an ingestion recipe to your DataHub instance.
    The urn of the ingestion source will be based on the name parameter in the format:
    urn:li:dataHubIngestionSource:<name>
    """

    datahub_graph = get_default_graph()

    pipeline_config = load_config_file(
        config,
        allow_stdin=True,
        resolve_env_vars=False,
    )

    graphql_query: str

    variables: dict = {
        "urn": urn,
        "name": name,
        "type": pipeline_config["source"]["type"],
        "schedule": {"interval": schedule, "timezone": time_zone},
        "recipe": json.dumps(pipeline_config),
        "executorId": executor_id,
        "version": cli_version,
    }

    if urn:
        if not datahub_graph.exists(urn):
            logger.error(f"Could not find recipe for provided urn: {urn}")
            sys.exit(1)
        logger.info("Found recipe URN, will update recipe.")

        graphql_query = textwrap.dedent(
            """
            mutation updateIngestionSource(
                $urn: String!,
                $name: String!,
                $type: String!,
                $schedule: UpdateIngestionSourceScheduleInput,
                $recipe: String!,
                $executorId: String!,
                $version: String) {

                updateIngestionSource(urn: $urn, input: {
                    name: $name,
                    type: $type,
                    schedule: $schedule,
                    config: {
                        recipe: $recipe,
                        executorId: $executorId,
                        version: $version,
                    }
                })
            }
            """
        )
    else:
        logger.info("No URN specified, will create a new recipe.")
        graphql_query = textwrap.dedent(
            """
            mutation createIngestionSource(
                $name: String!,
                $type: String!,
                $schedule: UpdateIngestionSourceScheduleInput,
                $recipe: String!,
                $executorId: String!,
                $version: String) {

                createIngestionSource(input: {
                    name: $name,
                    type: $type,
                    schedule: $schedule,
                    config: {
                        recipe: $recipe,
                        executorId: $executorId,
                        version: $version,
                    }
                })
            }
            """
        )

    response = datahub_graph.execute_graphql(graphql_query, variables=variables)

    click.echo(
        f"✅ Successfully wrote data ingestion source metadata for recipe {name}:"
    )
    click.echo(response)


def _test_source_connection(report_to: Optional[str], pipeline_config: dict) -> None:
    connection_report = None
    try:

diff --git a/metadata-ingestion/src/datahub/configuration/config_loader.py b/metadata-ingestion/src/datahub/configuration/config_loader.py
index 8c2c635bb1df5..78bee21d1bda4 100644
--- a/metadata-ingestion/src/datahub/configuration/config_loader.py
+++ b/metadata-ingestion/src/datahub/configuration/config_loader.py
@@ -72,6 +72,7 @@ def load_config_file(
     squirrel_original_config: bool = False,
     squirrel_field: str = "__orig_config",
     allow_stdin: bool = False,
+    resolve_env_vars: bool = True,
 ) -> dict:
     config_mech: ConfigurationMechanism
     if allow_stdin and config_file == "-":
@@ -104,7 +105,10 @@ def load_config_file(
         config_fp = io.StringIO(raw_config_file)
         raw_config = config_mech.load_config(config_fp)

-    config = resolve_env_variables(raw_config)
+    if resolve_env_vars:
+        config = resolve_env_variables(raw_config)
+    else:
+        config = raw_config
     if squirrel_original_config:
         config[squirrel_field] = raw_config
     return config
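As a usage sketch of the new `resolve_env_vars` flag (the recipe file name is hypothetical): `deploy` loads the recipe verbatim so that `${ENV_VAR}` references survive into the stored recipe and are resolved at execution time, while existing callers keep the eager local expansion:

```python
from datahub.configuration.config_loader import load_config_file

# Keep ${MYSQL_PASSWORD}-style placeholders intact; the server resolves
# them when the recipe actually runs (deploy-style loading).
raw_recipe = load_config_file("recipe.yaml", resolve_env_vars=False)

# Default behavior is unchanged: environment variables are expanded locally.
resolved_recipe = load_config_file("recipe.yaml")
```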