Skip to content

Commit

Permalink
feat(cli): Adds ability to upload recipes to DataHub's UI (#8317)
Browse files Browse the repository at this point in the history
Co-authored-by: Indy Prentice <[email protected]>
  • Loading branch information
2 people authored and yoonhyejin committed Aug 24, 2023
1 parent 5f719b0 commit df27841
Show file tree
Hide file tree
Showing 9 changed files with 249 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
import com.linkedin.datahub.graphql.generated.ListIngestionSourcesInput;
import com.linkedin.datahub.graphql.generated.ListIngestionSourcesResult;
import com.linkedin.datahub.graphql.resolvers.ingest.IngestionAuthUtils;
Expand All @@ -20,6 +21,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -51,14 +53,16 @@ public CompletableFuture<ListIngestionSourcesResult> get(final DataFetchingEnvir
final Integer start = input.getStart() == null ? DEFAULT_START : input.getStart();
final Integer count = input.getCount() == null ? DEFAULT_COUNT : input.getCount();
final String query = input.getQuery() == null ? DEFAULT_QUERY : input.getQuery();
final List<FacetFilterInput> filters = input.getFilters() == null ? Collections.emptyList() : input.getFilters();

return CompletableFuture.supplyAsync(() -> {
try {
// First, get all ingestion sources Urns.
final SearchResult gmsResult = _entityClient.search(
Constants.INGESTION_SOURCE_ENTITY_NAME,
query,
Collections.emptyMap(),
buildFilter(filters, Collections.emptyList()),
null,
start,
count,
context.getAuthentication(),
Expand Down
5 changes: 5 additions & 0 deletions datahub-graphql-core/src/main/resources/ingestion.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,11 @@ input ListIngestionSourcesInput {
An optional search query
"""
query: String

"""
Optional Facet filters to apply to the result set
"""
filters: [FacetFilterInput!]
}

"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.linkedin.metadata.search.SearchResult;
import com.linkedin.r2.RemoteInvocationException;
import graphql.schema.DataFetchingEnvironment;
import java.util.Collections;
import java.util.HashSet;
import org.mockito.Mockito;
import org.testng.annotations.Test;
Expand All @@ -30,7 +29,7 @@

public class ListIngestionSourceResolverTest {

private static final ListIngestionSourcesInput TEST_INPUT = new ListIngestionSourcesInput(0, 20, null);
private static final ListIngestionSourcesInput TEST_INPUT = new ListIngestionSourcesInput(0, 20, null, null);

@Test
public void testGetSuccess() throws Exception {
Expand All @@ -44,7 +43,8 @@ public void testGetSuccess() throws Exception {
Mockito.when(mockClient.search(
Mockito.eq(Constants.INGESTION_SOURCE_ENTITY_NAME),
Mockito.eq(""),
Mockito.eq(Collections.emptyMap()),
Mockito.any(),
Mockito.any(),
Mockito.eq(0),
Mockito.eq(20),
Mockito.any(Authentication.class),
Expand Down
26 changes: 21 additions & 5 deletions docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,29 @@ Source specific crawlers are provided by plugins and might sometimes need additi
Usage: datahub [datahub-options] ingest [command-options]

Command Options:
-c / --config Config file in .toml or .yaml format
-n / --dry-run Perform a dry run of the ingestion, essentially skipping writing to sink
--preview Perform limited ingestion from the source to the sink to get a quick preview
--preview-workunits The number of workunits to produce for preview
--strict-warnings If enabled, ingestion runs with warnings will yield a non-zero error code
-c / --config Config file in .toml or .yaml format
-n / --dry-run Perform a dry run of the ingestion, essentially skipping writing to sink
--preview Perform limited ingestion from the source to the sink to get a quick preview
--preview-workunits The number of workunits to produce for preview
--strict-warnings If enabled, ingestion runs with warnings will yield a non-zero error code
--test-source-connection When set, ingestion will only test the source connection details from the recipe
```

#### ingest deploy

The `ingest deploy` command instructs the cli to upload an ingestion recipe to DataHub to be run by DataHub's [UI Ingestion](./ui-ingestion.md).
This command can also be used to schedule the ingestion while uploading or even to update existing sources.

To schedule a recipe called "test", to run at 5am everyday, London time with the recipe configured in a local `recipe.yaml` file:
````shell
datahub ingest deploy --name "test" --schedule "5 * * * *" --time-zone "Europe/London" -c recipe.yaml
````

To update an existing recipe please use the `--urn` parameter to specify the id of the recipe to update.

**Note:** Updating a recipe will result in a replacement of the existing options with what was specified in the cli command.
I.e: Not specifying a schedule in the cli update command will remove the schedule from the recipe to be updated.

### init

The init command is used to tell `datahub` about where your DataHub instance is located. The CLI will point to localhost DataHub by default.
Expand Down
43 changes: 43 additions & 0 deletions docs/ui-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ your first **Ingestion Source**.

### Creating an Ingestion Source

<Tabs>
<TabItem value="ui" label="UI" default>

Before ingesting any metadata, you need to create a new Ingestion Source. Start by clicking **+ Create new source**.

<p align="center">
Expand Down Expand Up @@ -166,6 +169,46 @@ _Pinning the CLI version to version `0.8.23.2`_

Once you're happy with your changes, simply click 'Done' to save.

</TabItem>
<TabItem value="cli" label="CLI" default>

You can upload and even update recipes using the cli as mentioned in the [cli documentation for uploading ingestion recipes](./cli.md#ingest-deploy).
An example execution would look something like:

```bash
datahub ingest deploy --name "My Test Ingestion Source" --schedule "5 * * * *" --time-zone "UTC" -c recipe.yaml
```

This would create a new recipe with the name `My Test Ingestion Source`. Note that to update an existing recipe, it's `urn` id must be passed as a parameter.
DataHub supports having multiple recipes with the same name so to distinguish them we use the urn for unique identification.

</TabItem>
<TabItem value="graphql" label="GraphQL" default>

Create ingestion sources using [DataHub's GraphQL API](./api/graphql/overview.md) using the **createIngestionSource** mutation endpoint.
```graphql
mutation {
createIngestionSource(input: {
name: "My Test Ingestion Source",
type: "mysql",
description: "My ingestion source description",
schedule: {interval: "*/5 * * * *", timezone: "UTC"},
config: {
recipe: "{\"source\":{\"type\":\"mysql\",\"config\":{\"include_tables\":true,\"database\":null,\"password\":\"${MYSQL_PASSWORD}\",\"profiling\":{\"enabled\":false},\"host_port\":null,\"include_views\":true,\"username\":\"${MYSQL_USERNAME}\"}},\"pipeline_name\":\"urn:li:dataHubIngestionSource:f38bd060-4ea8-459c-8f24-a773286a2927\"}",
version: "0.8.18",
executorId: "mytestexecutor",
}
})
}
```

To update sources, please use the `updateIngestionSource` endpoint. It is almost identical to the create endpoint, only requiring the urn of the source to be updated in addition to the same input as the create endpoint.

**Note**: Recipe must be double quotes escaped

</TabItem>
</Tabs>

### Running an Ingestion Source

Once you've created your Ingestion Source, you can run it by clicking 'Execute'. Shortly after,
Expand Down
13 changes: 13 additions & 0 deletions metadata-ingestion/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,19 @@ reporting:
report_recipe: false
```
#### Deploying and scheduling ingestion to the UI
The `deploy` subcommand of the `ingest` command tree allows users to upload their recipes and schedule them in the server.

```shell
datahub ingest deploy -n <user friendly name for ingestion> -c recipe.yaml
```

By default, no schedule is done unless explicitly configured with the `--schedule` parameter. Timezones are inferred from the system time, can be overriden with `--time-zone` flag.
```shell
datahub ingest deploy -n test --schedule "0 * * * *" --time-zone "Europe/London" -c recipe.yaml
```

## Transformations

If you'd like to modify data before it reaches the ingestion sinks – for instance, adding additional owners or tags – you can use a transformer to write your own module and integrate it with DataHub. Transformers require extending the recipe with a new section to describe the transformers that you want to run.
Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ def get_long_description():
"types-termcolor>=1.0.0",
"types-Deprecated",
"types-protobuf>=4.21.0.1",
"types-tzlocal",
}


Expand Down
153 changes: 153 additions & 0 deletions metadata-ingestion/src/datahub/cli/ingest_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import logging
import os
import sys
import textwrap
from datetime import datetime
from typing import Optional

import click
import click_spinner
import tzlocal
from click_default_group import DefaultGroup
from tabulate import tabulate

Expand All @@ -21,6 +23,7 @@
post_rollback_endpoint,
)
from datahub.configuration.config_loader import load_config_file
from datahub.ingestion.graph.client import get_default_graph
from datahub.ingestion.run.connection import ConnectionManager
from datahub.ingestion.run.pipeline import Pipeline
from datahub.telemetry import telemetry
Expand Down Expand Up @@ -198,6 +201,156 @@ async def run_ingestion_and_check_upgrade() -> int:
# don't raise SystemExit if there's no error


@ingest.command()
@upgrade.check_upgrade
@telemetry.with_telemetry()
@click.option(
"-n",
"--name",
type=str,
help="Recipe Name",
required=True,
)
@click.option(
"-c",
"--config",
type=click.Path(dir_okay=False),
help="Config file in .toml or .yaml format.",
required=True,
)
@click.option(
"--urn",
type=str,
help="Urn of recipe to update",
required=False,
)
@click.option(
"--executor-id",
type=str,
default="default",
help="Executor id to route execution requests to. Do not use this unless you have configured a custom executor.",
required=False,
)
@click.option(
"--cli-version",
type=str,
help="Provide a custom CLI version to use for ingestion. By default will use server default.",
required=False,
default=None,
)
@click.option(
"--schedule",
type=str,
help="Cron definition for schedule. If none is provided, ingestion recipe will not be scheduled",
required=False,
default=None,
)
@click.option(
"--time-zone",
type=str,
help=f"Timezone for the schedule. By default uses the timezone of the current system: {tzlocal.get_localzone_name()}.",
required=False,
default=tzlocal.get_localzone_name(),
)
def deploy(
name: str,
config: str,
urn: str,
executor_id: str,
cli_version: str,
schedule: str,
time_zone: str,
) -> None:
"""
Deploy an ingestion recipe to your DataHub instance.
The urn of the ingestion source will be based on the name parameter in the format:
urn:li:dataHubIngestionSource:<name>
"""

datahub_graph = get_default_graph()

pipeline_config = load_config_file(
config,
allow_stdin=True,
resolve_env_vars=False,
)

graphql_query: str

variables: dict = {
"urn": urn,
"name": name,
"type": pipeline_config["source"]["type"],
"schedule": {"interval": schedule, "timezone": time_zone},
"recipe": json.dumps(pipeline_config),
"executorId": executor_id,
"version": cli_version,
}

if urn:
if not datahub_graph.exists(urn):
logger.error(f"Could not find recipe for provided urn: {urn}")
exit()
logger.info("Found recipe URN, will update recipe.")

graphql_query = textwrap.dedent(
"""
mutation updateIngestionSource(
$urn: String!,
$name: String!,
$type: String!,
$schedule: UpdateIngestionSourceScheduleInput,
$recipe: String!,
$executorId: String!
$version: String) {
updateIngestionSource(urn: $urn, input: {
name: $name,
type: $type,
schedule: $schedule,
config: {
recipe: $recipe,
executorId: $executorId,
version: $version,
}
})
}
"""
)
else:
logger.info("No URN specified recipe urn, will create a new recipe.")
graphql_query = textwrap.dedent(
"""
mutation createIngestionSource(
$name: String!,
$type: String!,
$schedule: UpdateIngestionSourceScheduleInput,
$recipe: String!,
$executorId: String!,
$version: String) {
createIngestionSource(input: {
type: $type,
schedule: $schedule,
config: {
recipe: $recipe,
executorId: $executorId,
version: $version,
}
})
}
"""
)

response = datahub_graph.execute_graphql(graphql_query, variables=variables)

click.echo(
f"✅ Successfully wrote data ingestion source metadata for recipe {name}:"
)
click.echo(response)


def _test_source_connection(report_to: Optional[str], pipeline_config: dict) -> None:
connection_report = None
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def load_config_file(
squirrel_original_config: bool = False,
squirrel_field: str = "__orig_config",
allow_stdin: bool = False,
resolve_env_vars: bool = True,
) -> dict:
config_mech: ConfigurationMechanism
if allow_stdin and config_file == "-":
Expand Down Expand Up @@ -104,7 +105,10 @@ def load_config_file(

config_fp = io.StringIO(raw_config_file)
raw_config = config_mech.load_config(config_fp)
config = resolve_env_variables(raw_config)
if resolve_env_vars:
config = resolve_env_variables(raw_config)
else:
config = raw_config
if squirrel_original_config:
config[squirrel_field] = raw_config
return config

0 comments on commit df27841

Please sign in to comment.