diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourcesResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourcesResolver.java index 8ead47aa65ceb..33b1555b73fab 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourcesResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourcesResolver.java @@ -15,20 +15,22 @@ import com.linkedin.entity.EntityResponse; import com.linkedin.entity.client.EntityClient; import com.linkedin.metadata.Constants; +import com.linkedin.metadata.query.filter.SortCriterion; +import com.linkedin.metadata.query.filter.SortOrder; import com.linkedin.metadata.search.SearchEntity; import com.linkedin.metadata.search.SearchResult; import graphql.schema.DataFetcher; import graphql.schema.DataFetchingEnvironment; -import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; /** Lists all ingestion sources stored within DataHub. Requires the MANAGE_INGESTION privilege. */ +@Slf4j public class ListIngestionSourcesResolver implements DataFetcher> { @@ -57,6 +59,22 @@ public CompletableFuture get( final List filters = input.getFilters() == null ? Collections.emptyList() : input.getFilters(); + // construct sort criteria, defaulting to systemCreated + final SortCriterion sortCriterion; + + // if query is expecting to sort by something, use that + final com.linkedin.datahub.graphql.generated.SortCriterion sortCriterionInput = + input.getSort(); + if (sortCriterionInput != null) { + sortCriterion = + new SortCriterion() + .setField(sortCriterionInput.getField()) + .setOrder(SortOrder.valueOf(sortCriterionInput.getSortOrder().name())); + } else { + // TODO: default to last executed + sortCriterion = null; + } + return GraphQLConcurrencyUtils.supplyAsync( () -> { try { @@ -69,33 +87,24 @@ public CompletableFuture get( Constants.INGESTION_SOURCE_ENTITY_NAME, query, buildFilter(filters, Collections.emptyList()), - null, + sortCriterion != null ? List.of(sortCriterion) : null, start, count); + final List entitiesUrnList = + gmsResult.getEntities().stream().map(SearchEntity::getEntity).toList(); // Then, resolve all ingestion sources final Map entities = _entityClient.batchGetV2( context.getOperationContext(), Constants.INGESTION_SOURCE_ENTITY_NAME, - new HashSet<>( - gmsResult.getEntities().stream() - .map(SearchEntity::getEntity) - .collect(Collectors.toList())), + new HashSet<>(entitiesUrnList), ImmutableSet.of( Constants.INGESTION_INFO_ASPECT_NAME, Constants.INGESTION_SOURCE_KEY_ASPECT_NAME)); - final Collection sortedEntities = - entities.values().stream() - .sorted( - Comparator.comparingLong( - s -> - -s.getAspects() - .get(Constants.INGESTION_SOURCE_KEY_ASPECT_NAME) - .getCreated() - .getTime())) - .collect(Collectors.toList()); + final List entitiesOrdered = + entitiesUrnList.stream().map(entities::get).filter(Objects::nonNull).toList(); // Now that we have entities we can bind this to a result. 
final ListIngestionSourcesResult result = new ListIngestionSourcesResult(); @@ -103,7 +112,7 @@ public CompletableFuture get( result.setCount(gmsResult.getPageSize()); result.setTotal(gmsResult.getNumEntities()); result.setIngestionSources( - IngestionResolverUtils.mapIngestionSources(sortedEntities)); + IngestionResolverUtils.mapIngestionSources(entitiesOrdered)); return result; } catch (Exception e) { diff --git a/datahub-graphql-core/src/main/resources/ingestion.graphql b/datahub-graphql-core/src/main/resources/ingestion.graphql index 77327ae6d4db1..719ffea30c3dd 100644 --- a/datahub-graphql-core/src/main/resources/ingestion.graphql +++ b/datahub-graphql-core/src/main/resources/ingestion.graphql @@ -448,6 +448,11 @@ input ListIngestionSourcesInput { Optional Facet filters to apply to the result set """ filters: [FacetFilterInput!] + + """ + Optional sort order. Defaults to use systemCreated. + """ + sort: SortCriterion } """ diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourceResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourceResolverTest.java index 05428788dc3c9..dc22255b1537c 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourceResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourceResolverTest.java @@ -28,7 +28,7 @@ public class ListIngestionSourceResolverTest { private static final ListIngestionSourcesInput TEST_INPUT = - new ListIngestionSourcesInput(0, 20, null, null); + new ListIngestionSourcesInput(0, 20, null, null, null); @Test public void testGetSuccess() throws Exception { diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillIngestionSourceInfoIndicesConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillIngestionSourceInfoIndicesConfig.java new file mode 100644 index 0000000000000..f525c4e35875d --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillIngestionSourceInfoIndicesConfig.java @@ -0,0 +1,29 @@ +package com.linkedin.datahub.upgrade.config; + +import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade; +import com.linkedin.datahub.upgrade.system.ingestion.BackfillIngestionSourceInfoIndices; +import com.linkedin.metadata.entity.AspectDao; +import com.linkedin.metadata.entity.EntityService; +import io.datahubproject.metadata.context.OperationContext; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Conditional; +import org.springframework.context.annotation.Configuration; + +@Configuration +@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class) +public class BackfillIngestionSourceInfoIndicesConfig { + + @Bean + public NonBlockingSystemUpgrade backfillIngestionSourceInfoIndices( + final OperationContext opContext, + final EntityService entityService, + final AspectDao aspectDao, + @Value("${systemUpdate.ingestionIndices.enabled}") final boolean enabled, + @Value("${systemUpdate.ingestionIndices.batchSize}") final Integer batchSize, + @Value("${systemUpdate.ingestionIndices.delayMs}") final Integer delayMs, + @Value("${systemUpdate.ingestionIndices.limit}") final Integer limit) { + return new BackfillIngestionSourceInfoIndices( + opContext, 
entityService, aspectDao, enabled, batchSize, delayMs, limit); + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/ingestion/BackfillIngestionSourceInfoIndices.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/ingestion/BackfillIngestionSourceInfoIndices.java new file mode 100644 index 0000000000000..70f0844367f67 --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/ingestion/BackfillIngestionSourceInfoIndices.java @@ -0,0 +1,43 @@ +package com.linkedin.datahub.upgrade.system.ingestion; + +import com.google.common.collect.ImmutableList; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade; +import com.linkedin.metadata.entity.AspectDao; +import com.linkedin.metadata.entity.EntityService; +import io.datahubproject.metadata.context.OperationContext; +import java.util.List; +import javax.annotation.Nonnull; + +public class BackfillIngestionSourceInfoIndices implements NonBlockingSystemUpgrade { + + private final List _steps; + + public BackfillIngestionSourceInfoIndices( + @Nonnull OperationContext opContext, + EntityService entityService, + AspectDao aspectDao, + boolean enabled, + Integer batchSize, + Integer batchDelayMs, + Integer limit) { + if (enabled) { + _steps = + ImmutableList.of( + new BackfillIngestionSourceInfoIndicesStep( + opContext, entityService, aspectDao, batchSize, batchDelayMs, limit)); + } else { + _steps = ImmutableList.of(); + } + } + + @Override + public String id() { + return getClass().getSimpleName(); + } + + @Override + public List steps() { + return _steps; + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/ingestion/BackfillIngestionSourceInfoIndicesStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/ingestion/BackfillIngestionSourceInfoIndicesStep.java new file mode 100644 index 0000000000000..2525a57bfd7ec --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/ingestion/BackfillIngestionSourceInfoIndicesStep.java @@ -0,0 +1,56 @@ +package com.linkedin.datahub.upgrade.system.ingestion; + +import static com.linkedin.metadata.Constants.*; + +import com.linkedin.common.urn.Urn; +import com.linkedin.datahub.upgrade.system.AbstractMCLStep; +import com.linkedin.metadata.boot.BootstrapStep; +import com.linkedin.metadata.entity.AspectDao; +import com.linkedin.metadata.entity.EntityService; +import io.datahubproject.metadata.context.OperationContext; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class BackfillIngestionSourceInfoIndicesStep extends AbstractMCLStep { + + private static final String UPGRADE_ID = BackfillIngestionSourceInfoIndices.class.getSimpleName(); + private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID); + + public BackfillIngestionSourceInfoIndicesStep( + @Nonnull OperationContext opContext, + EntityService entityService, + AspectDao aspectDao, + Integer batchSize, + Integer batchDelayMs, + Integer limit) { + super(opContext, entityService, aspectDao, batchSize, batchDelayMs, limit); + } + + @Override + public String id() { + return UPGRADE_ID; + } + + @Nonnull + @Override + protected String getAspectName() { + return INGESTION_INFO_ASPECT_NAME; + } + + @Nullable + @Override + protected String getUrnLike() { + return "urn:li:" + INGESTION_SOURCE_ENTITY_NAME + ":%"; + } + + /** + * Returns whether the 
upgrade should proceed if the step fails after exceeding the maximum + * retries. + */ + @Override + public boolean isOptional() { + return true; + } +} diff --git a/datahub-web-react/src/app/ingest/source/IngestionSourceList.tsx b/datahub-web-react/src/app/ingest/source/IngestionSourceList.tsx index d9ab4cdc499f5..ccfa200fab630 100644 --- a/datahub-web-react/src/app/ingest/source/IngestionSourceList.tsx +++ b/datahub-web-react/src/app/ingest/source/IngestionSourceList.tsx @@ -17,7 +17,7 @@ import TabToolbar from '../../entity/shared/components/styled/TabToolbar'; import { IngestionSourceBuilderModal } from './builder/IngestionSourceBuilderModal'; import { addToListIngestionSourcesCache, CLI_EXECUTOR_ID, removeFromListIngestionSourcesCache } from './utils'; import { DEFAULT_EXECUTOR_ID, SourceBuilderState, StringMapEntryInput } from './builder/types'; -import { IngestionSource, UpdateIngestionSourceInput } from '../../../types.generated'; +import { IngestionSource, SortCriterion, SortOrder, UpdateIngestionSourceInput } from '../../../types.generated'; import { SearchBar } from '../../search/SearchBar'; import { useEntityRegistry } from '../../useEntityRegistry'; import { ExecutionDetailsModal } from './executions/ExecutionRequestDetailsModal'; @@ -60,16 +60,6 @@ export enum IngestionSourceType { CLI, } -export function shouldIncludeSource(source: any, sourceFilter: IngestionSourceType) { - if (sourceFilter === IngestionSourceType.CLI) { - return source.config.executorId === CLI_EXECUTOR_ID; - } - if (sourceFilter === IngestionSourceType.UI) { - return source.config.executorId !== CLI_EXECUTOR_ID; - } - return true; -} - const DEFAULT_PAGE_SIZE = 25; const removeExecutionsFromIngestionSource = (source) => { @@ -105,6 +95,7 @@ export const IngestionSourceList = () => { // Set of removed urns used to account for eventual consistency const [removedUrns, setRemovedUrns] = useState([]); const [sourceFilter, setSourceFilter] = useState(IngestionSourceType.ALL); + const [sort, setSort] = useState(); const [hideSystemSources, setHideSystemSources] = useState(true); /** @@ -115,7 +106,14 @@ export const IngestionSourceList = () => { // Ingestion Source Default Filters const filters = hideSystemSources ? [{ field: 'sourceType', values: [SYSTEM_INTERNAL_SOURCE_TYPE], negated: true }] - : undefined; + : []; + if (sourceFilter !== IngestionSourceType.ALL) { + filters.push({ + field: 'sourceExecutorId', + values: [CLI_EXECUTOR_ID], + negated: sourceFilter !== IngestionSourceType.CLI, + }); + } // Ingestion Source Queries const { loading, error, data, client, refetch } = useListIngestionSourcesQuery({ @@ -123,8 +121,9 @@ export const IngestionSourceList = () => { input: { start, count: pageSize, - query: (query?.length && query) || undefined, - filters, + query: query?.length ? query : undefined, + filters: filters.length ? filters : undefined, + sort, }, }, fetchPolicy: (query?.length || 0) > 0 ? 
'no-cache' : 'cache-first', @@ -138,9 +137,7 @@ export const IngestionSourceList = () => { const totalSources = data?.listIngestionSources?.total || 0; const sources = data?.listIngestionSources?.ingestionSources || []; - const filteredSources = sources.filter( - (source) => !removedUrns.includes(source.urn) && shouldIncludeSource(source, sourceFilter), - ) as IngestionSource[]; + const filteredSources = sources.filter((source) => !removedUrns.includes(source.urn)) as IngestionSource[]; const focusSource = (focusSourceUrn && filteredSources.find((source) => source.urn === focusSourceUrn)) || undefined; @@ -376,6 +373,17 @@ export const IngestionSourceList = () => { setFocusSourceUrn(undefined); }; + const onChangeSort = (field, order) => { + setSort( + order + ? { + sortOrder: order === 'ascend' ? SortOrder.Ascending : SortOrder.Descending, + field, + } + : undefined, + ); + }; + return ( <> {!data && loading && } @@ -438,6 +446,7 @@ export const IngestionSourceList = () => { onView={onView} onDelete={onDelete} onRefresh={onRefresh} + onChangeSort={onChangeSort} /> void; onDelete: (urn: string) => void; onRefresh: () => void; + onChangeSort: (field: string, order: SorterResult['order']) => void; } function IngestionSourceTable({ @@ -42,6 +44,7 @@ function IngestionSourceTable({ onView, onDelete, onRefresh, + onChangeSort, }: Props) { const tableColumns = [ { @@ -49,14 +52,14 @@ function IngestionSourceTable({ dataIndex: 'type', key: 'type', render: (type: string, record: any) => , - sorter: (sourceA, sourceB) => sourceA.type.localeCompare(sourceB.type), + sorter: true, }, { title: 'Name', dataIndex: 'name', key: 'name', render: (name: string) => name || '', - sorter: (sourceA, sourceB) => sourceA.name.localeCompare(sourceB.name), + sorter: true, }, { title: 'Schedule', @@ -69,14 +72,12 @@ function IngestionSourceTable({ dataIndex: 'execCount', key: 'execCount', render: (execCount: any) => {execCount || '0'}, - sorter: (sourceA, sourceB) => sourceA.execCount - sourceB.execCount, }, { title: 'Last Execution', dataIndex: 'lastExecTime', key: 'lastExecTime', render: LastExecutionColumn, - sorter: (sourceA, sourceB) => sourceA.lastExecTime - sourceB.lastExecTime, }, { title: 'Last Status', @@ -85,7 +86,6 @@ function IngestionSourceTable({ render: (status: any, record) => ( ), - sorter: (sourceA, sourceB) => (sourceA.lastExecStatus || '').localeCompare(sourceB.lastExecStatus || ''), }, { title: '', @@ -127,9 +127,17 @@ function IngestionSourceTable({ cliIngestion: source.config?.executorId === CLI_EXECUTOR_ID, })); + const handleTableChange = (_: any, __: any, sorter: any) => { + const sorterTyped: SorterResult = sorter; + const field = sorterTyped.field as string; + const { order } = sorterTyped; + onChangeSort(field, order); + }; + return ( (record.cliIngestion ? 
'cliIngestion' : '')} diff --git a/datahub-web-react/src/app/ingest/source/__tests__/IngestionSourceList.test.tsx b/datahub-web-react/src/app/ingest/source/__tests__/IngestionSourceList.test.tsx deleted file mode 100644 index b057f8b24c980..0000000000000 --- a/datahub-web-react/src/app/ingest/source/__tests__/IngestionSourceList.test.tsx +++ /dev/null @@ -1,38 +0,0 @@ -import { IngestionSourceType, shouldIncludeSource } from '../IngestionSourceList'; -import { CLI_EXECUTOR_ID } from '../utils'; - -describe('shouldIncludeSource', () => { - it('should return true if the source filter is for CLI and the source is a CLI source', () => { - const source = { config: { executorId: CLI_EXECUTOR_ID } }; - const isSourceIncluded = shouldIncludeSource(source, IngestionSourceType.CLI); - expect(isSourceIncluded).toBe(true); - }); - - it('should return false if the source filter is for CLI and the source is not a CLI source', () => { - const source = { config: { executorId: 'default' } }; - const isSourceIncluded = shouldIncludeSource(source, IngestionSourceType.CLI); - expect(isSourceIncluded).toBe(false); - }); - - it('should return true if the source filter is for UI and the source is a UI source', () => { - const source = { config: { executorId: 'default' } }; - const isSourceIncluded = shouldIncludeSource(source, IngestionSourceType.UI); - expect(isSourceIncluded).toBe(true); - }); - - it('should return false if the source filter is for UI and the source is not a UI source', () => { - const source = { config: { executorId: CLI_EXECUTOR_ID } }; - const isSourceIncluded = shouldIncludeSource(source, IngestionSourceType.UI); - expect(isSourceIncluded).toBe(false); - }); - - it('should return true no matter what if the source type is all', () => { - const source1 = { config: { executorId: CLI_EXECUTOR_ID } }; - const isSourceIncluded1 = shouldIncludeSource(source1, IngestionSourceType.ALL); - expect(isSourceIncluded1).toBe(true); - - const source2 = { config: { executorId: 'default' } }; - const isSourceIncluded2 = shouldIncludeSource(source2, IngestionSourceType.ALL); - expect(isSourceIncluded2).toBe(true); - }); -}); diff --git a/docs-website/sidebars.js b/docs-website/sidebars.js index 2c9235e5196f7..0470723c1adb7 100644 --- a/docs-website/sidebars.js +++ b/docs-website/sidebars.js @@ -108,6 +108,11 @@ module.exports = { type: "doc", id: "docs/automations/docs-propagation", }, + { + label: "Glossary Term Propagation", + type: "doc", + id: "docs/automations/glossary-term-propagation", + }, { label: "BigQuery Metadata Sync", type: "doc", diff --git a/docs/automations/docs-propagation.md b/docs/automations/docs-propagation.md index e8eba08d3640e..9f38902894191 100644 --- a/docs/automations/docs-propagation.md +++ b/docs/automations/docs-propagation.md @@ -26,7 +26,6 @@ This feature is enabled by default in Open Source DataHub. | Column-Level Docs Propagation | ✔️ | ✔️ | | Asset-Level Docs Propagation | ✔️ | ✔️ | | Downstream Lineage + Siblings | ✔️ | ✔️ | -| Propagation Rollback (Undo) | ❌ | ✔️ | | Historical Backfilling | ❌ | ✔️ | ## Enabling Documentation Propagation @@ -95,27 +94,6 @@ and then click "Initialize". This one-time step will kick off the back-filling process for existing descriptions. If you only want to begin propagating descriptions going forward, you can skip this step. -## Rolling Back Propagated Descriptions (DataHub Cloud Only) - -In DataHub Cloud, you can rollback all descriptions that have been propagated historically. 
- -This feature allows you to "clean up" or "undo" any accidental propagation that may have occurred automatically, in the case -that you no longer want propagated descriptions to be visible. - -To do this, navigate to the Automation you created in Step 3 above, click the 3-dot "More" menu - -

- -

- -and then click "Rollback". - -

- -

- -This one-time step will remove all propagated tags and glossary terms from Snowflake. To simply stop propagating new tags, you can disable the automation. - -## Viewing Propagated Descriptions Once the automation is enabled, you'll be able to recognize propagated descriptions as those with the thunderbolt icon next to them: diff --git a/docs/automations/glossary-term-propagation.md b/docs/automations/glossary-term-propagation.md new file mode 100644 index 0000000000000..90e8e75ea44ef --- /dev/null +++ b/docs/automations/glossary-term-propagation.md @@ -0,0 +1,61 @@ +# Glossary Term Propagation Automation + + + +## Introduction + +Glossary Term Propagation is an automation feature that propagates classification labels (Glossary Terms) across columns and assets based on downstream lineage and sibling relationships. +This automation simplifies metadata management by ensuring consistent term classification and reducing manual effort in categorizing data assets, aiding Data Governance & Compliance, and enhancing Data Discovery. + +## Capabilities + +- **Column-Level Glossary Term Propagation**: Automatically propagate Glossary Terms to all downstream lineage columns and sibling columns. +- **Asset-Level Glossary Term Propagation**: Automatically propagate Glossary Terms to all downstream lineage assets & sibling assets. +- **Select Terms & Term Groups**: Select specific Glossary Terms & Term Groups to propagate, e.g. to propagate only sensitive or important labels. + +Note that Asset-level propagation is currently only supported for **Datasets** (Tables, Views, Topics, etc.), and not for other asset types including +Charts, Dashboards, Data Pipelines, and Data Tasks. + +## Enabling Glossary Term Propagation + +1. **Navigate to Automations**: Go to 'Govern' > 'Automations' in the navigation bar. + +

+ +

+ + +2. **Create An Automation**: Select 'Glossary Term Propagation' from the automation types. + +

+ +

+ +3. **Configure Automation**: Complete the required fields and select 'Save and Run' to activate the automation. + +

+ +

+ +## Propagating for Existing Assets + +In DataHub Cloud, you can back-fill historical data to ensure existing Glossary Terms are consistently propagated across downstream relationships. To begin, navigate to the Automation created in Step 3 above and click the 3-dot "More" menu + +

+ +

+ +and then click "Initialize". This one-time step will kick off the backfill process for existing Glossary Terms. + +

+ +

+ + +## Viewing Propagated Glossary Terms + +Once enabled, propagated Glossary Terms will display a thunderbolt icon, indicating the origin of the term and any intermediate lineage hops used in propagation. + +

+ +

\ No newline at end of file diff --git a/docs/automations/snowflake-tag-propagation.md b/docs/automations/snowflake-tag-propagation.md index c708e40cbdd81..b72224642b0f0 100644 --- a/docs/automations/snowflake-tag-propagation.md +++ b/docs/automations/snowflake-tag-propagation.md @@ -57,27 +57,6 @@ and then click "Initialize". This one-time step will kick off the back-filling process for existing descriptions. If you only want to begin propagating descriptions going forward, you can skip this step. -## Rolling Back Propagated Tags - -You can rollback all tags and glossary terms that have been propagated historically. - -This feature allows you to "clean up" or "undo" any accidental propagation that may have occurred automatically, in the case -that you no longer want propagated descriptions to be visible. - -To do this, navigate to the Automation you created in Step 3 above, click the 3-dot "More" menu - -

- -

- -and then click "Rollback". - -

- -

- -This one-time step will remove all propagated tags and glossary terms from Snowflake. To simply stop propagating new tags, you can disable the automation. - ## Viewing Propagated Tags You can view propagated Tags (and corresponding DataHub URNs) inside the Snowflake UI to confirm the automation is working as expected. diff --git a/docs/managed-datahub/release-notes/v_0_3_7.md b/docs/managed-datahub/release-notes/v_0_3_7.md index cc01ceb52c8d8..dc1c702c89fb2 100644 --- a/docs/managed-datahub/release-notes/v_0_3_7.md +++ b/docs/managed-datahub/release-notes/v_0_3_7.md @@ -19,7 +19,7 @@ If you are using an older CLI/SDK version, then please upgrade it. This applies - Breaking Changes - Authentication & RestAPI Authorization enabled by default (since v0.3.6) - - Helm Chart Requirement: 1.4.132+ + - Helm Chart Requirement: 1.4.136+ - Recommend setting timezone for `datahub-gc` and `datahub-usage-reporting` - ```yaml acryl-datahub: diff --git a/metadata-ingestion-modules/gx-plugin/src/datahub_gx_plugin/action.py b/metadata-ingestion-modules/gx-plugin/src/datahub_gx_plugin/action.py index 1cb736bb1ba83..2ad301a38d002 100644 --- a/metadata-ingestion-modules/gx-plugin/src/datahub_gx_plugin/action.py +++ b/metadata-ingestion-modules/gx-plugin/src/datahub_gx_plugin/action.py @@ -66,7 +66,7 @@ has_name_positional_arg = packaging.version.parse( GX_VERSION - ) >= packaging.version.Version("0.18.0") + ) >= packaging.version.Version("0.18.14") except Exception: has_name_positional_arg = False @@ -89,7 +89,7 @@ class DataHubValidationAction(ValidationAction): def __init__( self, data_context: AbstractDataContext, - # this would capture `name` positional arg added in GX 0.18.0 + # this would capture `name` positional arg added in GX 0.18.14 *args: Union[str, Any], server_url: str, env: str = builder.DEFAULT_ENV, diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index b1e1d7044a339..2014d8ca4e4dd 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -573,6 +573,7 @@ test_api_requirements = { "pytest>=6.2.2", + "pytest-timeout", # Missing numpy requirement in 8.0.0 "deepdiff!=8.0.0", "PyYAML", diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/oracle.py b/metadata-ingestion/src/datahub/ingestion/source/sql/oracle.py index 3823c123d06cf..bce3b29130ec9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/oracle.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/oracle.py @@ -63,9 +63,9 @@ def before_cursor_execute(conn, cursor, statement, parameters, context, executem class OracleConfig(BasicSQLAlchemyConfig): - # defaults + # TODO: Change scheme to oracle+oracledb when sqlalchemy>=2 is supported scheme: str = Field( - default="oracle+oracledb", + default="oracle", description="Will be set automatically to default value.", ) service_name: Optional[str] = Field( diff --git a/metadata-ingestion/src/datahub/utilities/partition_executor.py b/metadata-ingestion/src/datahub/utilities/partition_executor.py index aeabc5c55e6b2..0f0e9784464f6 100644 --- a/metadata-ingestion/src/datahub/utilities/partition_executor.py +++ b/metadata-ingestion/src/datahub/utilities/partition_executor.py @@ -237,6 +237,11 @@ def __init__( process_batch: Callable[[List], None], max_per_batch: int = 100, min_process_interval: timedelta = _DEFAULT_BATCHER_MIN_PROCESS_INTERVAL, + # Why 3 seconds? It's somewhat arbitrary. + # We don't want it to be too high, since then liveness suffers, + # particularly during a dirty shutdown. 
If it's too low, then we'll + # waste CPU cycles rechecking the timer, only to call get again. + read_from_pending_interval: timedelta = timedelta(seconds=3), ) -> None: """Similar to PartitionExecutor, but with batching. @@ -262,8 +267,10 @@ def __init__( self.max_per_batch = max_per_batch self.process_batch = process_batch self.min_process_interval = min_process_interval + self.read_from_pending_interval = read_from_pending_interval assert self.max_workers > 1 + self.state_lock = threading.Lock() self._executor = ThreadPoolExecutor( # We add one here to account for the clearinghouse worker thread. max_workers=max_workers + 1, @@ -362,12 +369,8 @@ def _build_batch() -> List[_BatchPartitionWorkItem]: if not blocking: next_item = self._pending.get_nowait() else: - # Why 3 seconds? It's somewhat arbitrary. - # We don't want it to be too high, since then liveness suffers, - # particularly during a dirty shutdown. If it's too low, then we'll - # waste CPU cycles rechecking the timer, only to call get again. next_item = self._pending.get( - timeout=3, # seconds + timeout=self.read_from_pending_interval.total_seconds(), ) if next_item is None: # None is the shutdown signal @@ -379,6 +382,9 @@ def _build_batch() -> List[_BatchPartitionWorkItem]: pending_key_completion.append(next_item) else: next_batch.append(next_item) + + if not next_batch: + next_batch = _find_ready_items() except queue.Empty: if not blocking: break @@ -452,10 +458,11 @@ def _ensure_clearinghouse_started(self) -> None: f"{self.__class__.__name__} is shutting down; cannot submit new work items." ) - # Lazily start the clearinghouse worker. - if not self._clearinghouse_started: - self._clearinghouse_started = True - self._executor.submit(self._clearinghouse_worker) + with self.state_lock: + # Lazily start the clearinghouse worker. 
+ if not self._clearinghouse_started: + self._clearinghouse_started = True + self._executor.submit(self._clearinghouse_worker) def submit( self, diff --git a/metadata-ingestion/tests/unit/test_oracle_source.py b/metadata-ingestion/tests/unit/test_oracle_source.py index ce7952cf73a8a..0477044354576 100644 --- a/metadata-ingestion/tests/unit/test_oracle_source.py +++ b/metadata-ingestion/tests/unit/test_oracle_source.py @@ -21,7 +21,7 @@ def test_oracle_config(): ) assert ( config.get_sql_alchemy_url() - == "oracle+oracledb://user:password@host:1521/?service_name=svc01" + == "oracle://user:password@host:1521/?service_name=svc01" ) with pytest.raises(ValueError): diff --git a/metadata-ingestion/tests/unit/utilities/test_partition_executor.py b/metadata-ingestion/tests/unit/utilities/test_partition_executor.py index 2901b4bba107e..e3a68405e3c0a 100644 --- a/metadata-ingestion/tests/unit/utilities/test_partition_executor.py +++ b/metadata-ingestion/tests/unit/utilities/test_partition_executor.py @@ -1,7 +1,11 @@ import logging +import math import time from concurrent.futures import Future +import pytest +from pydantic.schema import timedelta + from datahub.utilities.partition_executor import ( BatchPartitionExecutor, PartitionExecutor, @@ -129,7 +133,9 @@ def process_batch(batch): } +@pytest.mark.timeout(10) def test_batch_partition_executor_max_batch_size(): + n = 20 # Exceed max_pending to test for deadlocks when max_pending exceeded batches_processed = [] def process_batch(batch): @@ -137,15 +143,20 @@ def process_batch(batch): time.sleep(0.1) # Simulate batch processing time with BatchPartitionExecutor( - max_workers=5, max_pending=20, process_batch=process_batch, max_per_batch=2 + max_workers=5, + max_pending=10, + process_batch=process_batch, + max_per_batch=2, + min_process_interval=timedelta(seconds=1), + read_from_pending_interval=timedelta(seconds=1), ) as executor: # Submit more tasks than the max_per_batch to test batching limits. - for i in range(5): + for i in range(n): executor.submit("key3", "key3", f"task{i}") # Check the batches. 
logger.info(f"batches_processed: {batches_processed}") - assert len(batches_processed) == 3 + assert len(batches_processed) == math.ceil(n / 2), "Incorrect number of batches" for batch in batches_processed: assert len(batch) <= 2, "Batch size exceeded max_per_batch limit" diff --git a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java index 8b625b3ae2289..fa9109689caad 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java @@ -769,8 +769,11 @@ public List batchIngestProposals( opContext.getValidationContext().isAlternateValidation()) .build(); + List results = entityService.ingestProposal(opContext, batch, async); + entitySearchService.appendRunId(opContext, results); + Map, List> resultMap = - entityService.ingestProposal(opContext, batch, async).stream() + results.stream() .collect( Collectors.groupingBy( result -> @@ -864,8 +867,7 @@ public void rollbackIngestion( private void tryIndexRunId( @Nonnull OperationContext opContext, Urn entityUrn, @Nullable SystemMetadata systemMetadata) { if (systemMetadata != null && systemMetadata.hasRunId()) { - entitySearchService.appendRunId( - opContext, entityUrn.getEntityType(), entityUrn, systemMetadata.getRunId()); + entitySearchService.appendRunId(opContext, entityUrn, systemMetadata.getRunId()); } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java index 261ec127d5497..7f1d467dbd491 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java @@ -106,17 +106,17 @@ public void deleteDocument( @Override public void appendRunId( - @Nonnull OperationContext opContext, - @Nonnull String entityName, - @Nonnull Urn urn, - @Nullable String runId) { + @Nonnull OperationContext opContext, @Nonnull Urn urn, @Nullable String runId) { final String docId = indexBuilders.getIndexConvention().getEntityDocumentId(urn); log.debug( - "Appending run id for entity name: {}, doc id: {}, run id: {}", entityName, docId, runId); + "Appending run id for entity name: {}, doc id: {}, run id: {}", + urn.getEntityType(), + docId, + runId); esWriteDAO.applyScriptUpdate( opContext, - entityName, + urn.getEntityType(), docId, /* Script used to apply updates to the runId field of the index. diff --git a/metadata-models/src/main/pegasus/com/linkedin/ingestion/DataHubIngestionSourceInfo.pdl b/metadata-models/src/main/pegasus/com/linkedin/ingestion/DataHubIngestionSourceInfo.pdl index 37e85b6e542bd..3d384bbd6b08f 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/ingestion/DataHubIngestionSourceInfo.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/ingestion/DataHubIngestionSourceInfo.pdl @@ -21,6 +21,9 @@ record DataHubIngestionSourceInfo { /** * The type of the source itself, e.g. mysql, bigquery, bigquery-usage. Should match the recipe. 
*/ + @Searchable = { + "fieldType": "TEXT_PARTIAL" + } type: string /** @@ -50,6 +53,9 @@ record DataHubIngestionSourceInfo { /** * The id of the executor to use to execute the ingestion run */ + @Searchable = { + "fieldName": "sourceExecutorId" + } executorId: optional string /** diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml index eb7bb2869584b..8010ae187b6c8 100644 --- a/metadata-service/configuration/src/main/resources/application.yaml +++ b/metadata-service/configuration/src/main/resources/application.yaml @@ -372,6 +372,11 @@ systemUpdate: batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_BROWSE_PATHS_V2_BATCH_SIZE:5000} reprocess: enabled: ${REPROCESS_DEFAULT_BROWSE_PATHS_V2:false} + ingestionIndices: + enabled: ${BOOTSTRAP_SYSTEM_UPDATE_INGESTION_INDICES_ENABLED:true} + batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_INGESTION_INDICES_BATCH_SIZE:5000} + delayMs: ${BOOTSTRAP_SYSTEM_UPDATE_INGESTION_INDICES_DELAY_MS:1000} + limit: ${BOOTSTRAP_SYSTEM_UPDATE_INGESTION_INDICES_CLL_LIMIT:0} policyFields: enabled: ${BOOTSTRAP_SYSTEM_UPDATE_POLICY_FIELDS_ENABLED:true} batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_POLICY_FIELDS_BATCH_SIZE:5000} diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java index 676232d5b5c04..37dca1cecd817 100644 --- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java +++ b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java @@ -22,12 +22,14 @@ import com.linkedin.common.urn.Urn; import com.linkedin.metadata.aspect.EnvelopedAspectArray; import com.linkedin.metadata.aspect.VersionedAspect; +import com.linkedin.metadata.aspect.batch.BatchItem; import com.linkedin.metadata.authorization.PoliciesConfig; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.entity.IngestResult; import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl; import com.linkedin.metadata.aspect.batch.AspectsBatch; import com.linkedin.metadata.entity.validation.ValidationException; +import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.query.filter.SortCriterion; import com.linkedin.metadata.resources.operations.Utils; @@ -57,6 +59,8 @@ import java.time.Clock; import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; import javax.annotation.Nonnull; @@ -126,6 +130,11 @@ void setSystemOperationContext(OperationContext systemOperationContext) { this.systemOperationContext = systemOperationContext; } + @VisibleForTesting + void setEntitySearchService(EntitySearchService entitySearchService) { + this.entitySearchService = entitySearchService; + } + /** * Retrieves the value for an entity that is made up of latest versions of specified aspects. * TODO: Get rid of this and migrate to getAspect. 
@@ -320,15 +329,7 @@ private Task ingestProposals( List results = _entityService.ingestProposal(opContext, batch, asyncBool); - - for (IngestResult result : results) { - // Update runIds, only works for existing documents, so ES document must exist - Urn resultUrn = result.getUrn(); - - if (resultUrn != null && (result.isProcessedMCL() || result.isUpdate())) { - tryIndexRunId(opContext, resultUrn, result.getRequest().getSystemMetadata(), entitySearchService); - } - } + entitySearchService.appendRunId(opContext, results); // TODO: We don't actually use this return value anywhere. Maybe we should just stop returning it altogether? return RESTLI_SUCCESS; @@ -397,14 +398,4 @@ public Task restoreIndices( }, MetricRegistry.name(this.getClass(), "restoreIndices")); } - - private static void tryIndexRunId( - @Nonnull final OperationContext opContext, - final Urn urn, - final @Nullable SystemMetadata systemMetadata, - final EntitySearchService entitySearchService) { - if (systemMetadata != null && systemMetadata.hasRunId()) { - entitySearchService.appendRunId(opContext, urn.getEntityType(), urn, systemMetadata.getRunId()); - } - } } diff --git a/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java b/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java index 82db3d88b9e12..a39401c170a11 100644 --- a/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java +++ b/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java @@ -25,6 +25,7 @@ import com.linkedin.metadata.event.EventProducer; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.search.EntitySearchService; import com.linkedin.metadata.service.UpdateIndicesService; import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.mxe.GenericAspect; @@ -66,6 +67,7 @@ public void setup() { aspectResource.setEntityService(entityService); opContext = TestOperationContexts.systemContextNoSearchAuthorization(); aspectResource.setSystemOperationContext(opContext); + aspectResource.setEntitySearchService(mock(EntitySearchService.class)); entityRegistry = opContext.getEntityRegistry(); } diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/search/EntitySearchService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/search/EntitySearchService.java index 26f2335d0c59f..da9ba8c684f61 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/search/EntitySearchService.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/search/EntitySearchService.java @@ -1,15 +1,21 @@ package com.linkedin.metadata.search; import com.linkedin.common.urn.Urn; +import com.linkedin.metadata.aspect.batch.BatchItem; import com.linkedin.metadata.browse.BrowseResult; import com.linkedin.metadata.browse.BrowseResultV2; +import com.linkedin.metadata.entity.IngestResult; import com.linkedin.metadata.query.AutoCompleteResult; import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.query.filter.SortCriterion; import com.linkedin.metadata.utils.elasticsearch.IndexConvention; +import com.linkedin.util.Pair; import io.datahubproject.metadata.context.OperationContext; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Set; 
+import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.opensearch.action.explain.ExplainResponse; @@ -59,15 +65,10 @@ void deleteDocument( /** * Appends a run id to the list for a certain document * - * @param entityName name of the entity * @param urn the urn of the user * @param runId the ID of the run */ - void appendRunId( - @Nonnull OperationContext opContext, - @Nonnull String entityName, - @Nonnull Urn urn, - @Nullable String runId); + void appendRunId(@Nonnull OperationContext opContext, @Nonnull Urn urn, @Nullable String runId); /** * Gets a list of documents that match given search request. The results are aggregated and @@ -329,4 +330,41 @@ ExplainResponse explain( * @return convent */ IndexConvention getIndexConvention(); + + default void appendRunId( + @Nonnull final OperationContext opContext, @Nonnull List results) { + + // Only updates with runId + Map, Set> urnRunIdToBatchItem = + results.stream() + .filter(Objects::nonNull) + .filter( + result -> result.getUrn() != null && (result.isProcessedMCL() || result.isUpdate())) + .filter( + result -> + result.getRequest() != null + && result.getRequest().getSystemMetadata() != null + && result.getRequest().getSystemMetadata().hasRunId()) + .map( + result -> + Map.entry( + Pair.of( + result.getUrn(), result.getRequest().getSystemMetadata().getRunId()), + result)) + .collect( + Collectors.groupingBy( + Map.Entry::getKey, + Collectors.mapping(e -> e.getValue().getRequest(), Collectors.toSet()))); + + // Only update if not key aspect (document doesn't exist) + urnRunIdToBatchItem.entrySet().stream() + .filter( + entry -> + entry.getValue().stream() + .noneMatch( + item -> + item.getEntitySpec().getKeyAspectName().equals(item.getAspectName()))) + .forEach( + entry -> appendRunId(opContext, entry.getKey().getKey(), entry.getKey().getValue())); + } } diff --git a/metadata-service/services/src/test/java/com/linkedin/metadata/service/search/EntitySearchServiceTest.java b/metadata-service/services/src/test/java/com/linkedin/metadata/service/search/EntitySearchServiceTest.java new file mode 100644 index 0000000000000..41e2c2f006e94 --- /dev/null +++ b/metadata-service/services/src/test/java/com/linkedin/metadata/service/search/EntitySearchServiceTest.java @@ -0,0 +1,345 @@ +package com.linkedin.metadata.service.search; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.metadata.aspect.batch.BatchItem; +import com.linkedin.metadata.browse.BrowseResult; +import com.linkedin.metadata.browse.BrowseResultV2; +import com.linkedin.metadata.entity.IngestResult; +import com.linkedin.metadata.entity.UpdateAspectResult; +import com.linkedin.metadata.query.AutoCompleteResult; +import com.linkedin.metadata.query.filter.Filter; +import com.linkedin.metadata.query.filter.SortCriterion; +import com.linkedin.metadata.search.EntitySearchService; +import com.linkedin.metadata.search.ScrollResult; +import com.linkedin.metadata.search.SearchResult; +import com.linkedin.metadata.utils.elasticsearch.IndexConvention; +import com.linkedin.mxe.SystemMetadata; +import 
io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.test.metadata.context.TestOperationContexts; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opensearch.action.explain.ExplainResponse; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class EntitySearchServiceTest { + private static final Urn TEST_URN = + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.adoption.human_profiles,PROD)"); + + private OperationContext opContext = TestOperationContexts.systemContextNoValidate(); + + private EntitySearchService testInstance; + + @BeforeClass + public void setup() { + testInstance = spy(new TestEntitySearchService()); + } + + @Test + public void testAppendRunId_EmptyList() { + List results = new ArrayList<>(); + testInstance.appendRunId(opContext, results); + // Verify no interactions since list is empty + verify(testInstance, never()).appendRunId(any(), any(Urn.class), anyString()); + } + + @Test + public void testAppendRunId_NullResults() { + List results = new ArrayList<>(); + results.add(null); + testInstance.appendRunId(opContext, results); + // Verify no interactions since all results are null + verify(testInstance, never()).appendRunId(any(), any(Urn.class), anyString()); + } + + @Test + public void testAppendRunId_ValidResult() { + // Create test data + List results = new ArrayList<>(); + IngestResult result = mock(IngestResult.class); + BatchItem mockRequest = mock(BatchItem.class); + SystemMetadata mockSystemMetadata = mock(SystemMetadata.class); + + // Setup mock behaviors + when(result.getUrn()).thenReturn(TEST_URN); + when(result.isProcessedMCL()).thenReturn(true); + when(result.getResult()).thenReturn(mock(UpdateAspectResult.class)); + when(result.getRequest()).thenReturn(mockRequest); + when(mockRequest.getSystemMetadata()).thenReturn(mockSystemMetadata); + when(mockSystemMetadata.hasRunId()).thenReturn(true); + when(mockSystemMetadata.getRunId()).thenReturn("test-run-id"); + when(mockRequest.getEntitySpec()) + .thenReturn(opContext.getEntityRegistry().getEntitySpec(TEST_URN.getEntityType())); + when(mockRequest.getAspectName()).thenReturn("status"); + + results.add(result); + + // Execute + testInstance.appendRunId(opContext, results); + + // Verify appendRunId was called with correct parameters + verify(testInstance).appendRunId(eq(opContext), eq(TEST_URN), eq("test-run-id")); + } + + @Test + public void testAppendRunId_KeyAspectMatch() { + // Create test data + List results = new ArrayList<>(); + IngestResult result = mock(IngestResult.class); + BatchItem mockRequest = mock(BatchItem.class); + SystemMetadata mockSystemMetadata = mock(SystemMetadata.class); + + // Setup mock behaviors + when(result.getUrn()).thenReturn(TEST_URN); + when(result.isProcessedMCL()).thenReturn(true); + when(result.getResult()).thenReturn(mock(UpdateAspectResult.class)); + when(result.getRequest()).thenReturn(mockRequest); + when(mockRequest.getSystemMetadata()).thenReturn(mockSystemMetadata); + when(mockSystemMetadata.hasRunId()).thenReturn(true); + when(mockSystemMetadata.getRunId()).thenReturn("test-run-id"); + when(mockRequest.getEntitySpec()) + .thenReturn(opContext.getEntityRegistry().getEntitySpec(TEST_URN.getEntityType())); + when(mockRequest.getAspectName()) + .thenReturn( + opContext + .getEntityRegistry() + .getEntitySpec(TEST_URN.getEntityType()) + .getKeyAspectName()); + + 
results.add(result); + + // Execute + testInstance.appendRunId(opContext, results); + + // Verify appendRunId was not called because aspect names match + verify(testInstance, never()).appendRunId(any(), any(Urn.class), anyString()); + } + + @Test + public void testAppendRunId_NoRunId() { + // Create test data + List results = new ArrayList<>(); + IngestResult result = mock(IngestResult.class); + BatchItem mockRequest = mock(BatchItem.class); + SystemMetadata mockSystemMetadata = mock(SystemMetadata.class); + + // Setup mock behaviors + when(result.getUrn()).thenReturn(TEST_URN); + when(result.isProcessedMCL()).thenReturn(true); + when(result.getResult()).thenReturn(mock(UpdateAspectResult.class)); + when(result.getRequest()).thenReturn(mockRequest); + when(mockRequest.getSystemMetadata()).thenReturn(mockSystemMetadata); + when(mockSystemMetadata.hasRunId()).thenReturn(false); + + results.add(result); + + // Execute + testInstance.appendRunId(opContext, results); + + // Verify appendRunId was not called because there's no run ID + verify(testInstance, never()).appendRunId(any(), any(Urn.class), anyString()); + } + + @Test + public void testAppendRunId_NotProcessedOrUpdated() { + // Create test data + List results = new ArrayList<>(); + IngestResult result = mock(IngestResult.class); + + // Setup mock behaviors + when(result.getUrn()).thenReturn(TEST_URN); + when(result.isProcessedMCL()).thenReturn(false); + when(result.isUpdate()).thenReturn(false); + + results.add(result); + + // Execute + testInstance.appendRunId(opContext, results); + + // Verify appendRunId was not called because result is neither processed nor updated + verify(testInstance, never()).appendRunId(any(), any(Urn.class), anyString()); + } + + private static class TestEntitySearchService implements EntitySearchService { + @Override + public void clear(OperationContext opContext) {} + + @Override + public long docCount(OperationContext opContext, String entityName, Filter filter) { + return 0; + } + + @Override + public void upsertDocument( + OperationContext opContext, String entityName, String document, String docId) {} + + @Override + public void deleteDocument(OperationContext opContext, String entityName, String docId) {} + + @Override + public void appendRunId(OperationContext opContext, Urn urn, String runId) {} + + @Override + public SearchResult search( + OperationContext opContext, + List entityNames, + String input, + Filter postFilters, + List sortCriteria, + int from, + int size) { + return null; + } + + @Override + public SearchResult search( + OperationContext opContext, + List entityNames, + String input, + Filter postFilters, + List sortCriteria, + int from, + int size, + List facets) { + return null; + } + + @Override + public SearchResult filter( + OperationContext opContext, + String entityName, + Filter filters, + List sortCriteria, + int from, + int size) { + return null; + } + + @Override + public AutoCompleteResult autoComplete( + OperationContext opContext, + String entityName, + String query, + String field, + Filter requestParams, + int limit) { + return null; + } + + @Override + public Map aggregateByValue( + OperationContext opContext, + List entityNames, + String field, + Filter requestParams, + int limit) { + return null; + } + + @Override + public BrowseResult browse( + OperationContext opContext, + String entityName, + String path, + Filter requestParams, + int from, + int size) { + return null; + } + + @Override + public BrowseResultV2 browseV2( + OperationContext opContext, + String 
entityName, + String path, + Filter filter, + String input, + int start, + int count) { + return null; + } + + @Nonnull + @Override + public BrowseResultV2 browseV2( + @Nonnull OperationContext opContext, + @Nonnull List entityNames, + @Nonnull String path, + @Nullable Filter filter, + @Nonnull String input, + int start, + int count) { + return null; + } + + @Override + public List getBrowsePaths(OperationContext opContext, String entityName, Urn urn) { + return null; + } + + @Override + public ScrollResult fullTextScroll( + OperationContext opContext, + List entities, + String input, + Filter postFilters, + List sortCriteria, + String scrollId, + String keepAlive, + int size) { + return null; + } + + @Override + public ScrollResult structuredScroll( + OperationContext opContext, + List entities, + String input, + Filter postFilters, + List sortCriteria, + String scrollId, + String keepAlive, + int size) { + return null; + } + + @Override + public int maxResultSize() { + return 0; + } + + @Override + public ExplainResponse explain( + OperationContext opContext, + String query, + String documentId, + String entityName, + Filter postFilters, + List sortCriteria, + String scrollId, + String keepAlive, + int size, + List facets) { + return null; + } + + @Override + public IndexConvention getIndexConvention() { + return null; + } + } +}
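
Reviewer note: the sketch below is not part of this patch. It is a minimal, hypothetical GraphQL query illustrating how the new `sort` input on `listIngestionSources` could be exercised once this change lands. Only the `sort` argument is new; the query name, the remaining input fields, and the result fields are assumed to follow the existing `ListIngestionSourcesInput` / `ListIngestionSourcesResult` shapes in `ingestion.graphql`, and the `SortCriterion` / `SortOrder` values are the ones the resolver maps onto the search API. The `"name"` sort field mirrors what the updated `IngestionSourceTable` passes through `onChangeSort`.

```graphql
# Hypothetical usage sketch (not part of this diff): list ingestion sources
# sorted server-side by name, ascending, via the new `sort` input.
query listIngestionSourcesSortedByName {
  listIngestionSources(
    input: { start: 0, count: 25, sort: { field: "name", sortOrder: ASCENDING } }
  ) {
    start
    count
    total
    ingestionSources {
      urn
      name
      type
    }
  }
}
```

When `sort` is omitted, the resolver passes no sort criterion to the search layer and the backend's default ordering applies.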