Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingestion-web) sorting and filtering uses api #11844

Merged
merged 2 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,22 @@
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.query.filter.SortCriterion;
import com.linkedin.metadata.query.filter.SortOrder;
import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.search.SearchResult;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

/** Lists all ingestion sources stored within DataHub. Requires the MANAGE_INGESTION privilege. */
@Slf4j
public class ListIngestionSourcesResolver
implements DataFetcher<CompletableFuture<ListIngestionSourcesResult>> {

Expand Down Expand Up @@ -57,6 +59,22 @@ public CompletableFuture<ListIngestionSourcesResult> get(
final List<FacetFilterInput> filters =
input.getFilters() == null ? Collections.emptyList() : input.getFilters();

// construct sort criteria, defaulting to systemCreated
final SortCriterion sortCriterion;

// if query is expecting to sort by something, use that
final com.linkedin.datahub.graphql.generated.SortCriterion sortCriterionInput =
input.getSort();
if (sortCriterionInput != null) {
sortCriterion =
new SortCriterion()
.setField(sortCriterionInput.getField())
.setOrder(SortOrder.valueOf(sortCriterionInput.getSortOrder().name()));
} else {
// TODO: default to last executed
sortCriterion = null;
}

return GraphQLConcurrencyUtils.supplyAsync(
() -> {
try {
Expand All @@ -69,41 +87,32 @@ public CompletableFuture<ListIngestionSourcesResult> get(
Constants.INGESTION_SOURCE_ENTITY_NAME,
query,
buildFilter(filters, Collections.emptyList()),
null,
sortCriterion != null ? List.of(sortCriterion) : null,
start,
count);

final List<Urn> entitiesUrnList =
gmsResult.getEntities().stream().map(SearchEntity::getEntity).toList();
// Then, resolve all ingestion sources
final Map<Urn, EntityResponse> entities =
_entityClient.batchGetV2(
context.getOperationContext(),
Constants.INGESTION_SOURCE_ENTITY_NAME,
new HashSet<>(
gmsResult.getEntities().stream()
.map(SearchEntity::getEntity)
.collect(Collectors.toList())),
new HashSet<>(entitiesUrnList),
ImmutableSet.of(
Constants.INGESTION_INFO_ASPECT_NAME,
Constants.INGESTION_SOURCE_KEY_ASPECT_NAME));

final Collection<EntityResponse> sortedEntities =
entities.values().stream()
.sorted(
Comparator.comparingLong(
s ->
-s.getAspects()
.get(Constants.INGESTION_SOURCE_KEY_ASPECT_NAME)
.getCreated()
.getTime()))
.collect(Collectors.toList());
final List<EntityResponse> entitiesOrdered =
entitiesUrnList.stream().map(entities::get).filter(Objects::nonNull).toList();

// Now that we have entities we can bind this to a result.
final ListIngestionSourcesResult result = new ListIngestionSourcesResult();
result.setStart(gmsResult.getFrom());
result.setCount(gmsResult.getPageSize());
result.setTotal(gmsResult.getNumEntities());
result.setIngestionSources(
IngestionResolverUtils.mapIngestionSources(sortedEntities));
IngestionResolverUtils.mapIngestionSources(entitiesOrdered));
return result;

} catch (Exception e) {
Expand Down
5 changes: 5 additions & 0 deletions datahub-graphql-core/src/main/resources/ingestion.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,11 @@ input ListIngestionSourcesInput {
Optional Facet filters to apply to the result set
"""
filters: [FacetFilterInput!]

"""
Optional sort order. Defaults to use systemCreated.
"""
sort: SortCriterion
}

"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
public class ListIngestionSourceResolverTest {

private static final ListIngestionSourcesInput TEST_INPUT =
new ListIngestionSourcesInput(0, 20, null, null);
new ListIngestionSourcesInput(0, 20, null, null, null);

@Test
public void testGetSuccess() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.linkedin.datahub.upgrade.config;

import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.ingestion.BackfillIngestionSourceInfoIndices;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;

@Configuration
@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class)
public class BackfillIngestionSourceInfoIndicesConfig {

@Bean
public NonBlockingSystemUpgrade backfillIngestionSourceInfoIndices(
final OperationContext opContext,
final EntityService<?> entityService,
final AspectDao aspectDao,
@Value("${systemUpdate.ingestionIndices.enabled}") final boolean enabled,
@Value("${systemUpdate.ingestionIndices.batchSize}") final Integer batchSize,
@Value("${systemUpdate.ingestionIndices.delayMs}") final Integer delayMs,
@Value("${systemUpdate.ingestionIndices.limit}") final Integer limit) {
return new BackfillIngestionSourceInfoIndices(
opContext, entityService, aspectDao, enabled, batchSize, delayMs, limit);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.linkedin.datahub.upgrade.system.ingestion;

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import javax.annotation.Nonnull;

public class BackfillIngestionSourceInfoIndices implements NonBlockingSystemUpgrade {

private final List<UpgradeStep> _steps;

public BackfillIngestionSourceInfoIndices(
@Nonnull OperationContext opContext,
EntityService<?> entityService,
AspectDao aspectDao,
boolean enabled,
Integer batchSize,
Integer batchDelayMs,
Integer limit) {
if (enabled) {
_steps =
ImmutableList.of(
new BackfillIngestionSourceInfoIndicesStep(
opContext, entityService, aspectDao, batchSize, batchDelayMs, limit));
} else {
_steps = ImmutableList.of();
}
}

@Override
public String id() {
return getClass().getSimpleName();
}

@Override
public List<UpgradeStep> steps() {
return _steps;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.linkedin.datahub.upgrade.system.ingestion;

import static com.linkedin.metadata.Constants.*;

import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.upgrade.system.AbstractMCLStep;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class BackfillIngestionSourceInfoIndicesStep extends AbstractMCLStep {

private static final String UPGRADE_ID = BackfillIngestionSourceInfoIndices.class.getSimpleName();
private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);

public BackfillIngestionSourceInfoIndicesStep(
@Nonnull OperationContext opContext,
EntityService<?> entityService,
AspectDao aspectDao,
Integer batchSize,
Integer batchDelayMs,
Integer limit) {
super(opContext, entityService, aspectDao, batchSize, batchDelayMs, limit);
}

@Override
public String id() {
return UPGRADE_ID;
}

@Nonnull
@Override
protected String getAspectName() {
return INGESTION_INFO_ASPECT_NAME;
}

@Nullable
@Override
protected String getUrnLike() {
return "urn:li:" + INGESTION_SOURCE_ENTITY_NAME + ":%";
}

/**
* Returns whether the upgrade should proceed if the step fails after exceeding the maximum
* retries.
*/
@Override
public boolean isOptional() {
return true;
}
}
43 changes: 26 additions & 17 deletions datahub-web-react/src/app/ingest/source/IngestionSourceList.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import TabToolbar from '../../entity/shared/components/styled/TabToolbar';
import { IngestionSourceBuilderModal } from './builder/IngestionSourceBuilderModal';
import { addToListIngestionSourcesCache, CLI_EXECUTOR_ID, removeFromListIngestionSourcesCache } from './utils';
import { DEFAULT_EXECUTOR_ID, SourceBuilderState, StringMapEntryInput } from './builder/types';
import { IngestionSource, UpdateIngestionSourceInput } from '../../../types.generated';
import { IngestionSource, SortCriterion, SortOrder, UpdateIngestionSourceInput } from '../../../types.generated';
import { SearchBar } from '../../search/SearchBar';
import { useEntityRegistry } from '../../useEntityRegistry';
import { ExecutionDetailsModal } from './executions/ExecutionRequestDetailsModal';
Expand Down Expand Up @@ -60,16 +60,6 @@ export enum IngestionSourceType {
CLI,
}

export function shouldIncludeSource(source: any, sourceFilter: IngestionSourceType) {
if (sourceFilter === IngestionSourceType.CLI) {
return source.config.executorId === CLI_EXECUTOR_ID;
}
if (sourceFilter === IngestionSourceType.UI) {
return source.config.executorId !== CLI_EXECUTOR_ID;
}
return true;
}

const DEFAULT_PAGE_SIZE = 25;

const removeExecutionsFromIngestionSource = (source) => {
Expand Down Expand Up @@ -105,6 +95,7 @@ export const IngestionSourceList = () => {
// Set of removed urns used to account for eventual consistency
const [removedUrns, setRemovedUrns] = useState<string[]>([]);
const [sourceFilter, setSourceFilter] = useState(IngestionSourceType.ALL);
const [sort, setSort] = useState<SortCriterion>();
const [hideSystemSources, setHideSystemSources] = useState(true);

/**
Expand All @@ -115,16 +106,24 @@ export const IngestionSourceList = () => {
// Ingestion Source Default Filters
const filters = hideSystemSources
? [{ field: 'sourceType', values: [SYSTEM_INTERNAL_SOURCE_TYPE], negated: true }]
: undefined;
: [];
if (sourceFilter !== IngestionSourceType.ALL) {
filters.push({
field: 'sourceExecutorId',
values: [CLI_EXECUTOR_ID],
negated: sourceFilter !== IngestionSourceType.CLI,
});
}

// Ingestion Source Queries
const { loading, error, data, client, refetch } = useListIngestionSourcesQuery({
variables: {
input: {
start,
count: pageSize,
query: (query?.length && query) || undefined,
filters,
query: query?.length ? query : undefined,
filters: filters.length ? filters : undefined,
sort,
},
},
fetchPolicy: (query?.length || 0) > 0 ? 'no-cache' : 'cache-first',
Expand All @@ -138,9 +137,7 @@ export const IngestionSourceList = () => {

const totalSources = data?.listIngestionSources?.total || 0;
const sources = data?.listIngestionSources?.ingestionSources || [];
const filteredSources = sources.filter(
(source) => !removedUrns.includes(source.urn) && shouldIncludeSource(source, sourceFilter),
) as IngestionSource[];
const filteredSources = sources.filter((source) => !removedUrns.includes(source.urn)) as IngestionSource[];
const focusSource =
(focusSourceUrn && filteredSources.find((source) => source.urn === focusSourceUrn)) || undefined;

Expand Down Expand Up @@ -376,6 +373,17 @@ export const IngestionSourceList = () => {
setFocusSourceUrn(undefined);
};

const onChangeSort = (field, order) => {
setSort(
order
? {
sortOrder: order === 'ascend' ? SortOrder.Ascending : SortOrder.Descending,
field,
}
: undefined,
);
};

return (
<>
{!data && loading && <Message type="loading" content="Loading ingestion sources..." />}
Expand Down Expand Up @@ -438,6 +446,7 @@ export const IngestionSourceList = () => {
onView={onView}
onDelete={onDelete}
onRefresh={onRefresh}
onChangeSort={onChangeSort}
/>
<SourcePaginationContainer>
<Pagination
Expand Down
Loading
Loading