diff --git a/buildSrc/src/main/java/io/datahubproject/GenerateJsonSchemaTask.java b/buildSrc/src/main/java/io/datahubproject/GenerateJsonSchemaTask.java index 25bf239ab835b7..1c9dfd46866102 100644 --- a/buildSrc/src/main/java/io/datahubproject/GenerateJsonSchemaTask.java +++ b/buildSrc/src/main/java/io/datahubproject/GenerateJsonSchemaTask.java @@ -183,6 +183,7 @@ private void generateSchema(final File file) { final String fileBaseName; try { final JsonNode schema = JsonLoader.fromFile(file); + final JsonNode result = buildResult(schema.toString()); String prettySchema = JacksonUtils.prettyPrint(result); Path absolutePath = file.getAbsoluteFile().toPath(); @@ -195,11 +196,21 @@ private void generateSchema(final File file) { } else { fileBaseName = getBaseName(file.getName()); } - Files.write(Paths.get(jsonDirectory + sep + fileBaseName + ".json"), + + final String targetName; + if (schema.has("Aspect") && schema.get("Aspect").has("name") && + !schema.get("Aspect").get("name").asText().equalsIgnoreCase(fileBaseName)) { + targetName = OpenApiEntities.toUpperFirst(schema.get("Aspect").get("name").asText()); + prettySchema = prettySchema.replaceAll(fileBaseName, targetName); + } else { + targetName = fileBaseName; + } + + Files.write(Paths.get(jsonDirectory + sep + targetName + ".json"), prettySchema.getBytes(StandardCharsets.UTF_8), StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING); if (schema.has("Aspect")) { - aspectType.add(NODE_FACTORY.objectNode().put("$ref", "#/definitions/" + getBaseName(file.getName()))); + aspectType.add(NODE_FACTORY.objectNode().put("$ref", "#/definitions/" + targetName)); } } catch (IOException | ProcessingException e) { throw new RuntimeException(e); diff --git a/buildSrc/src/main/java/io/datahubproject/OpenApiEntities.java b/buildSrc/src/main/java/io/datahubproject/OpenApiEntities.java index 888c4a0e999311..ed94a7d5db113c 100644 --- a/buildSrc/src/main/java/io/datahubproject/OpenApiEntities.java +++ b/buildSrc/src/main/java/io/datahubproject/OpenApiEntities.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.fasterxml.jackson.dataformat.yaml.YAMLMapper; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.linkedin.metadata.models.registry.config.Entities; import com.linkedin.metadata.models.registry.config.Entity; @@ -58,8 +59,12 @@ public class OpenApiEntities { .add("notebookInfo").add("editableNotebookProperties") .add("dataProductProperties") .add("institutionalMemory") + .add("forms").add("formInfo").add("dynamicFormAssignment") .build(); + private final static ImmutableSet ENTITY_EXCLUSIONS = ImmutableSet.builder() + .add("structuredProperty") + .build(); public OpenApiEntities(JsonNodeFactory NODE_FACTORY) { this.NODE_FACTORY = NODE_FACTORY; @@ -117,14 +122,19 @@ public ObjectNode entityExtension(List nodesList, ObjectNode schemas return componentsNode; } - private static String toUpperFirst(String s) { - return s.substring(0, 1).toUpperCase() + s.substring(1); + public static String toUpperFirst(String s) { + if (s.length() > 2 && s.substring(2, 3).equals(s.substring(2, 3).toUpperCase())) { + return s.substring(0, 2).toUpperCase() + s.substring(2); + } else { + return s.substring(0, 1).toUpperCase() + s.substring(1); + } } private Set withEntitySchema(ObjectNode schemasNode, Set definitions) { return entityMap.values().stream() // Make sure the primary key is defined .filter(entity -> definitions.contains(toUpperFirst(entity.getKeyAspect()))) + .filter(entity -> !ENTITY_EXCLUSIONS.contains(entity.getName())) .map(entity -> { final String upperName = toUpperFirst(entity.getName()); @@ -547,7 +557,7 @@ private ObjectNode buildSingleEntityAspectPath(Entity entity, String aspect) { ObjectNode getMethod = NODE_FACTORY.objectNode() .put("summary", String.format("Get %s for %s.", aspect, entity.getName())) - .put("operationId", String.format("get%s", upperFirstAspect, upperFirstEntity)); + .put("operationId", String.format("get%s", upperFirstAspect)); getMethod.set("tags", tagsNode); ArrayNode singlePathParametersNode = NODE_FACTORY.arrayNode(); getMethod.set("parameters", singlePathParametersNode); @@ -575,13 +585,13 @@ private ObjectNode buildSingleEntityAspectPath(Entity entity, String aspect) { .set("application/json", NODE_FACTORY.objectNode()))); ObjectNode headMethod = NODE_FACTORY.objectNode() .put("summary", String.format("%s on %s existence.", aspect, upperFirstEntity)) - .put("operationId", String.format("head%s", upperFirstAspect, upperFirstEntity)) + .put("operationId", String.format("head%s", upperFirstAspect)) .set("responses", headResponses); headMethod.set("tags", tagsNode); ObjectNode deleteMethod = NODE_FACTORY.objectNode() .put("summary", String.format("Delete %s on entity %s", aspect, upperFirstEntity)) - .put("operationId", String.format("delete%s", upperFirstAspect, upperFirstEntity)) + .put("operationId", String.format("delete%s", upperFirstAspect)) .set("responses", NODE_FACTORY.objectNode() .set("200", NODE_FACTORY.objectNode() .put("description", String.format("Delete %s on %s entity.", aspect, upperFirstEntity)) @@ -591,7 +601,7 @@ private ObjectNode buildSingleEntityAspectPath(Entity entity, String aspect) { ObjectNode postMethod = NODE_FACTORY.objectNode() .put("summary", String.format("Create aspect %s on %s ", aspect, upperFirstEntity)) - .put("operationId", String.format("create%s", upperFirstAspect, upperFirstEntity)); + .put("operationId", String.format("create%s", upperFirstAspect)); postMethod.set("requestBody", NODE_FACTORY.objectNode() .put("description", String.format("Create aspect %s on %s entity.", aspect, upperFirstEntity)) .put("required", true).set("content", NODE_FACTORY.objectNode() diff --git a/docker/profiles/docker-compose.gms.yml b/docker/profiles/docker-compose.gms.yml index e7f48dc15b9ac6..769bce3105a7f1 100644 --- a/docker/profiles/docker-compose.gms.yml +++ b/docker/profiles/docker-compose.gms.yml @@ -101,6 +101,8 @@ x-datahub-gms-service: &datahub-gms-service timeout: 5s volumes: - ${HOME}/.datahub/plugins:/etc/datahub/plugins + labels: + io.datahubproject.datahub.component: "gms" x-datahub-gms-service-dev: &datahub-gms-service-dev <<: *datahub-gms-service diff --git a/docs/api/openapi/openapi-structured-properties.md b/docs/api/openapi/openapi-structured-properties.md new file mode 100644 index 00000000000000..521ce8789db0d4 --- /dev/null +++ b/docs/api/openapi/openapi-structured-properties.md @@ -0,0 +1,284 @@ +# Structured Properties - DataHub OpenAPI v2 Guide + +This guides walks through the process of creating and using a Structured Property using the `v2` version +of the DataHub OpenAPI implementation. Note that this refers to DataHub's OpenAPI version and not the version of OpenAPI itself. + +Requirements: +* curl +* jq + +## Structured Property Definition + +Before a structured property can be added to an entity it must first be defined. Here is an example +structured property being created against a local quickstart instance. + +### Create Property Definition + +Example Request: + +```shell +curl -X 'POST' -v \ + 'http://localhost:8080/openapi/v2/entity/structuredProperty/urn%3Ali%3AstructuredProperty%3Amy.test.MyProperty01/propertyDefinition' \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{ + "qualifiedName": "my.test.MyProperty01", + "displayName": "MyProperty01", + "valueType": "urn:li:dataType:datahub.string", + "allowedValues": [ + { + "value": {"string": "foo"}, + "description": "test foo value" + }, + { + "value": {"string": "bar"}, + "description": "test bar value" + } + ], + "cardinality": "SINGLE", + "entityTypes": [ + "urn:li:entityType:datahub.dataset" + ], + "description": "test description" +}' | jq +``` + +### Read Property Definition + +Example Request: + +```shell +curl -X 'GET' -v \ + 'http://localhost:8080/openapi/v2/entity/structuredProperty/urn%3Ali%3AstructuredProperty%3Amy.test.MyProperty01/propertyDefinition' \ + -H 'accept: application/json' | jq +``` + +Example Response: + +```json +{ + "value": { + "allowedValues": [ + { + "value": { + "string": "foo" + }, + "description": "test foo value" + }, + { + "value": { + "string": "bar" + }, + "description": "test bar value" + } + ], + "qualifiedName": "my.test.MyProperty01", + "displayName": "MyProperty01", + "valueType": "urn:li:dataType:datahub.string", + "description": "test description", + "entityTypes": [ + "urn:li:entityType:datahub.dataset" + ], + "cardinality": "SINGLE" + } +} +``` + +### Delete Property Definition + +⚠ **Not Implemented** ⚠ + +## Applying Structured Properties + +Structured Properties can now be added to entities which have the `structuredProperties` as aspect. In the following +example we'll attach and remove properties to an example dataset entity with urn `urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)`. + +### Set Structured Property Values + +This will set/replace all structured properties on the entity. See `PATCH` operations to add/remove a single property. + +```shell +curl -X 'POST' -v \ + 'http://localhost:8080/openapi/v2/entity/dataset/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2CSampleHiveDataset%2CPROD%29/structuredProperties' \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{ + "properties": [ + { + "propertyUrn": "urn:li:structuredProperty:my.test.MyProperty01", + "values": [ + {"string": "foo"} + ] + } + ] +}' | jq +``` + +### Patch Structured Property Value + +For this example, we'll extend create a second structured property and apply both properties to the same +dataset used previously. After this your system should include both `my.test.MyProperty01` and `my.test.MyProperty02`. + +```shell +curl -X 'POST' -v \ + 'http://localhost:8080/openapi/v2/entity/structuredProperty/urn%3Ali%3AstructuredProperty%3Amy.test.MyProperty02/propertyDefinition' \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{ + "qualifiedName": "my.test.MyProperty02", + "displayName": "MyProperty02", + "valueType": "urn:li:dataType:datahub.string", + "allowedValues": [ + { + "value": {"string": "foo2"}, + "description": "test foo2 value" + }, + { + "value": {"string": "bar2"}, + "description": "test bar2 value" + } + ], + "cardinality": "SINGLE", + "entityTypes": [ + "urn:li:entityType:datahub.dataset" + ] +}' | jq +``` + +This command will attach one of each of the two properties to our test dataset `urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)`. + +```shell +curl -X 'POST' -v \ + 'http://localhost:8080/openapi/v2/entity/dataset/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2CSampleHiveDataset%2CPROD%29/structuredProperties' \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{ + "properties": [ + { + "propertyUrn": "urn:li:structuredProperty:my.test.MyProperty01", + "values": [ + {"string": "foo"} + ] + }, + { + "propertyUrn": "urn:li:structuredProperty:my.test.MyProperty02", + "values": [ + {"string": "bar2"} + ] + } + ] +}' | jq +``` + +#### Remove Structured Property Value + +The expected state of our test dataset include 2 structured properties. We'd like to remove the first one and preserve +the second property. + +```shell +curl -X 'PATCH' -v \ + 'http://localhost:8080/openapi/v2/entity/dataset/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2CSampleHiveDataset%2CPROD%29/structuredProperties' \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json-patch+json' \ + -d '{ + "patch": [ + { + "op": "remove", + "path": "/properties/urn:li:structuredProperty:my.test.MyProperty01" + } + ], + "arrayPrimaryKeys": { + "properties": [ + "propertyUrn" + ] + } + }' | jq +``` + +The response will show that the expected property has been removed. + +```json +{ + "urn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)", + "aspects": { + "structuredProperties": { + "value": { + "properties": [ + { + "values": [ + { + "string": "bar2" + } + ], + "propertyUrn": "urn:li:structuredProperty:my.test.MyProperty02" + } + ] + } + } + } +} +``` + +#### Add Structured Property Value + +In this example, we'll add the property back with a different value, preserving the existing property. + +```shell +curl -X 'PATCH' -v \ + 'http://localhost:8080/openapi/v2/entity/dataset/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2CSampleHiveDataset%2CPROD%29/structuredProperties' \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json-patch+json' \ + -d '{ + "patch": [ + { + "op": "add", + "path": "/properties/urn:li:structuredProperty:my.test.MyProperty01", + "value": { + "propertyUrn": "urn:li:structuredProperty:my.test.MyProperty01", + "values": [ + { + "string": "bar" + } + ] + } + } + ], + "arrayPrimaryKeys": { + "properties": [ + "propertyUrn" + ] + } + }' | jq +``` + +The response shows that the property was re-added with the new value `bar` instead of the previous value `foo`. + +```json +{ + "urn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)", + "aspects": { + "structuredProperties": { + "value": { + "properties": [ + { + "values": [ + { + "string": "bar2" + } + ], + "propertyUrn": "urn:li:structuredProperty:my.test.MyProperty02" + }, + { + "values": [ + { + "string": "bar" + } + ], + "propertyUrn": "urn:li:structuredProperty:my.test.MyProperty01" + } + ] + } + } + } +} +``` diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/GenericJsonPatch.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/GenericJsonPatch.java new file mode 100644 index 00000000000000..c73ccbb2d93e33 --- /dev/null +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/GenericJsonPatch.java @@ -0,0 +1,34 @@ +package com.linkedin.metadata.aspect.patch; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.databind.JsonNode; +import com.github.fge.jsonpatch.JsonPatch; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class GenericJsonPatch { + @Nullable private Map> arrayPrimaryKeys; + + @Nonnull private JsonNode patch; + + @Nonnull + public Map> getArrayPrimaryKeys() { + return arrayPrimaryKeys == null ? Map.of() : arrayPrimaryKeys; + } + + @JsonIgnore + public JsonPatch getJsonPatch() throws IOException { + return JsonPatch.fromJson(patch); + } +} diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/template/common/GenericPatchTemplate.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/template/common/GenericPatchTemplate.java new file mode 100644 index 00000000000000..3a3e3c99f25a38 --- /dev/null +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/template/common/GenericPatchTemplate.java @@ -0,0 +1,59 @@ +package com.linkedin.metadata.aspect.patch.template.common; + +import com.fasterxml.jackson.databind.JsonNode; +import com.github.fge.jsonpatch.JsonPatchException; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.metadata.aspect.patch.GenericJsonPatch; +import com.linkedin.metadata.aspect.patch.template.CompoundKeyTemplate; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import javax.annotation.Nonnull; +import lombok.Builder; + +@Builder +public class GenericPatchTemplate extends CompoundKeyTemplate { + + @Nonnull private final GenericJsonPatch genericJsonPatch; + @Nonnull private final Class templateType; + @Nonnull private final T templateDefault; + + @Nonnull + @Override + public Class getTemplateType() { + return templateType; + } + + @Nonnull + @Override + public T getDefault() { + return templateDefault; + } + + @Nonnull + @Override + public JsonNode transformFields(final JsonNode baseNode) { + JsonNode transformedNode = baseNode; + for (Map.Entry> composite : + genericJsonPatch.getArrayPrimaryKeys().entrySet()) { + transformedNode = arrayFieldToMap(transformedNode, composite.getKey(), composite.getValue()); + } + return transformedNode; + } + + @Nonnull + @Override + public JsonNode rebaseFields(JsonNode patched) { + JsonNode transformedNode = patched; + for (Map.Entry> composite : + genericJsonPatch.getArrayPrimaryKeys().entrySet()) { + transformedNode = + transformedMapToArray(transformedNode, composite.getKey(), composite.getValue()); + } + return transformedNode; + } + + public T applyPatch(RecordTemplate recordTemplate) throws IOException, JsonPatchException { + return super.applyPatch(recordTemplate, genericJsonPatch.getJsonPatch()); + } +} diff --git a/metadata-ingestion/src/datahub/cli/docker_check.py b/metadata-ingestion/src/datahub/cli/docker_check.py index 97b88cbc8b8ebd..47b89af6dfd040 100644 --- a/metadata-ingestion/src/datahub/cli/docker_check.py +++ b/metadata-ingestion/src/datahub/cli/docker_check.py @@ -193,6 +193,11 @@ def check_docker_quickstart() -> QuickstartStatus: .labels.get("com.docker.compose.project.config_files") .split(",") ) + + # If using profiles, alternative check + if config_files and "/profiles/" in config_files[0]: + return check_docker_quickstart_profiles(client) + all_containers = set() for config_file in config_files: with open(config_file, "r") as config_file: @@ -234,3 +239,35 @@ def check_docker_quickstart() -> QuickstartStatus: ) return QuickstartStatus(container_statuses) + + +def check_docker_quickstart_profiles(client: docker.DockerClient) -> QuickstartStatus: + container_statuses: List[DockerContainerStatus] = [] + containers = client.containers.list( + all=True, + filters={"label": "io.datahubproject.datahub.component=gms"}, + # We can get race conditions between docker running up / recreating + # containers and our status checks. + ignore_removed=True, + ) + if len(containers) == 0: + return QuickstartStatus([]) + + existing_containers = set() + # Check that the containers are running and healthy. + container: docker.models.containers.Container + for container in containers: + name = container.labels.get("com.docker.compose.service", container.name) + existing_containers.add(name) + status = ContainerStatus.OK + if container.status != "running": + status = ContainerStatus.DIED + elif "Health" in container.attrs["State"]: + if container.attrs["State"]["Health"]["Status"] == "starting": + status = ContainerStatus.STARTING + elif container.attrs["State"]["Health"]["Status"] != "healthy": + status = ContainerStatus.UNHEALTHY + + container_statuses.append(DockerContainerStatus(name, status)) + + return QuickstartStatus(container_statuses) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCPUpsertBatchItem.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCPUpsertBatchItem.java index 6426a368a95b7d..89209c44f10c77 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCPUpsertBatchItem.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCPUpsertBatchItem.java @@ -4,12 +4,14 @@ import static com.linkedin.metadata.entity.AspectUtils.validateAspect; import com.datahub.util.exception.ModelConversionException; +import com.github.fge.jsonpatch.JsonPatchException; import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; import com.linkedin.data.template.RecordTemplate; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.aspect.batch.SystemAspect; import com.linkedin.metadata.aspect.batch.UpsertItem; +import com.linkedin.metadata.aspect.patch.template.common.GenericPatchTemplate; import com.linkedin.metadata.aspect.plugins.hooks.MutationHook; import com.linkedin.metadata.aspect.plugins.validation.AspectPayloadValidator; import com.linkedin.metadata.aspect.plugins.validation.AspectRetriever; @@ -24,6 +26,7 @@ import com.linkedin.metadata.utils.SystemMetadataUtils; import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.mxe.SystemMetadata; +import java.io.IOException; import java.sql.Timestamp; import java.util.Objects; import javax.annotation.Nonnull; @@ -38,6 +41,31 @@ @Builder(toBuilder = true) public class MCPUpsertBatchItem extends UpsertItem { + public static MCPUpsertBatchItem fromPatch( + @Nonnull Urn urn, + @Nonnull AspectSpec aspectSpec, + @Nullable RecordTemplate recordTemplate, + GenericPatchTemplate genericPatchTemplate, + @Nonnull AuditStamp auditStamp, + AspectRetriever aspectRetriever) { + MCPUpsertBatchItem.MCPUpsertBatchItemBuilder builder = + MCPUpsertBatchItem.builder() + .urn(urn) + .auditStamp(auditStamp) + .aspectName(aspectSpec.getName()); + + RecordTemplate currentValue = + recordTemplate != null ? recordTemplate : genericPatchTemplate.getDefault(); + + try { + builder.aspect(genericPatchTemplate.applyPatch(currentValue)); + } catch (JsonPatchException | IOException e) { + throw new RuntimeException(e); + } + + return builder.build(aspectRetriever); + } + // urn an urn associated with the new aspect @Nonnull private final Urn urn; diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/dgraph/DgraphGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/dgraph/DgraphGraphService.java index 0d8b7655fddeb1..24e272dee7a254 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/dgraph/DgraphGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/dgraph/DgraphGraphService.java @@ -10,6 +10,7 @@ import com.linkedin.metadata.graph.Edge; import com.linkedin.metadata.graph.GraphService; import com.linkedin.metadata.graph.RelatedEntitiesResult; +import com.linkedin.metadata.graph.RelatedEntitiesScrollResult; import com.linkedin.metadata.graph.RelatedEntity; import com.linkedin.metadata.models.registry.LineageRegistry; import com.linkedin.metadata.query.filter.Criterion; @@ -17,6 +18,7 @@ import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.query.filter.RelationshipDirection; import com.linkedin.metadata.query.filter.RelationshipFilter; +import com.linkedin.metadata.query.filter.SortCriterion; import io.dgraph.DgraphClient; import io.dgraph.DgraphProto.Mutation; import io.dgraph.DgraphProto.NQuad; @@ -779,4 +781,21 @@ public void clear() { // setup urn, type and key relationships getSchema(); } + + @Nonnull + @Override + public RelatedEntitiesScrollResult scrollRelatedEntities( + @Nullable List sourceTypes, + @Nonnull Filter sourceEntityFilter, + @Nullable List destinationTypes, + @Nonnull Filter destinationEntityFilter, + @Nonnull List relationshipTypes, + @Nonnull RelationshipFilter relationshipFilter, + @Nonnull List sortCriterion, + @Nullable String scrollId, + int count, + @Nullable Long startTimeMillis, + @Nullable Long endTimeMillis) { + throw new IllegalArgumentException("Not implemented"); + } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java index 97cb186ce948ce..3051319aa54cf3 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java @@ -23,6 +23,8 @@ import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.query.filter.RelationshipDirection; import com.linkedin.metadata.query.filter.RelationshipFilter; +import com.linkedin.metadata.query.filter.SortCriterion; +import com.linkedin.metadata.search.elasticsearch.query.request.SearchAfterWrapper; import com.linkedin.metadata.search.utils.ESUtils; import com.linkedin.metadata.utils.ConcurrencyUtils; import com.linkedin.metadata.utils.elasticsearch.IndexConvention; @@ -81,7 +83,7 @@ public class ESGraphQueryDAO { @Nonnull public static void addFilterToQueryBuilder( - @Nonnull Filter filter, String node, BoolQueryBuilder rootQuery) { + @Nonnull Filter filter, @Nullable String node, BoolQueryBuilder rootQuery) { BoolQueryBuilder orQuery = new BoolQueryBuilder(); for (ConjunctiveCriterion conjunction : filter.getOr()) { final BoolQueryBuilder andQuery = new BoolQueryBuilder(); @@ -93,12 +95,13 @@ public static void addFilterToQueryBuilder( } criterionArray.forEach( criterion -> - andQuery.must( + andQuery.filter( QueryBuilders.termQuery( - node + "." + criterion.getField(), criterion.getValue()))); + (node == null ? "" : node + ".") + criterion.getField(), + criterion.getValue()))); orQuery.should(andQuery); } - rootQuery.must(orQuery); + rootQuery.filter(orQuery); } private SearchResponse executeSearchQuery( @@ -174,9 +177,9 @@ public SearchResponse getSearchResponse( public static BoolQueryBuilder buildQuery( @Nullable final List sourceTypes, - @Nonnull final Filter sourceEntityFilter, + @Nullable final Filter sourceEntityFilter, @Nullable final List destinationTypes, - @Nonnull final Filter destinationEntityFilter, + @Nullable final Filter destinationEntityFilter, @Nonnull final List relationshipTypes, @Nonnull final RelationshipFilter relationshipFilter) { BoolQueryBuilder finalQuery = QueryBuilders.boolQuery(); @@ -187,17 +190,22 @@ public static BoolQueryBuilder buildQuery( String sourceNode = relationshipDirection == RelationshipDirection.OUTGOING ? SOURCE : DESTINATION; if (sourceTypes != null && sourceTypes.size() > 0) { - finalQuery.must(QueryBuilders.termsQuery(sourceNode + ".entityType", sourceTypes)); + finalQuery.filter(QueryBuilders.termsQuery(sourceNode + ".entityType", sourceTypes)); + } + if (sourceEntityFilter != null) { + addFilterToQueryBuilder(sourceEntityFilter, sourceNode, finalQuery); } - addFilterToQueryBuilder(sourceEntityFilter, sourceNode, finalQuery); // set destination filter String destinationNode = relationshipDirection == RelationshipDirection.OUTGOING ? DESTINATION : SOURCE; if (destinationTypes != null && destinationTypes.size() > 0) { - finalQuery.must(QueryBuilders.termsQuery(destinationNode + ".entityType", destinationTypes)); + finalQuery.filter( + QueryBuilders.termsQuery(destinationNode + ".entityType", destinationTypes)); + } + if (destinationEntityFilter != null) { + addFilterToQueryBuilder(destinationEntityFilter, destinationNode, finalQuery); } - addFilterToQueryBuilder(destinationEntityFilter, destinationNode, finalQuery); // set relationship filter if (relationshipTypes.size() > 0) { @@ -206,8 +214,14 @@ public static BoolQueryBuilder buildQuery( relationshipType -> relationshipQuery.should( QueryBuilders.termQuery(RELATIONSHIP_TYPE, relationshipType))); - finalQuery.must(relationshipQuery); + finalQuery.filter(relationshipQuery); + } + + // general filter + if (relationshipFilter.getOr() != null) { + addFilterToQueryBuilder(new Filter().setOr(relationshipFilter.getOr()), null, finalQuery); } + return finalQuery; } @@ -659,4 +673,60 @@ public static class LineageResponse { int total; List lineageRelationships; } + + public SearchResponse getSearchResponse( + @Nullable final List sourceTypes, + @Nullable final Filter sourceEntityFilter, + @Nullable final List destinationTypes, + @Nullable final Filter destinationEntityFilter, + @Nonnull final List relationshipTypes, + @Nonnull final RelationshipFilter relationshipFilter, + @Nonnull List sortCriterion, + @Nullable String scrollId, + int count) { + + BoolQueryBuilder finalQuery = + buildQuery( + sourceTypes, + sourceEntityFilter, + destinationTypes, + destinationEntityFilter, + relationshipTypes, + relationshipFilter); + + return executeScrollSearchQuery(finalQuery, sortCriterion, scrollId, count); + } + + private SearchResponse executeScrollSearchQuery( + @Nonnull final QueryBuilder query, + @Nonnull List sortCriterion, + @Nullable String scrollId, + final int count) { + + Object[] sort = null; + if (scrollId != null) { + SearchAfterWrapper searchAfterWrapper = SearchAfterWrapper.fromScrollId(scrollId); + sort = searchAfterWrapper.getSort(); + } + + SearchRequest searchRequest = new SearchRequest(); + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + + searchSourceBuilder.size(count); + searchSourceBuilder.query(query); + ESUtils.buildSortOrder(searchSourceBuilder, sortCriterion, List.of(), false); + searchRequest.source(searchSourceBuilder); + ESUtils.setSearchAfter(searchSourceBuilder, sort, null, null); + + searchRequest.indices(indexConvention.getIndexName(INDEX_NAME)); + + try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "esQuery").time()) { + MetricUtils.counter(this.getClass(), SEARCH_EXECUTIONS_METRIC).inc(); + return client.search(searchRequest, RequestOptions.DEFAULT); + } catch (Exception e) { + log.error("Search query failed", e); + throw new ESQueryException("Search query failed:", e); + } + } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java index 31e87d6ebfca44..67590ffd6e7c10 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java @@ -11,7 +11,9 @@ import com.linkedin.metadata.graph.GraphService; import com.linkedin.metadata.graph.LineageDirection; import com.linkedin.metadata.graph.LineageRelationshipArray; +import com.linkedin.metadata.graph.RelatedEntities; import com.linkedin.metadata.graph.RelatedEntitiesResult; +import com.linkedin.metadata.graph.RelatedEntitiesScrollResult; import com.linkedin.metadata.graph.RelatedEntity; import com.linkedin.metadata.models.registry.LineageRegistry; import com.linkedin.metadata.query.filter.Condition; @@ -22,8 +24,10 @@ import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.query.filter.RelationshipDirection; import com.linkedin.metadata.query.filter.RelationshipFilter; +import com.linkedin.metadata.query.filter.SortCriterion; import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder; import com.linkedin.metadata.search.elasticsearch.indexbuilder.ReindexConfig; +import com.linkedin.metadata.search.elasticsearch.query.request.SearchAfterWrapper; import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor; import com.linkedin.metadata.shared.ElasticSearchIndexed; import com.linkedin.metadata.utils.elasticsearch.IndexConvention; @@ -49,6 +53,7 @@ import lombok.extern.slf4j.Slf4j; import org.opensearch.action.search.SearchResponse; import org.opensearch.index.query.QueryBuilders; +import org.opensearch.search.SearchHit; @Slf4j @RequiredArgsConstructor @@ -167,8 +172,6 @@ public RelatedEntitiesResult findRelatedEntities( } final RelationshipDirection relationshipDirection = relationshipFilter.getDirection(); - String destinationNode = - relationshipDirection == RelationshipDirection.OUTGOING ? "destination" : "source"; SearchResponse response = _graphReadDAO.getSearchResponse( @@ -187,28 +190,8 @@ public RelatedEntitiesResult findRelatedEntities( int totalCount = (int) response.getHits().getTotalHits().value; final List relationships = - Arrays.stream(response.getHits().getHits()) - .map( - hit -> { - final String urnStr = - ((HashMap) - hit.getSourceAsMap().getOrDefault(destinationNode, EMPTY_HASH)) - .getOrDefault("urn", null); - final String relationshipType = - (String) hit.getSourceAsMap().get("relationshipType"); - - if (urnStr == null || relationshipType == null) { - log.error( - String.format( - "Found null urn string, relationship type, aspect name or path spec in Elastic index. " - + "urnStr: %s, relationshipType: %s", - urnStr, relationshipType)); - return null; - } - - return new RelatedEntity(relationshipType, urnStr); - }) - .filter(Objects::nonNull) + searchHitsToRelatedEntities(response.getHits().getHits(), relationshipDirection).stream() + .map(RelatedEntities::asRelatedEntity) .collect(Collectors.toList()); return new RelatedEntitiesResult(offset, relationships.size(), totalCount, relationships); @@ -352,4 +335,88 @@ public void clear() { public boolean supportsMultiHop() { return true; } + + @Nonnull + @Override + public RelatedEntitiesScrollResult scrollRelatedEntities( + @Nullable List sourceTypes, + @Nullable Filter sourceEntityFilter, + @Nullable List destinationTypes, + @Nullable Filter destinationEntityFilter, + @Nonnull List relationshipTypes, + @Nonnull RelationshipFilter relationshipFilter, + @Nonnull List sortCriterion, + @Nullable String scrollId, + int count, + @Nullable Long startTimeMillis, + @Nullable Long endTimeMillis) { + + final RelationshipDirection relationshipDirection = relationshipFilter.getDirection(); + + SearchResponse response = + _graphReadDAO.getSearchResponse( + sourceTypes, + sourceEntityFilter, + destinationTypes, + destinationEntityFilter, + relationshipTypes, + relationshipFilter, + sortCriterion, + scrollId, + count); + + if (response == null) { + return new RelatedEntitiesScrollResult(0, 0, null, ImmutableList.of()); + } + + int totalCount = (int) response.getHits().getTotalHits().value; + final List relationships = + searchHitsToRelatedEntities(response.getHits().getHits(), relationshipDirection); + + SearchHit[] searchHits = response.getHits().getHits(); + // Only return next scroll ID if there are more results, indicated by full size results + String nextScrollId = null; + if (searchHits.length == count) { + Object[] sort = searchHits[searchHits.length - 1].getSortValues(); + nextScrollId = new SearchAfterWrapper(sort, null, 0L).toScrollId(); + } + + return RelatedEntitiesScrollResult.builder() + .entities(relationships) + .pageSize(relationships.size()) + .numResults(totalCount) + .scrollId(nextScrollId) + .build(); + } + + private static List searchHitsToRelatedEntities( + SearchHit[] searchHits, RelationshipDirection relationshipDirection) { + return Arrays.stream(searchHits) + .map( + hit -> { + final String destinationUrnStr = + ((HashMap) + hit.getSourceAsMap().getOrDefault("destination", EMPTY_HASH)) + .getOrDefault("urn", null); + final String sourceUrnStr = + ((HashMap) + hit.getSourceAsMap().getOrDefault("source", EMPTY_HASH)) + .getOrDefault("urn", null); + final String relationshipType = (String) hit.getSourceAsMap().get("relationshipType"); + + if (destinationUrnStr == null || sourceUrnStr == null || relationshipType == null) { + log.error( + String.format( + "Found null urn string, relationship type, aspect name or path spec in Elastic index. " + + "destinationUrnStr: %s, sourceUrnStr: %s, relationshipType: %s", + destinationUrnStr, sourceUrnStr, relationshipType)); + return null; + } + + return new RelatedEntities( + relationshipType, sourceUrnStr, destinationUrnStr, relationshipDirection); + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java index c8d3147711eba5..a1f73a134ec8ef 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java @@ -17,6 +17,7 @@ import com.linkedin.metadata.graph.LineageRelationship; import com.linkedin.metadata.graph.LineageRelationshipArray; import com.linkedin.metadata.graph.RelatedEntitiesResult; +import com.linkedin.metadata.graph.RelatedEntitiesScrollResult; import com.linkedin.metadata.graph.RelatedEntity; import com.linkedin.metadata.models.registry.LineageRegistry; import com.linkedin.metadata.query.filter.Condition; @@ -25,6 +26,7 @@ import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.query.filter.RelationshipDirection; import com.linkedin.metadata.query.filter.RelationshipFilter; +import com.linkedin.metadata.query.filter.SortCriterion; import com.linkedin.metadata.utils.metrics.MetricUtils; import com.linkedin.util.Pair; import io.opentelemetry.extension.annotations.WithSpan; @@ -882,4 +884,21 @@ private boolean isSourceDestReversed( return null; } } + + @Nonnull + @Override + public RelatedEntitiesScrollResult scrollRelatedEntities( + @Nullable List sourceTypes, + @Nonnull Filter sourceEntityFilter, + @Nullable List destinationTypes, + @Nonnull Filter destinationEntityFilter, + @Nonnull List relationshipTypes, + @Nonnull RelationshipFilter relationshipFilter, + @Nonnull List sortCriterion, + @Nullable String scrollId, + int count, + @Nullable Long startTimeMillis, + @Nullable Long endTimeMillis) { + throw new IllegalArgumentException("Not implemented"); + } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java index 1fbebbd6b34c0f..242300b608833a 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java @@ -267,46 +267,71 @@ public static void buildSortOrder( @Nonnull SearchSourceBuilder searchSourceBuilder, @Nullable SortCriterion sortCriterion, List entitySpecs) { - if (sortCriterion == null) { + buildSortOrder( + searchSourceBuilder, + sortCriterion == null ? List.of() : List.of(sortCriterion), + entitySpecs, + true); + } + + /** + * Allow disabling default sort, used when you know uniqueness is present without urn field. For + * example, edge indices where the unique constraint is determined by multiple fields (src urn, + * dst urn, relation type). + * + * @param enableDefaultSort enable/disable default sorting logic + */ + public static void buildSortOrder( + @Nonnull SearchSourceBuilder searchSourceBuilder, + @Nonnull List sortCriterion, + List entitySpecs, + boolean enableDefaultSort) { + if (sortCriterion.isEmpty() && enableDefaultSort) { searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC)); } else { - Optional fieldTypeForDefault = Optional.empty(); - for (EntitySpec entitySpec : entitySpecs) { - List fieldSpecs = entitySpec.getSearchableFieldSpecs(); - for (SearchableFieldSpec fieldSpec : fieldSpecs) { - SearchableAnnotation annotation = fieldSpec.getSearchableAnnotation(); - if (annotation.getFieldName().equals(sortCriterion.getField()) - || annotation.getFieldNameAliases().contains(sortCriterion.getField())) { - fieldTypeForDefault = Optional.of(fieldSpec.getSearchableAnnotation().getFieldType()); - break; - } - } - if (fieldTypeForDefault.isPresent()) { - break; - } - } - if (fieldTypeForDefault.isEmpty()) { - log.warn( - "Sort criterion field " - + sortCriterion.getField() - + " was not found in any entity spec to be searched"); - } - final SortOrder esSortOrder = - (sortCriterion.getOrder() == com.linkedin.metadata.query.filter.SortOrder.ASCENDING) - ? SortOrder.ASC - : SortOrder.DESC; - FieldSortBuilder sortBuilder = - new FieldSortBuilder(sortCriterion.getField()).order(esSortOrder); - if (fieldTypeForDefault.isPresent()) { - String esFieldtype = getElasticTypeForFieldType(fieldTypeForDefault.get()); - if (esFieldtype != null) { - sortBuilder.unmappedType(esFieldtype); - } - } - searchSourceBuilder.sort(sortBuilder); + sortCriterion.forEach( + sortCriteria -> { + Optional fieldTypeForDefault = Optional.empty(); + for (EntitySpec entitySpec : entitySpecs) { + List fieldSpecs = entitySpec.getSearchableFieldSpecs(); + for (SearchableFieldSpec fieldSpec : fieldSpecs) { + SearchableAnnotation annotation = fieldSpec.getSearchableAnnotation(); + if (annotation.getFieldName().equals(sortCriteria.getField()) + || annotation.getFieldNameAliases().contains(sortCriteria.getField())) { + fieldTypeForDefault = + Optional.of(fieldSpec.getSearchableAnnotation().getFieldType()); + break; + } + } + if (fieldTypeForDefault.isPresent()) { + break; + } + } + if (fieldTypeForDefault.isEmpty()) { + log.warn( + "Sort criterion field " + + sortCriteria.getField() + + " was not found in any entity spec to be searched"); + } + final SortOrder esSortOrder = + (sortCriteria.getOrder() == com.linkedin.metadata.query.filter.SortOrder.ASCENDING) + ? SortOrder.ASC + : SortOrder.DESC; + FieldSortBuilder sortBuilder = + new FieldSortBuilder(sortCriteria.getField()).order(esSortOrder); + if (fieldTypeForDefault.isPresent()) { + String esFieldtype = getElasticTypeForFieldType(fieldTypeForDefault.get()); + if (esFieldtype != null) { + sortBuilder.unmappedType(esFieldtype); + } + } + searchSourceBuilder.sort(sortBuilder); + }); } - if (sortCriterion == null - || !sortCriterion.getField().equals(DEFAULT_SEARCH_RESULTS_SORT_BY_FIELD)) { + if (enableDefaultSort + && (sortCriterion.isEmpty() + || sortCriterion.stream() + .noneMatch(c -> c.getField().equals(DEFAULT_SEARCH_RESULTS_SORT_BY_FIELD)))) { searchSourceBuilder.sort( new FieldSortBuilder(DEFAULT_SEARCH_RESULTS_SORT_BY_FIELD).order(SortOrder.ASC)); } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java index a24ffc873997e4..71ffd603c999f6 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java @@ -20,12 +20,15 @@ import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.query.filter.SortCriterion; import com.linkedin.metadata.search.elasticsearch.indexbuilder.ReindexConfig; +import com.linkedin.metadata.search.elasticsearch.query.request.SearchAfterWrapper; import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor; import com.linkedin.metadata.search.utils.ESUtils; import com.linkedin.metadata.search.utils.QueryUtils; import com.linkedin.metadata.shared.ElasticSearchIndexed; import com.linkedin.metadata.timeseries.BatchWriteOperationsOptions; +import com.linkedin.metadata.timeseries.GenericTimeseriesDocument; import com.linkedin.metadata.timeseries.TimeseriesAspectService; +import com.linkedin.metadata.timeseries.TimeseriesScrollResult; import com.linkedin.metadata.timeseries.elastic.indexbuilder.MappingsBuilder; import com.linkedin.metadata.timeseries.elastic.indexbuilder.TimeseriesAspectIndexBuilders; import com.linkedin.metadata.timeseries.elastic.query.ESAggregatedStatsDAO; @@ -48,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -88,8 +92,6 @@ public class ElasticSearchTimeseriesAspectService .setStreamReadConstraints(StreamReadConstraints.builder().maxStringLength(maxSize).build()); } - private static final String TIMESTAMP_FIELD = "timestampMillis"; - private static final String EVENT_FIELD = "event"; private static final Integer DEFAULT_LIMIT = 10000; private final IndexConvention _indexConvention; @@ -120,7 +122,7 @@ public ElasticSearchTimeseriesAspectService( private static EnvelopedAspect parseDocument(@Nonnull SearchHit doc) { Map docFields = doc.getSourceAsMap(); EnvelopedAspect envelopedAspect = new EnvelopedAspect(); - Object event = docFields.get(EVENT_FIELD); + Object event = docFields.get(MappingsBuilder.EVENT_FIELD); GenericAspect genericAspect; try { genericAspect = @@ -149,6 +151,61 @@ private static EnvelopedAspect parseDocument(@Nonnull SearchHit doc) { return envelopedAspect; } + private static Set commonFields = + Set.of( + MappingsBuilder.URN_FIELD, + MappingsBuilder.RUN_ID_FIELD, + MappingsBuilder.EVENT_GRANULARITY, + MappingsBuilder.IS_EXPLODED_FIELD, + MappingsBuilder.MESSAGE_ID_FIELD, + MappingsBuilder.PARTITION_SPEC_PARTITION, + MappingsBuilder.PARTITION_SPEC, + MappingsBuilder.SYSTEM_METADATA_FIELD, + MappingsBuilder.TIMESTAMP_MILLIS_FIELD, + MappingsBuilder.TIMESTAMP_FIELD, + MappingsBuilder.EVENT_FIELD); + + private static Pair toEnvAspectGenericDocument( + @Nonnull SearchHit doc) { + EnvelopedAspect envelopedAspect = null; + + Map documentFieldMap = doc.getSourceAsMap(); + + GenericTimeseriesDocument.GenericTimeseriesDocumentBuilder builder = + GenericTimeseriesDocument.builder() + .urn((String) documentFieldMap.get(MappingsBuilder.URN_FIELD)) + .timestampMillis((Long) documentFieldMap.get(MappingsBuilder.TIMESTAMP_MILLIS_FIELD)) + .timestamp((Long) documentFieldMap.get(MappingsBuilder.TIMESTAMP_FIELD)); + + Optional.ofNullable(documentFieldMap.get(MappingsBuilder.RUN_ID_FIELD)) + .ifPresent(d -> builder.runId((String) d)); + Optional.ofNullable(documentFieldMap.get(MappingsBuilder.EVENT_GRANULARITY)) + .ifPresent(d -> builder.eventGranularity((String) d)); + Optional.ofNullable(documentFieldMap.get(MappingsBuilder.IS_EXPLODED_FIELD)) + .ifPresent(d -> builder.isExploded((Boolean) d)); + Optional.ofNullable(documentFieldMap.get(MappingsBuilder.MESSAGE_ID_FIELD)) + .ifPresent(d -> builder.messageId((String) d)); + Optional.ofNullable(documentFieldMap.get(MappingsBuilder.PARTITION_SPEC_PARTITION)) + .ifPresent(d -> builder.partition((String) d)); + Optional.ofNullable(documentFieldMap.get(MappingsBuilder.PARTITION_SPEC)) + .ifPresent(d -> builder.partitionSpec(d)); + Optional.ofNullable(documentFieldMap.get(MappingsBuilder.SYSTEM_METADATA_FIELD)) + .ifPresent(d -> builder.systemMetadata(d)); + + if (documentFieldMap.get(MappingsBuilder.EVENT_FIELD) != null) { + envelopedAspect = parseDocument(doc); + builder.event(documentFieldMap.get(MappingsBuilder.EVENT_FIELD)); + } else { + // If no event, the event is any non-common field + builder.event( + documentFieldMap.entrySet().stream() + .filter(entry -> !commonFields.contains(entry.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); + } + + return Pair.of(envelopedAspect, builder.build()); + } + @Override public void configure() { _indexBuilders.reindexAll(); @@ -264,7 +321,7 @@ public List getAspectValues( if (startTimeMillis != null) { Criterion startTimeCriterion = new Criterion() - .setField(TIMESTAMP_FIELD) + .setField(MappingsBuilder.TIMESTAMP_MILLIS_FIELD) .setCondition(Condition.GREATER_THAN_OR_EQUAL_TO) .setValue(startTimeMillis.toString()); filterQueryBuilder.must(ESUtils.getQueryBuilderFromCriterion(startTimeCriterion, true)); @@ -272,7 +329,7 @@ public List getAspectValues( if (endTimeMillis != null) { Criterion endTimeCriterion = new Criterion() - .setField(TIMESTAMP_FIELD) + .setField(MappingsBuilder.TIMESTAMP_MILLIS_FIELD) .setCondition(Condition.LESS_THAN_OR_EQUAL_TO) .setValue(endTimeMillis.toString()); filterQueryBuilder.must(ESUtils.getQueryBuilderFromCriterion(endTimeCriterion, true)); @@ -429,4 +486,88 @@ public DeleteAspectValuesResult rollbackTimeseriesAspects(@Nonnull String runId) return rollbackResult; } + + @Nonnull + @Override + public TimeseriesScrollResult scrollAspects( + @Nonnull String entityName, + @Nonnull String aspectName, + @Nullable Filter filter, + @Nonnull List sortCriterion, + @Nullable String scrollId, + int count, + @Nullable Long startTimeMillis, + @Nullable Long endTimeMillis) { + final BoolQueryBuilder filterQueryBuilder = + QueryBuilders.boolQuery().filter(ESUtils.buildFilterQuery(filter, true)); + + if (startTimeMillis != null) { + Criterion startTimeCriterion = + new Criterion() + .setField(MappingsBuilder.TIMESTAMP_MILLIS_FIELD) + .setCondition(Condition.GREATER_THAN_OR_EQUAL_TO) + .setValue(startTimeMillis.toString()); + filterQueryBuilder.filter(ESUtils.getQueryBuilderFromCriterion(startTimeCriterion, true)); + } + if (endTimeMillis != null) { + Criterion endTimeCriterion = + new Criterion() + .setField(MappingsBuilder.TIMESTAMP_MILLIS_FIELD) + .setCondition(Condition.LESS_THAN_OR_EQUAL_TO) + .setValue(endTimeMillis.toString()); + filterQueryBuilder.filter(ESUtils.getQueryBuilderFromCriterion(endTimeCriterion, true)); + } + + SearchResponse response = + executeScrollSearchQuery( + entityName, aspectName, filterQueryBuilder, sortCriterion, scrollId, count); + int totalCount = (int) response.getHits().getTotalHits().value; + + List> resultPairs = + Arrays.stream(response.getHits().getHits()) + .map(ElasticSearchTimeseriesAspectService::toEnvAspectGenericDocument) + .toList(); + + return TimeseriesScrollResult.builder() + .numResults(totalCount) + .pageSize(response.getHits().getHits().length) + .events(resultPairs.stream().map(Pair::getFirst).collect(Collectors.toList())) + .documents(resultPairs.stream().map(Pair::getSecond).collect(Collectors.toList())) + .build(); + } + + private SearchResponse executeScrollSearchQuery( + @Nonnull final String entityNname, + @Nonnull final String aspectName, + @Nonnull final QueryBuilder query, + @Nonnull List sortCriterion, + @Nullable String scrollId, + final int count) { + + Object[] sort = null; + if (scrollId != null) { + SearchAfterWrapper searchAfterWrapper = SearchAfterWrapper.fromScrollId(scrollId); + sort = searchAfterWrapper.getSort(); + } + + SearchRequest searchRequest = new SearchRequest(); + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + + searchSourceBuilder.size(count); + searchSourceBuilder.query(query); + ESUtils.buildSortOrder(searchSourceBuilder, sortCriterion, List.of(), false); + searchRequest.source(searchSourceBuilder); + ESUtils.setSearchAfter(searchSourceBuilder, sort, null, null); + + searchRequest.indices(_indexConvention.getTimeseriesAspectIndexName(entityNname, aspectName)); + + try (Timer.Context ignored = + MetricUtils.timer(this.getClass(), "scrollAspects_search").time()) { + return _searchClient.search(searchRequest, RequestOptions.DEFAULT); + } catch (Exception e) { + log.error("Search query failed", e); + throw new ESQueryException("Search query failed:", e); + } + } } diff --git a/metadata-models/build.gradle b/metadata-models/build.gradle index 04c90fa444f0ca..86f404adb7fef3 100644 --- a/metadata-models/build.gradle +++ b/metadata-models/build.gradle @@ -43,11 +43,10 @@ mainAvroSchemaJar.dependsOn generateAvroSchema pegasus.main.generationModes = [PegasusGenerationMode.PEGASUS, PegasusGenerationMode.AVRO] -tasks.register('generateJsonSchema', GenerateJsonSchemaTask) { +task generateJsonSchema(type: GenerateJsonSchemaTask, dependsOn: 'generateAvroSchema') { it.setInputDirectory("$projectDir/src/mainGeneratedAvroSchema") it.setOutputDirectory("$projectDir/src/generatedJsonSchema") it.setEntityRegistryYaml("${project(':metadata-models').projectDir}/src/main/resources/entity-registry.yml") - dependsOn generateAvroSchema } // https://github.com/int128/gradle-swagger-generator-plugin#task-type-generateswaggercode diff --git a/metadata-service/openapi-entity-servlet/build.gradle b/metadata-service/openapi-entity-servlet/build.gradle index fb49727fa70d1f..016ac6693f55b2 100644 --- a/metadata-service/openapi-entity-servlet/build.gradle +++ b/metadata-service/openapi-entity-servlet/build.gradle @@ -75,7 +75,7 @@ task openApiGenerate(type: GenerateSwaggerCode, dependsOn: [mergeApiComponents, 'java11' : "true", 'modelPropertyNaming': "original", 'modelPackage' : "io.datahubproject.openapi.generated", - 'apiPackage' : "io.datahubproject.openapi.generated.controller", + 'apiPackage' : "io.datahubproject.openapi.v2.generated.controller", 'delegatePattern' : "false" ] } diff --git a/metadata-service/openapi-entity-servlet/src/main/java/io/datahubproject/openapi/delegates/EntityApiDelegateImpl.java b/metadata-service/openapi-entity-servlet/src/main/java/io/datahubproject/openapi/v2/delegates/EntityApiDelegateImpl.java similarity index 86% rename from metadata-service/openapi-entity-servlet/src/main/java/io/datahubproject/openapi/delegates/EntityApiDelegateImpl.java rename to metadata-service/openapi-entity-servlet/src/main/java/io/datahubproject/openapi/v2/delegates/EntityApiDelegateImpl.java index 57910fd783c091..96ffc684245619 100644 --- a/metadata-service/openapi-entity-servlet/src/main/java/io/datahubproject/openapi/delegates/EntityApiDelegateImpl.java +++ b/metadata-service/openapi-entity-servlet/src/main/java/io/datahubproject/openapi/v2/delegates/EntityApiDelegateImpl.java @@ -1,4 +1,4 @@ -package io.datahubproject.openapi.delegates; +package io.datahubproject.openapi.v2.delegates; import static io.datahubproject.openapi.util.ReflectionCache.toLowerFirst; @@ -35,10 +35,16 @@ import io.datahubproject.openapi.generated.DeprecationAspectResponseV2; import io.datahubproject.openapi.generated.DomainsAspectRequestV2; import io.datahubproject.openapi.generated.DomainsAspectResponseV2; +import io.datahubproject.openapi.generated.DynamicFormAssignmentAspectRequestV2; +import io.datahubproject.openapi.generated.DynamicFormAssignmentAspectResponseV2; import io.datahubproject.openapi.generated.EditableChartPropertiesAspectRequestV2; import io.datahubproject.openapi.generated.EditableChartPropertiesAspectResponseV2; import io.datahubproject.openapi.generated.EditableDatasetPropertiesAspectRequestV2; import io.datahubproject.openapi.generated.EditableDatasetPropertiesAspectResponseV2; +import io.datahubproject.openapi.generated.FormInfoAspectRequestV2; +import io.datahubproject.openapi.generated.FormInfoAspectResponseV2; +import io.datahubproject.openapi.generated.FormsAspectRequestV2; +import io.datahubproject.openapi.generated.FormsAspectResponseV2; import io.datahubproject.openapi.generated.GlobalTagsAspectRequestV2; import io.datahubproject.openapi.generated.GlobalTagsAspectResponseV2; import io.datahubproject.openapi.generated.GlossaryTermsAspectRequestV2; @@ -732,4 +738,111 @@ public ResponseEntity deleteDataProductProperties(String urn) { walker.walk(frames -> frames.findFirst().map(StackWalker.StackFrame::getMethodName)).get(); return deleteAspect(urn, methodNameToAspectName(methodName)); } + + public ResponseEntity createForms(FormsAspectRequestV2 body, String urn) { + String methodName = + walker.walk(frames -> frames.findFirst().map(StackWalker.StackFrame::getMethodName)).get(); + return createAspect( + urn, + methodNameToAspectName(methodName), + body, + FormsAspectRequestV2.class, + FormsAspectResponseV2.class); + } + + public ResponseEntity deleteForms(String urn) { + String methodName = + walker.walk(frames -> frames.findFirst().map(StackWalker.StackFrame::getMethodName)).get(); + return deleteAspect(urn, methodNameToAspectName(methodName)); + } + + public ResponseEntity getForms( + String urn, @jakarta.validation.Valid Boolean systemMetadata) { + String methodName = + walker.walk(frames -> frames.findFirst().map(StackWalker.StackFrame::getMethodName)).get(); + return getAspect( + urn, + systemMetadata, + methodNameToAspectName(methodName), + _respClazz, + FormsAspectResponseV2.class); + } + + public ResponseEntity headForms(String urn) { + String methodName = + walker.walk(frames -> frames.findFirst().map(StackWalker.StackFrame::getMethodName)).get(); + return headAspect(urn, methodNameToAspectName(methodName)); + } + + public ResponseEntity createDynamicFormAssignment( + DynamicFormAssignmentAspectRequestV2 body, String urn) { + String methodName = + walker.walk(frames -> frames.findFirst().map(StackWalker.StackFrame::getMethodName)).get(); + return createAspect( + urn, + methodNameToAspectName(methodName), + body, + DynamicFormAssignmentAspectRequestV2.class, + DynamicFormAssignmentAspectResponseV2.class); + } + + public ResponseEntity createFormInfo( + FormInfoAspectRequestV2 body, String urn) { + String methodName = + walker.walk(frames -> frames.findFirst().map(StackWalker.StackFrame::getMethodName)).get(); + return createAspect( + urn, + methodNameToAspectName(methodName), + body, + FormInfoAspectRequestV2.class, + FormInfoAspectResponseV2.class); + } + + public ResponseEntity deleteDynamicFormAssignment(String urn) { + String methodName = + walker.walk(frames -> frames.findFirst().map(StackWalker.StackFrame::getMethodName)).get(); + return deleteAspect(urn, methodNameToAspectName(methodName)); + } + + public ResponseEntity headDynamicFormAssignment(String urn) { + String methodName = + walker.walk(frames -> frames.findFirst().map(StackWalker.StackFrame::getMethodName)).get(); + return headAspect(urn, methodNameToAspectName(methodName)); + } + + public ResponseEntity headFormInfo(String urn) { + String methodName = + walker.walk(frames -> frames.findFirst().map(StackWalker.StackFrame::getMethodName)).get(); + return headAspect(urn, methodNameToAspectName(methodName)); + } + + public ResponseEntity getFormInfo( + String urn, @jakarta.validation.Valid Boolean systemMetadata) { + String methodName = + walker.walk(frames -> frames.findFirst().map(StackWalker.StackFrame::getMethodName)).get(); + return getAspect( + urn, + systemMetadata, + methodNameToAspectName(methodName), + _respClazz, + FormInfoAspectResponseV2.class); + } + + public ResponseEntity getDynamicFormAssignment( + String urn, @jakarta.validation.Valid Boolean systemMetadata) { + String methodName = + walker.walk(frames -> frames.findFirst().map(StackWalker.StackFrame::getMethodName)).get(); + return getAspect( + urn, + systemMetadata, + methodNameToAspectName(methodName), + _respClazz, + DynamicFormAssignmentAspectResponseV2.class); + } + + public ResponseEntity deleteFormInfo(String urn) { + String methodName = + walker.walk(frames -> frames.findFirst().map(StackWalker.StackFrame::getMethodName)).get(); + return deleteAspect(urn, methodNameToAspectName(methodName)); + } } diff --git a/metadata-service/openapi-entity-servlet/src/main/resources/JavaSpring/apiController.mustache b/metadata-service/openapi-entity-servlet/src/main/resources/JavaSpring/apiController.mustache index 4a29b95eabc5de..7ac087f220561f 100644 --- a/metadata-service/openapi-entity-servlet/src/main/resources/JavaSpring/apiController.mustache +++ b/metadata-service/openapi-entity-servlet/src/main/resources/JavaSpring/apiController.mustache @@ -1,6 +1,6 @@ package {{package}}; -import io.datahubproject.openapi.delegates.EntityApiDelegateImpl; +import io.datahubproject.openapi.v2.delegates.EntityApiDelegateImpl; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.search.SearchService; import io.datahubproject.openapi.entities.EntitiesController; diff --git a/metadata-service/openapi-entity-servlet/src/test/java/io/datahubproject/openapi/delegates/EntityApiDelegateImplTest.java b/metadata-service/openapi-entity-servlet/src/test/java/io/datahubproject/openapi/v2/delegates/EntityApiDelegateImplTest.java similarity index 97% rename from metadata-service/openapi-entity-servlet/src/test/java/io/datahubproject/openapi/delegates/EntityApiDelegateImplTest.java rename to metadata-service/openapi-entity-servlet/src/test/java/io/datahubproject/openapi/v2/delegates/EntityApiDelegateImplTest.java index 1f8f0a50235133..d4217c9fd1b66c 100644 --- a/metadata-service/openapi-entity-servlet/src/test/java/io/datahubproject/openapi/delegates/EntityApiDelegateImplTest.java +++ b/metadata-service/openapi-entity-servlet/src/test/java/io/datahubproject/openapi/v2/delegates/EntityApiDelegateImplTest.java @@ -1,4 +1,4 @@ -package io.datahubproject.openapi.delegates; +package io.datahubproject.openapi.v2.delegates; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import static org.testng.Assert.*; @@ -32,8 +32,8 @@ import io.datahubproject.openapi.generated.Status; import io.datahubproject.openapi.generated.StatusAspectRequestV2; import io.datahubproject.openapi.generated.TagAssociation; -import io.datahubproject.openapi.generated.controller.ChartApiController; -import io.datahubproject.openapi.generated.controller.DatasetApiController; +import io.datahubproject.openapi.v2.generated.controller.ChartApiController; +import io.datahubproject.openapi.v2.generated.controller.DatasetApiController; import java.util.List; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; @@ -51,7 +51,7 @@ import org.testng.annotations.Test; @SpringBootTest(classes = {SpringWebConfig.class}) -@ComponentScan(basePackages = {"io.datahubproject.openapi.generated.controller"}) +@ComponentScan(basePackages = {"io.datahubproject.openapi.v2.generated.controller"}) @Import({OpenAPIEntityTestConfiguration.class}) @AutoConfigureMockMvc public class EntityApiDelegateImplTest extends AbstractTestNGSpringContextTests { diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/GlobalControllerExceptionHandler.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/GlobalControllerExceptionHandler.java index cc040d29657b27..f4689a98628253 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/GlobalControllerExceptionHandler.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/GlobalControllerExceptionHandler.java @@ -1,14 +1,25 @@ package io.datahubproject.openapi; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.ConversionNotSupportedException; +import org.springframework.core.Ordered; import org.springframework.core.convert.ConversionFailedException; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.ControllerAdvice; import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.servlet.mvc.support.DefaultHandlerExceptionResolver; +@Slf4j @ControllerAdvice -public class GlobalControllerExceptionHandler { - @ExceptionHandler(ConversionFailedException.class) +public class GlobalControllerExceptionHandler extends DefaultHandlerExceptionResolver { + + public GlobalControllerExceptionHandler() { + setOrder(Ordered.HIGHEST_PRECEDENCE); + setWarnLogCategory(getClass().getName()); + } + + @ExceptionHandler({ConversionFailedException.class, ConversionNotSupportedException.class}) public ResponseEntity handleConflict(RuntimeException ex) { return new ResponseEntity<>(ex.getMessage(), HttpStatus.BAD_REQUEST); } diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/config/SpringWebConfig.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/config/SpringWebConfig.java index 54287623299852..2336bea565e590 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/config/SpringWebConfig.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/config/SpringWebConfig.java @@ -4,6 +4,7 @@ import io.swagger.v3.oas.annotations.OpenAPIDefinition; import io.swagger.v3.oas.annotations.info.Info; import io.swagger.v3.oas.annotations.servers.Server; +import java.util.HashSet; import java.util.List; import java.util.Set; import org.springdoc.core.models.GroupedOpenApi; @@ -24,9 +25,20 @@ servers = {@Server(url = "/openapi/", description = "Default Server URL")}) @Configuration public class SpringWebConfig implements WebMvcConfigurer { - - public static final Set OPENAPI_PACKAGES = + private static final Set OPERATIONS_PACKAGES = Set.of("io.datahubproject.openapi.operations", "io.datahubproject.openapi.health"); + private static final Set V2_PACKAGES = Set.of("io.datahubproject.openapi.v2"); + private static final Set SCHEMA_REGISTRY_PACKAGES = + Set.of("io.datahubproject.openapi.schema.registry"); + + public static final Set NONDEFAULT_OPENAPI_PACKAGES; + + static { + NONDEFAULT_OPENAPI_PACKAGES = new HashSet<>(); + NONDEFAULT_OPENAPI_PACKAGES.addAll(OPERATIONS_PACKAGES); + NONDEFAULT_OPENAPI_PACKAGES.addAll(V2_PACKAGES); + NONDEFAULT_OPENAPI_PACKAGES.addAll(SCHEMA_REGISTRY_PACKAGES); + } @Override public void configureMessageConverters(List> messageConverters) { @@ -45,22 +57,23 @@ public void addFormatters(FormatterRegistry registry) { public GroupedOpenApi defaultOpenApiGroup() { return GroupedOpenApi.builder() .group("default") - .packagesToExclude(OPENAPI_PACKAGES.toArray(String[]::new)) + .packagesToExclude(NONDEFAULT_OPENAPI_PACKAGES.toArray(String[]::new)) .build(); } @Bean public GroupedOpenApi operationsOpenApiGroup() { - Set groupPackages = - Set.of("io.datahubproject.openapi.operations", "io.datahubproject.openapi.health"); + return GroupedOpenApi.builder() + .group("Operations") + .packagesToScan(OPERATIONS_PACKAGES.toArray(String[]::new)) + .build(); + } + @Bean + public GroupedOpenApi openApiGroupV3() { return GroupedOpenApi.builder() - .group("operations") - .packagesToScan(groupPackages.toArray(String[]::new)) - .packagesToExclude( - OPENAPI_PACKAGES.stream() - .filter(pkg -> !groupPackages.contains(pkg)) - .toArray(String[]::new)) + .group("OpenAPI v2") + .packagesToScan(V2_PACKAGES.toArray(String[]::new)) .build(); } } diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/util/ReflectionCache.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/util/ReflectionCache.java index 31577429df72d5..8c5d4681c45194 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/util/ReflectionCache.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/util/ReflectionCache.java @@ -135,10 +135,18 @@ public Method lookupMethod( } public static String toLowerFirst(String s) { - return s.substring(0, 1).toLowerCase() + s.substring(1); + if (s.length() > 2 && s.substring(2, 3).equals(s.substring(2, 3).toUpperCase())) { + return s.substring(0, 2).toLowerCase() + s.substring(2); + } else { + return s.substring(0, 1).toLowerCase() + s.substring(1); + } } public static String toUpperFirst(String s) { - return s.substring(0, 1).toUpperCase() + s.substring(1); + if (s.length() > 2 && s.substring(2, 3).equals(s.substring(2, 3).toUpperCase())) { + return s.substring(0, 2).toUpperCase() + s.substring(2); + } else { + return s.substring(0, 1).toUpperCase() + s.substring(1); + } } } diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java new file mode 100644 index 00000000000000..d512a0d9501c66 --- /dev/null +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java @@ -0,0 +1,507 @@ +package io.datahubproject.openapi.v2.controller; + +import static io.datahubproject.openapi.v2.utils.ControllerUtil.checkAuthorized; + +import com.datahub.authentication.Actor; +import com.datahub.authentication.Authentication; +import com.datahub.authentication.AuthenticationContext; +import com.datahub.authorization.AuthorizerChain; +import com.datahub.util.RecordUtils; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.data.ByteString; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.entity.EnvelopedAspect; +import com.linkedin.metadata.aspect.batch.UpsertItem; +import com.linkedin.metadata.aspect.patch.GenericJsonPatch; +import com.linkedin.metadata.aspect.patch.template.common.GenericPatchTemplate; +import com.linkedin.metadata.authorization.PoliciesConfig; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.UpdateAspectResult; +import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl; +import com.linkedin.metadata.entity.ebean.batch.MCPUpsertBatchItem; +import com.linkedin.metadata.models.AspectSpec; +import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.query.SearchFlags; +import com.linkedin.metadata.query.filter.SortCriterion; +import com.linkedin.metadata.query.filter.SortOrder; +import com.linkedin.metadata.search.ScrollResult; +import com.linkedin.metadata.search.SearchEntity; +import com.linkedin.metadata.search.SearchEntityArray; +import com.linkedin.metadata.search.SearchService; +import com.linkedin.metadata.utils.AuditStampUtils; +import com.linkedin.metadata.utils.GenericRecordUtils; +import com.linkedin.metadata.utils.SearchUtil; +import com.linkedin.mxe.SystemMetadata; +import com.linkedin.util.Pair; +import io.datahubproject.openapi.v2.models.GenericEntity; +import io.datahubproject.openapi.v2.models.GenericScrollResult; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import java.lang.reflect.InvocationTargetException; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PatchMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequiredArgsConstructor +@RequestMapping("/v2/entity") +@Slf4j +public class EntityController { + private static final SearchFlags DEFAULT_SEARCH_FLAGS = + new SearchFlags().setFulltext(false).setSkipAggregates(true).setSkipHighlighting(true); + @Autowired private EntityRegistry entityRegistry; + @Autowired private SearchService searchService; + @Autowired private EntityService entityService; + @Autowired private AuthorizerChain authorizationChain; + @Autowired private boolean restApiAuthorizationEnabled; + @Autowired private ObjectMapper objectMapper; + + @Tag(name = "Generic Entities", description = "API for interacting with generic entities.") + @GetMapping(value = "/{entityName}", produces = MediaType.APPLICATION_JSON_VALUE) + @Operation(summary = "Scroll entities") + public ResponseEntity> getEntities( + @PathVariable("entityName") String entityName, + @RequestParam(value = "aspectNames", defaultValue = "") Set aspectNames, + @RequestParam(value = "count", defaultValue = "10") Integer count, + @RequestParam(value = "query", defaultValue = "*") String query, + @RequestParam(value = "scrollId", required = false) String scrollId, + @RequestParam(value = "sort", required = false, defaultValue = "urn") String sortField, + @RequestParam(value = "sortOrder", required = false, defaultValue = "ASCENDING") + String sortOrder, + @RequestParam(value = "systemMetadata", required = false, defaultValue = "false") + Boolean withSystemMetadata) + throws URISyntaxException { + + EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName); + + if (restApiAuthorizationEnabled) { + Authentication authentication = AuthenticationContext.getAuthentication(); + checkAuthorized( + authorizationChain, + authentication.getActor(), + entitySpec, + ImmutableList.of(PoliciesConfig.GET_ENTITY_PRIVILEGE.getType())); + } + + // TODO: support additional and multiple sort params + SortCriterion sortCriterion = SearchUtil.sortBy(sortField, SortOrder.valueOf(sortOrder)); + + ScrollResult result = + searchService.scrollAcrossEntities( + List.of(entitySpec.getName()), + query, + null, + sortCriterion, + scrollId, + null, + count, + DEFAULT_SEARCH_FLAGS); + + return ResponseEntity.ok( + GenericScrollResult.builder() + .results(toRecordTemplates(result.getEntities(), aspectNames, withSystemMetadata)) + .scrollId(result.getScrollId()) + .build()); + } + + @Tag(name = "Generic Entities") + @GetMapping(value = "/{entityName}/{entityUrn}", produces = MediaType.APPLICATION_JSON_VALUE) + @Operation(summary = "Get an entity") + public ResponseEntity getEntity( + @PathVariable("entityName") String entityName, + @PathVariable("entityUrn") String entityUrn, + @RequestParam(value = "aspectNames", defaultValue = "") Set aspectNames, + @RequestParam(value = "systemMetadata", required = false, defaultValue = "false") + Boolean withSystemMetadata) + throws URISyntaxException { + + if (restApiAuthorizationEnabled) { + Authentication authentication = AuthenticationContext.getAuthentication(); + EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName); + checkAuthorized( + authorizationChain, + authentication.getActor(), + entitySpec, + entityUrn, + ImmutableList.of(PoliciesConfig.GET_ENTITY_PRIVILEGE.getType())); + } + + return ResponseEntity.of( + toRecordTemplates(List.of(UrnUtils.getUrn(entityUrn)), aspectNames, withSystemMetadata) + .stream() + .findFirst()); + } + + @Tag(name = "Generic Entities") + @RequestMapping( + value = "/{entityName}/{entityUrn}", + method = {RequestMethod.HEAD}) + @Operation(summary = "Entity exists") + public ResponseEntity headEntity( + @PathVariable("entityName") String entityName, @PathVariable("entityUrn") String entityUrn) { + + if (restApiAuthorizationEnabled) { + Authentication authentication = AuthenticationContext.getAuthentication(); + EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName); + checkAuthorized( + authorizationChain, + authentication.getActor(), + entitySpec, + entityUrn, + ImmutableList.of(PoliciesConfig.GET_ENTITY_PRIVILEGE.getType())); + } + + return exists(UrnUtils.getUrn(entityUrn), null) + ? ResponseEntity.noContent().build() + : ResponseEntity.notFound().build(); + } + + @Tag(name = "Generic Aspects", description = "API for generic aspects.") + @GetMapping( + value = "/{entityName}/{entityUrn}/{aspectName}", + produces = MediaType.APPLICATION_JSON_VALUE) + @Operation(summary = "Get an entity's generic aspect.") + public ResponseEntity getAspect( + @PathVariable("entityName") String entityName, + @PathVariable("entityUrn") String entityUrn, + @PathVariable("aspectName") String aspectName) + throws URISyntaxException { + + if (restApiAuthorizationEnabled) { + Authentication authentication = AuthenticationContext.getAuthentication(); + EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName); + checkAuthorized( + authorizationChain, + authentication.getActor(), + entitySpec, + entityUrn, + ImmutableList.of(PoliciesConfig.GET_ENTITY_PRIVILEGE.getType())); + } + + return ResponseEntity.of( + toRecordTemplates(List.of(UrnUtils.getUrn(entityUrn)), Set.of(aspectName), true).stream() + .findFirst() + .flatMap(e -> e.getAspects().values().stream().findFirst())); + } + + @Tag(name = "Generic Aspects") + @RequestMapping( + value = "/{entityName}/{entityUrn}/{aspectName}", + method = {RequestMethod.HEAD}) + @Operation(summary = "Whether an entity aspect exists.") + public ResponseEntity headAspect( + @PathVariable("entityName") String entityName, + @PathVariable("entityUrn") String entityUrn, + @PathVariable("aspectName") String aspectName) { + + if (restApiAuthorizationEnabled) { + Authentication authentication = AuthenticationContext.getAuthentication(); + EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName); + checkAuthorized( + authorizationChain, + authentication.getActor(), + entitySpec, + entityUrn, + ImmutableList.of(PoliciesConfig.GET_ENTITY_PRIVILEGE.getType())); + } + + return exists(UrnUtils.getUrn(entityUrn), aspectName) + ? ResponseEntity.noContent().build() + : ResponseEntity.notFound().build(); + } + + @Tag(name = "Generic Entities") + @DeleteMapping(value = "/{entityName}/{entityUrn}") + @Operation(summary = "Delete an entity") + public void deleteEntity( + @PathVariable("entityName") String entityName, @PathVariable("entityUrn") String entityUrn) { + + EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName); + + if (restApiAuthorizationEnabled) { + Authentication authentication = AuthenticationContext.getAuthentication(); + checkAuthorized( + authorizationChain, + authentication.getActor(), + entitySpec, + entityUrn, + ImmutableList.of(PoliciesConfig.DELETE_ENTITY_PRIVILEGE.getType())); + } + + entityService.deleteAspect(entityUrn, entitySpec.getKeyAspectName(), Map.of(), true); + } + + @Tag(name = "Generic Aspects") + @DeleteMapping(value = "/{entityName}/{entityUrn}/{aspectName}") + @Operation(summary = "Delete an entity aspect.") + public void deleteAspect( + @PathVariable("entityName") String entityName, + @PathVariable("entityUrn") String entityUrn, + @PathVariable("aspectName") String aspectName) { + + if (restApiAuthorizationEnabled) { + Authentication authentication = AuthenticationContext.getAuthentication(); + EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName); + checkAuthorized( + authorizationChain, + authentication.getActor(), + entitySpec, + entityUrn, + ImmutableList.of(PoliciesConfig.DELETE_ENTITY_PRIVILEGE.getType())); + } + + entityService.deleteAspect(entityUrn, aspectName, Map.of(), true); + } + + @Tag(name = "Generic Aspects") + @PostMapping( + value = "/{entityName}/{entityUrn}/{aspectName}", + produces = MediaType.APPLICATION_JSON_VALUE) + @Operation(summary = "Create an entity aspect.") + public ResponseEntity createAspect( + @PathVariable("entityName") String entityName, + @PathVariable("entityUrn") String entityUrn, + @PathVariable("aspectName") String aspectName, + @RequestParam(value = "systemMetadata", required = false, defaultValue = "false") + Boolean withSystemMetadata, + @RequestBody @Nonnull String jsonAspect) + throws URISyntaxException { + + EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName); + Authentication authentication = AuthenticationContext.getAuthentication(); + + if (restApiAuthorizationEnabled) { + checkAuthorized( + authorizationChain, + authentication.getActor(), + entitySpec, + entityUrn, + ImmutableList.of(PoliciesConfig.EDIT_ENTITY_PRIVILEGE.getType())); + } + + AspectSpec aspectSpec = entitySpec.getAspectSpec(aspectName); + UpsertItem upsert = + toUpsertItem(UrnUtils.getUrn(entityUrn), aspectSpec, jsonAspect, authentication.getActor()); + + List results = + entityService.ingestAspects( + AspectsBatchImpl.builder().items(List.of(upsert)).build(), true, true); + + return ResponseEntity.of( + results.stream() + .findFirst() + .map( + result -> + GenericEntity.builder() + .urn(result.getUrn().toString()) + .build( + objectMapper, + Map.of( + aspectName, + Pair.of( + result.getNewValue(), + withSystemMetadata ? result.getNewSystemMetadata() : null))))); + } + + @Tag(name = "Generic Aspects") + @PatchMapping( + value = "/{entityName}/{entityUrn}/{aspectName}", + consumes = "application/json-patch+json", + produces = MediaType.APPLICATION_JSON_VALUE) + @Operation(summary = "Patch an entity aspect. (Experimental)") + public ResponseEntity patchAspect( + @PathVariable("entityName") String entityName, + @PathVariable("entityUrn") String entityUrn, + @PathVariable("aspectName") String aspectName, + @RequestParam(value = "systemMetadata", required = false, defaultValue = "false") + Boolean withSystemMetadata, + @RequestBody @Nonnull GenericJsonPatch patch) + throws URISyntaxException, + NoSuchMethodException, + InvocationTargetException, + InstantiationException, + IllegalAccessException { + + EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName); + Authentication authentication = AuthenticationContext.getAuthentication(); + + if (restApiAuthorizationEnabled) { + checkAuthorized( + authorizationChain, + authentication.getActor(), + entitySpec, + entityUrn, + ImmutableList.of(PoliciesConfig.EDIT_ENTITY_PRIVILEGE.getType())); + } + + RecordTemplate currentValue = + entityService.getAspect(UrnUtils.getUrn(entityUrn), aspectName, 0); + + AspectSpec aspectSpec = entitySpec.getAspectSpec(aspectName); + GenericPatchTemplate genericPatchTemplate = + GenericPatchTemplate.builder() + .genericJsonPatch(patch) + .templateType(aspectSpec.getDataTemplateClass()) + .templateDefault( + aspectSpec.getDataTemplateClass().getDeclaredConstructor().newInstance()) + .build(); + UpsertItem upsert = + toUpsertItem( + UrnUtils.getUrn(entityUrn), + aspectSpec, + currentValue, + genericPatchTemplate, + authentication.getActor()); + + List results = + entityService.ingestAspects( + AspectsBatchImpl.builder().items(List.of(upsert)).build(), true, true); + + return ResponseEntity.of( + results.stream() + .findFirst() + .map( + result -> + GenericEntity.builder() + .urn(result.getUrn().toString()) + .build( + objectMapper, + Map.of( + aspectName, + Pair.of( + result.getNewValue(), + withSystemMetadata ? result.getNewSystemMetadata() : null))))); + } + + private List toRecordTemplates( + SearchEntityArray searchEntities, Set aspectNames, boolean withSystemMetadata) + throws URISyntaxException { + return toRecordTemplates( + searchEntities.stream().map(SearchEntity::getEntity).collect(Collectors.toList()), + aspectNames, + withSystemMetadata); + } + + private Boolean exists(Urn urn, @Nullable String aspect) { + return aspect == null ? entityService.exists(urn) : entityService.exists(urn, aspect); + } + + private List toRecordTemplates( + List urns, Set aspectNames, boolean withSystemMetadata) + throws URISyntaxException { + if (urns.isEmpty()) { + return List.of(); + } else { + Set urnsSet = new HashSet<>(urns); + + Map> aspects = + entityService.getLatestEnvelopedAspects( + urnsSet, resolveAspectNames(urnsSet, aspectNames)); + + return urns.stream() + .map( + u -> + GenericEntity.builder() + .urn(u.toString()) + .build( + objectMapper, + toAspectMap(u, aspects.getOrDefault(u, List.of()), withSystemMetadata))) + .collect(Collectors.toList()); + } + } + + private Set resolveAspectNames(Set urns, Set requestedNames) { + if (requestedNames.isEmpty()) { + return urns.stream() + .flatMap(u -> entityRegistry.getEntitySpec(u.getEntityType()).getAspectSpecs().stream()) + .map(AspectSpec::getName) + .collect(Collectors.toSet()); + } else { + // ensure key is always present + return Stream.concat( + requestedNames.stream(), + urns.stream() + .map(u -> entityRegistry.getEntitySpec(u.getEntityType()).getKeyAspectName())) + .collect(Collectors.toSet()); + } + } + + private Map> toAspectMap( + Urn urn, List aspects, boolean withSystemMetadata) { + return aspects.stream() + .map( + a -> + Map.entry( + a.getName(), + Pair.of( + toRecordTemplate(lookupAspectSpec(urn, a.getName()), a), + withSystemMetadata ? a.getSystemMetadata() : null))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + private AspectSpec lookupAspectSpec(Urn urn, String aspectName) { + return entityRegistry.getEntitySpec(urn.getEntityType()).getAspectSpec(aspectName); + } + + private RecordTemplate toRecordTemplate(AspectSpec aspectSpec, EnvelopedAspect envelopedAspect) { + return RecordUtils.toRecordTemplate( + aspectSpec.getDataTemplateClass(), envelopedAspect.getValue().data()); + } + + private UpsertItem toUpsertItem( + Urn entityUrn, AspectSpec aspectSpec, String jsonAspect, Actor actor) + throws URISyntaxException { + return MCPUpsertBatchItem.builder() + .urn(entityUrn) + .aspectName(aspectSpec.getName()) + .auditStamp(AuditStampUtils.createAuditStamp(actor.toUrnStr())) + .aspect( + GenericRecordUtils.deserializeAspect( + ByteString.copyString(jsonAspect, StandardCharsets.UTF_8), + GenericRecordUtils.JSON, + aspectSpec)) + .build(entityService); + } + + private UpsertItem toUpsertItem( + @Nonnull Urn urn, + @Nonnull AspectSpec aspectSpec, + @Nullable RecordTemplate currentValue, + @Nonnull GenericPatchTemplate genericPatchTemplate, + @Nonnull Actor actor) + throws URISyntaxException { + return MCPUpsertBatchItem.fromPatch( + urn, + aspectSpec, + currentValue, + genericPatchTemplate, + AuditStampUtils.createAuditStamp(actor.toUrnStr()), + entityService); + } +} diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/RelationshipController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/RelationshipController.java new file mode 100644 index 00000000000000..3550a86163f51c --- /dev/null +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/RelationshipController.java @@ -0,0 +1,228 @@ +package io.datahubproject.openapi.v2.controller; + +import static io.datahubproject.openapi.v2.utils.ControllerUtil.checkAuthorized; + +import com.datahub.authentication.Authentication; +import com.datahub.authentication.AuthenticationContext; +import com.datahub.authorization.AuthorizerChain; +import com.google.common.collect.ImmutableList; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.metadata.authorization.PoliciesConfig; +import com.linkedin.metadata.graph.RelatedEntities; +import com.linkedin.metadata.graph.RelatedEntitiesScrollResult; +import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService; +import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.query.filter.RelationshipDirection; +import com.linkedin.metadata.query.filter.RelationshipFilter; +import com.linkedin.metadata.query.filter.SortCriterion; +import com.linkedin.metadata.query.filter.SortOrder; +import com.linkedin.metadata.search.utils.QueryUtils; +import com.linkedin.metadata.utils.SearchUtil; +import io.datahubproject.openapi.v2.models.GenericRelationship; +import io.datahubproject.openapi.v2.models.GenericScrollResult; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequiredArgsConstructor +@RequestMapping("/v2/relationship") +@Slf4j +@Tag( + name = "Generic Relationships", + description = "APIs for ingesting and accessing entity relationships.") +public class RelationshipController { + + private static final String[] SORT_FIELDS = {"source.urn", "destination.urn", "relationshipType"}; + private static final String[] SORT_ORDERS = {"ASCENDING", "ASCENDING", "ASCENDING"}; + private static final List EDGE_SORT_CRITERION; + + static { + EDGE_SORT_CRITERION = + IntStream.range(0, SORT_FIELDS.length) + .mapToObj( + idx -> SearchUtil.sortBy(SORT_FIELDS[idx], SortOrder.valueOf(SORT_ORDERS[idx]))) + .collect(Collectors.toList()); + } + + @Autowired private EntityRegistry entityRegistry; + @Autowired private ElasticSearchGraphService graphService; + @Autowired private AuthorizerChain authorizationChain; + + @Autowired private boolean restApiAuthorizationEnabled; + + /** + * Returns relationship edges by type + * + * @param relationshipType the relationship type + * @param count number of results + * @param scrollId scrolling id + * @return list of relation edges + */ + @GetMapping(value = "/{relationshipType}", produces = MediaType.APPLICATION_JSON_VALUE) + @Operation(summary = "Scroll relationships of the given type.") + public ResponseEntity> getRelationshipsByType( + @PathVariable("relationshipType") String relationshipType, + @RequestParam(value = "count", defaultValue = "10") Integer count, + @RequestParam(value = "scrollId", required = false) String scrollId) { + + RelatedEntitiesScrollResult result = + graphService.scrollRelatedEntities( + null, + null, + null, + null, + List.of(relationshipType), + new RelationshipFilter().setDirection(RelationshipDirection.UNDIRECTED), + EDGE_SORT_CRITERION, + scrollId, + count, + null, + null); + + if (restApiAuthorizationEnabled) { + Authentication authentication = AuthenticationContext.getAuthentication(); + Set entitySpecs = + result.getEntities().stream() + .flatMap( + relatedEntity -> + Stream.of( + entityRegistry.getEntitySpec( + UrnUtils.getUrn(relatedEntity.getUrn()).getEntityType()), + entityRegistry.getEntitySpec( + UrnUtils.getUrn(relatedEntity.getSourceUrn()).getEntityType()))) + .collect(Collectors.toSet()); + + checkAuthorized( + authorizationChain, + authentication.getActor(), + entitySpecs, + ImmutableList.of(PoliciesConfig.GET_ENTITY_PRIVILEGE.getType())); + } + + return ResponseEntity.ok( + GenericScrollResult.builder() + .results(toGenericRelationships(result.getEntities())) + .scrollId(result.getScrollId()) + .build()); + } + + /** + * Returns edges for a given urn + * + * @param relationshipTypes types of edges + * @param direction direction of the edges + * @param count number of results + * @param scrollId scroll id + * @return urn edges + */ + @GetMapping(value = "/{entityName}/{entityUrn}", produces = MediaType.APPLICATION_JSON_VALUE) + @Operation(summary = "Scroll relationships from a given entity.") + public ResponseEntity> getRelationshipsByEntity( + @PathVariable("entityName") String entityName, + @PathVariable("entityUrn") String entityUrn, + @RequestParam(value = "relationshipType[]", required = false, defaultValue = "*") + String[] relationshipTypes, + @RequestParam(value = "direction", defaultValue = "OUTGOING") String direction, + @RequestParam(value = "count", defaultValue = "10") Integer count, + @RequestParam(value = "scrollId", required = false) String scrollId) { + + final RelatedEntitiesScrollResult result; + + switch (RelationshipDirection.valueOf(direction.toUpperCase())) { + case INCOMING -> result = + graphService.scrollRelatedEntities( + null, + null, + null, + null, + relationshipTypes.length > 0 && !relationshipTypes[0].equals("*") + ? Arrays.stream(relationshipTypes).toList() + : List.of(), + new RelationshipFilter() + .setDirection(RelationshipDirection.UNDIRECTED) + .setOr(QueryUtils.newFilter("destination.urn", entityUrn).getOr()), + EDGE_SORT_CRITERION, + scrollId, + count, + null, + null); + case OUTGOING -> result = + graphService.scrollRelatedEntities( + null, + null, + null, + null, + relationshipTypes.length > 0 && !relationshipTypes[0].equals("*") + ? Arrays.stream(relationshipTypes).toList() + : List.of(), + new RelationshipFilter() + .setDirection(RelationshipDirection.UNDIRECTED) + .setOr(QueryUtils.newFilter("source.urn", entityUrn).getOr()), + EDGE_SORT_CRITERION, + scrollId, + count, + null, + null); + default -> throw new IllegalArgumentException("Direction must be INCOMING or OUTGOING"); + } + + if (restApiAuthorizationEnabled) { + Authentication authentication = AuthenticationContext.getAuthentication(); + Set entitySpecs = + result.getEntities().stream() + .flatMap( + relatedEntity -> + Stream.of( + entityRegistry.getEntitySpec( + UrnUtils.getUrn(relatedEntity.getDestinationUrn()).getEntityType()), + entityRegistry.getEntitySpec( + UrnUtils.getUrn(relatedEntity.getSourceUrn()).getEntityType()))) + .collect(Collectors.toSet()); + + checkAuthorized( + authorizationChain, + authentication.getActor(), + entitySpecs, + ImmutableList.of(PoliciesConfig.GET_ENTITY_PRIVILEGE.getType())); + } + + return ResponseEntity.ok( + GenericScrollResult.builder() + .results(toGenericRelationships(result.getEntities())) + .scrollId(result.getScrollId()) + .build()); + } + + private List toGenericRelationships(List relatedEntities) { + return relatedEntities.stream() + .map( + result -> { + Urn source = UrnUtils.getUrn(result.getSourceUrn()); + Urn dest = UrnUtils.getUrn(result.getDestinationUrn()); + return GenericRelationship.builder() + .relationshipType(result.getRelationshipType()) + .source(GenericRelationship.GenericNode.fromUrn(source)) + .destination(GenericRelationship.GenericNode.fromUrn(dest)) + .build(); + }) + .collect(Collectors.toList()); + } +} diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/TimeseriesController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/TimeseriesController.java new file mode 100644 index 00000000000000..ab12b683390110 --- /dev/null +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/TimeseriesController.java @@ -0,0 +1,115 @@ +package io.datahubproject.openapi.v2.controller; + +import static io.datahubproject.openapi.v2.utils.ControllerUtil.checkAuthorized; + +import com.datahub.authentication.Authentication; +import com.datahub.authentication.AuthenticationContext; +import com.datahub.authorization.AuthorizerChain; +import com.google.common.collect.ImmutableList; +import com.linkedin.metadata.authorization.PoliciesConfig; +import com.linkedin.metadata.models.AspectSpec; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.query.filter.SortCriterion; +import com.linkedin.metadata.query.filter.SortOrder; +import com.linkedin.metadata.timeseries.GenericTimeseriesDocument; +import com.linkedin.metadata.timeseries.TimeseriesAspectService; +import com.linkedin.metadata.timeseries.TimeseriesScrollResult; +import com.linkedin.metadata.utils.SearchUtil; +import io.datahubproject.openapi.v2.models.GenericScrollResult; +import io.datahubproject.openapi.v2.models.GenericTimeseriesAspect; +import io.swagger.v3.oas.annotations.tags.Tag; +import java.net.URISyntaxException; +import java.util.List; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequiredArgsConstructor +@RequestMapping("/v2/timeseries") +@Slf4j +@Tag( + name = "Generic Timeseries Aspects", + description = "APIs for ingesting and accessing timeseries aspects") +public class TimeseriesController { + + @Autowired private EntityRegistry entityRegistry; + + @Autowired private TimeseriesAspectService timeseriesAspectService; + + @Autowired private AuthorizerChain authorizationChain; + + @Autowired private boolean restApiAuthorizationEnabled; + + @GetMapping(value = "/{entityName}/{aspectName}", produces = MediaType.APPLICATION_JSON_VALUE) + public ResponseEntity> getAspects( + @PathVariable("entityName") String entityName, + @PathVariable("aspectName") String aspectName, + @RequestParam(value = "count", defaultValue = "10") Integer count, + @RequestParam(value = "scrollId", required = false) String scrollId, + @RequestParam(value = "startTimeMillis", required = false) Long startTimeMillis, + @RequestParam(value = "endTimeMillis", required = false) Long endTimeMillis, + @RequestParam(value = "systemMetadata", required = false, defaultValue = "false") + Boolean withSystemMetadata) + throws URISyntaxException { + + if (restApiAuthorizationEnabled) { + Authentication authentication = AuthenticationContext.getAuthentication(); + checkAuthorized( + authorizationChain, + authentication.getActor(), + entityRegistry.getEntitySpec(entityName), + ImmutableList.of(PoliciesConfig.GET_ENTITY_PRIVILEGE.getType())); + } + + AspectSpec aspectSpec = entityRegistry.getEntitySpec(entityName).getAspectSpec(aspectName); + if (!aspectSpec.isTimeseries()) { + throw new IllegalArgumentException("Only timeseries aspects are supported."); + } + + List sortCriterion = + List.of( + SearchUtil.sortBy("timestampMillis", SortOrder.DESCENDING), + SearchUtil.sortBy("messageId", SortOrder.DESCENDING)); + + TimeseriesScrollResult result = + timeseriesAspectService.scrollAspects( + entityName, + aspectName, + null, + sortCriterion, + scrollId, + count, + startTimeMillis, + endTimeMillis); + + return ResponseEntity.ok( + GenericScrollResult.builder() + .scrollId(result.getScrollId()) + .results(toGenericTimeseriesAspect(result.getDocuments(), withSystemMetadata)) + .build()); + } + + private static List toGenericTimeseriesAspect( + List docs, boolean withSystemMetadata) { + return docs.stream() + .map( + doc -> + GenericTimeseriesAspect.builder() + .urn(doc.getUrn()) + .messageId(doc.getMessageId()) + .timestampMillis(doc.getTimestampMillis()) + .systemMetadata(withSystemMetadata ? doc.getSystemMetadata() : null) + .event(doc.getEvent()) + .build()) + .collect(Collectors.toList()); + } +} diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/models/GenericEntity.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/models/GenericEntity.java new file mode 100644 index 00000000000000..f1e965ca05464f --- /dev/null +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/models/GenericEntity.java @@ -0,0 +1,57 @@ +package io.datahubproject.openapi.v2.models; + +import com.datahub.util.RecordUtils; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.mxe.SystemMetadata; +import com.linkedin.util.Pair; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.stream.Collectors; +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +@JsonInclude(JsonInclude.Include.NON_NULL) +public class GenericEntity { + private String urn; + private Map aspects; + + public static class GenericEntityBuilder { + + public GenericEntity build( + ObjectMapper objectMapper, Map> aspects) { + Map jsonObjectMap = + aspects.entrySet().stream() + .map( + e -> { + try { + Map valueMap = + Map.of( + "value", + objectMapper.readTree( + RecordUtils.toJsonString(e.getValue().getFirst()) + .getBytes(StandardCharsets.UTF_8))); + + if (e.getValue().getSecond() != null) { + return Map.entry( + e.getKey(), + Map.of( + "systemMetadata", e.getValue().getSecond(), + "value", valueMap.get("value"))); + } else { + return Map.entry(e.getKey(), Map.of("value", valueMap.get("value"))); + } + } catch (IOException ex) { + throw new RuntimeException(ex); + } + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + return new GenericEntity(urn, jsonObjectMap); + } + } +} diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/models/GenericRelationship.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/models/GenericRelationship.java new file mode 100644 index 00000000000000..a4fb429c1eb185 --- /dev/null +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/models/GenericRelationship.java @@ -0,0 +1,36 @@ +package io.datahubproject.openapi.v2.models; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.linkedin.common.urn.Urn; +import java.util.List; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +@JsonInclude(JsonInclude.Include.NON_NULL) +public class GenericRelationship { + @Nonnull private String relationshipType; + @Nonnull private GenericNode destination; + @Nonnull private GenericNode source; + @Nullable private NodeProperties properties; + + @Data + @Builder + public static class GenericNode { + @Nonnull private String entityType; + @Nonnull private String urn; + + public static GenericNode fromUrn(@Nonnull Urn urn) { + return GenericNode.builder().entityType(urn.getEntityType()).urn(urn.toString()).build(); + } + } + + @Data + @Builder + public static class NodeProperties { + private List source; + } +} diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/models/GenericScrollResult.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/models/GenericScrollResult.java new file mode 100644 index 00000000000000..2befc83c003634 --- /dev/null +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/models/GenericScrollResult.java @@ -0,0 +1,12 @@ +package io.datahubproject.openapi.v2.models; + +import java.util.List; +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +public class GenericScrollResult { + private String scrollId; + private List results; +} diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/models/GenericTimeseriesAspect.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/models/GenericTimeseriesAspect.java new file mode 100644 index 00000000000000..9d52ed28b20666 --- /dev/null +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/models/GenericTimeseriesAspect.java @@ -0,0 +1,18 @@ +package io.datahubproject.openapi.v2.models; + +import com.fasterxml.jackson.annotation.JsonInclude; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +@JsonInclude(JsonInclude.Include.NON_NULL) +public class GenericTimeseriesAspect { + private long timestampMillis; + @Nonnull private String urn; + @Nonnull private Object event; + @Nullable private String messageId; + @Nullable private Object systemMetadata; +} diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/models/PatchOperation.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/models/PatchOperation.java new file mode 100644 index 00000000000000..c5323dfe68369d --- /dev/null +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/models/PatchOperation.java @@ -0,0 +1,26 @@ +package io.datahubproject.openapi.v2.models; + +import com.fasterxml.jackson.databind.JsonNode; +import com.linkedin.metadata.aspect.patch.PatchOperationType; +import java.util.List; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class PatchOperation { + @Nonnull private String op; + @Nonnull private String path; + @Nullable private JsonNode value; + @Nullable private List arrayMapKey; + + public PatchOperationType getOp() { + return PatchOperationType.valueOf(op.toUpperCase()); + } +} diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/utils/ControllerUtil.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/utils/ControllerUtil.java new file mode 100644 index 00000000000000..70d588721d3b3c --- /dev/null +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/utils/ControllerUtil.java @@ -0,0 +1,67 @@ +package io.datahubproject.openapi.v2.utils; + +import com.datahub.authentication.Actor; +import com.datahub.authorization.AuthUtil; +import com.datahub.authorization.ConjunctivePrivilegeGroup; +import com.datahub.authorization.DisjunctivePrivilegeGroup; +import com.datahub.plugins.auth.authorization.Authorizer; +import com.google.common.collect.ImmutableList; +import com.linkedin.metadata.models.EntitySpec; +import io.datahubproject.openapi.exception.UnauthorizedException; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +public class ControllerUtil { + private ControllerUtil() {} + + public static void checkAuthorized( + @Nonnull Authorizer authorizationChain, + @Nonnull Actor actor, + @Nonnull EntitySpec entitySpec, + @Nonnull List privileges) { + checkAuthorized(authorizationChain, actor, entitySpec, null, privileges); + } + + public static void checkAuthorized( + @Nonnull Authorizer authorizationChain, + @Nonnull Actor actor, + @Nonnull Set entitySpecs, + @Nonnull List privileges) { + DisjunctivePrivilegeGroup orGroup = + new DisjunctivePrivilegeGroup(ImmutableList.of(new ConjunctivePrivilegeGroup(privileges))); + List> resourceSpecs = + entitySpecs.stream() + .map( + entitySpec -> + Optional.of(new com.datahub.authorization.EntitySpec(entitySpec.getName(), ""))) + .collect(Collectors.toList()); + if (!AuthUtil.isAuthorizedForResources( + authorizationChain, actor.toUrnStr(), resourceSpecs, orGroup)) { + throw new UnauthorizedException(actor.toUrnStr() + " is unauthorized to get entities."); + } + } + + public static void checkAuthorized( + @Nonnull Authorizer authorizationChain, + @Nonnull Actor actor, + @Nonnull EntitySpec entitySpec, + @Nullable String entityUrn, + @Nonnull List privileges) { + DisjunctivePrivilegeGroup orGroup = + new DisjunctivePrivilegeGroup(ImmutableList.of(new ConjunctivePrivilegeGroup(privileges))); + + List> resourceSpecs = + List.of( + Optional.of( + new com.datahub.authorization.EntitySpec( + entitySpec.getName(), entityUrn != null ? entityUrn : ""))); + if (!AuthUtil.isAuthorizedForResources( + authorizationChain, actor.toUrnStr(), resourceSpecs, orGroup)) { + throw new UnauthorizedException(actor.toUrnStr() + " is unauthorized to get entities."); + } + } +} diff --git a/metadata-service/restli-servlet-impl/src/test/java/mock/MockTimeseriesAspectService.java b/metadata-service/restli-servlet-impl/src/test/java/mock/MockTimeseriesAspectService.java index 2a12ecf6866bbe..5187cba0b91510 100644 --- a/metadata-service/restli-servlet-impl/src/test/java/mock/MockTimeseriesAspectService.java +++ b/metadata-service/restli-servlet-impl/src/test/java/mock/MockTimeseriesAspectService.java @@ -7,6 +7,7 @@ import com.linkedin.metadata.query.filter.SortCriterion; import com.linkedin.metadata.timeseries.BatchWriteOperationsOptions; import com.linkedin.metadata.timeseries.TimeseriesAspectService; +import com.linkedin.metadata.timeseries.TimeseriesScrollResult; import com.linkedin.timeseries.AggregationSpec; import com.linkedin.timeseries.DeleteAspectValuesResult; import com.linkedin.timeseries.GenericTable; @@ -118,4 +119,18 @@ public void upsertDocument( public List getIndexSizes() { return List.of(); } + + @Nonnull + @Override + public TimeseriesScrollResult scrollAspects( + @Nonnull String entityName, + @Nonnull String aspectName, + @Nullable Filter filter, + @Nonnull List sortCriterion, + @Nullable String scrollId, + int count, + @Nullable Long startTimeMillis, + @Nullable Long endTimeMillis) { + return TimeseriesScrollResult.builder().build(); + } } diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/AspectUtils.java b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/AspectUtils.java index c4216962c134cd..2c1596474fb21e 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/AspectUtils.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/AspectUtils.java @@ -88,7 +88,7 @@ public static List getAdditionalChanges( public static List getAdditionalChanges( @Nonnull MetadataChangeProposal metadataChangeProposal, - @Nonnull EntityService entityService) { + @Nonnull EntityService entityService) { return getAdditionalChanges(metadataChangeProposal, entityService, false); } diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/graph/GraphService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/graph/GraphService.java index b3e713a906d01d..625353eeb68205 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/graph/GraphService.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/graph/GraphService.java @@ -5,6 +5,7 @@ import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.query.filter.RelationshipDirection; import com.linkedin.metadata.query.filter.RelationshipFilter; +import com.linkedin.metadata.query.filter.SortCriterion; import com.linkedin.metadata.search.utils.QueryUtils; import java.net.URISyntaxException; import java.util.ArrayList; @@ -322,4 +323,18 @@ void removeEdgesFromNode( default boolean supportsMultiHop() { return false; } + + @Nonnull + RelatedEntitiesScrollResult scrollRelatedEntities( + @Nullable List sourceTypes, + @Nonnull Filter sourceEntityFilter, + @Nullable List destinationTypes, + @Nonnull Filter destinationEntityFilter, + @Nonnull List relationshipTypes, + @Nonnull RelationshipFilter relationshipFilter, + @Nonnull List sortCriterion, + @Nullable String scrollId, + int count, + @Nullable Long startTimeMillis, + @Nullable Long endTimeMillis); } diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/graph/RelatedEntities.java b/metadata-service/services/src/main/java/com/linkedin/metadata/graph/RelatedEntities.java new file mode 100644 index 00000000000000..0c6f8a0d65d5cf --- /dev/null +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/graph/RelatedEntities.java @@ -0,0 +1,31 @@ +package com.linkedin.metadata.graph; + +import com.linkedin.metadata.query.filter.RelationshipDirection; +import javax.annotation.Nonnull; +import lombok.Getter; + +/** Preserves directionality as well as the generic `related` urn concept */ +@Getter +public class RelatedEntities extends RelatedEntity { + /** source Urn * */ + @Nonnull String sourceUrn; + + /** Destination Urn associated with the related entity. */ + @Nonnull String destinationUrn; + + public RelatedEntities( + @Nonnull String relationshipType, + @Nonnull String sourceUrn, + @Nonnull String destinationUrn, + @Nonnull RelationshipDirection relationshipDirection) { + super( + relationshipType, + relationshipDirection == RelationshipDirection.OUTGOING ? destinationUrn : sourceUrn); + this.sourceUrn = sourceUrn; + this.destinationUrn = destinationUrn; + } + + public RelatedEntity asRelatedEntity() { + return new RelatedEntity(relationshipType, urn); + } +} diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/graph/RelatedEntitiesScrollResult.java b/metadata-service/services/src/main/java/com/linkedin/metadata/graph/RelatedEntitiesScrollResult.java new file mode 100644 index 00000000000000..b0b5394ca58083 --- /dev/null +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/graph/RelatedEntitiesScrollResult.java @@ -0,0 +1,16 @@ +package com.linkedin.metadata.graph; + +import java.util.List; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; + +@AllArgsConstructor +@Data +@Builder +public class RelatedEntitiesScrollResult { + int numResults; + int pageSize; + String scrollId; + List entities; +} diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/timeseries/GenericTimeseriesDocument.java b/metadata-service/services/src/main/java/com/linkedin/metadata/timeseries/GenericTimeseriesDocument.java new file mode 100644 index 00000000000000..1442f099c47032 --- /dev/null +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/timeseries/GenericTimeseriesDocument.java @@ -0,0 +1,26 @@ +package com.linkedin.metadata.timeseries; + +import com.fasterxml.jackson.annotation.JsonProperty; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +public class GenericTimeseriesDocument { + @Nonnull private String urn; + private long timestampMillis; + + @JsonProperty("@timestamp") + private long timestamp; + + @Nonnull private Object event; + @Nullable private String messageId; + @Nullable private Object systemMetadata; + @Nullable private String eventGranularity; + private boolean isExploded; + @Nullable private String runId; + @Nullable private String partition; + @Nullable private Object partitionSpec; +} diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/timeseries/TimeseriesAspectService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/timeseries/TimeseriesAspectService.java index 54480bb700398a..529e8e00ecf570 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/timeseries/TimeseriesAspectService.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/timeseries/TimeseriesAspectService.java @@ -201,4 +201,15 @@ void upsertDocument( @Nonnull final JsonNode document); List getIndexSizes(); + + @Nonnull + TimeseriesScrollResult scrollAspects( + @Nonnull final String entityName, + @Nonnull final String aspectName, + @Nullable Filter filter, + @Nonnull List sortCriterion, + @Nullable String scrollId, + int count, + @Nullable Long startTimeMillis, + @Nullable Long endTimeMillis); } diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/timeseries/TimeseriesScrollResult.java b/metadata-service/services/src/main/java/com/linkedin/metadata/timeseries/TimeseriesScrollResult.java new file mode 100644 index 00000000000000..200db2dfde8eb3 --- /dev/null +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/timeseries/TimeseriesScrollResult.java @@ -0,0 +1,18 @@ +package com.linkedin.metadata.timeseries; + +import com.linkedin.metadata.aspect.EnvelopedAspect; +import java.util.List; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; + +@AllArgsConstructor +@Data +@Builder +public class TimeseriesScrollResult { + int numResults; + int pageSize; + String scrollId; + List events; + List documents; +} diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/utils/AuditStampUtils.java b/metadata-utils/src/main/java/com/linkedin/metadata/utils/AuditStampUtils.java index 5f3975b066fde6..6ba311cf166d4e 100644 --- a/metadata-utils/src/main/java/com/linkedin/metadata/utils/AuditStampUtils.java +++ b/metadata-utils/src/main/java/com/linkedin/metadata/utils/AuditStampUtils.java @@ -3,8 +3,11 @@ import static com.linkedin.metadata.Constants.SYSTEM_ACTOR; import com.linkedin.common.AuditStamp; +import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.UrnUtils; +import java.net.URISyntaxException; import java.time.Clock; +import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -16,4 +19,11 @@ public static AuditStamp createDefaultAuditStamp() { .setActor(UrnUtils.getUrn(SYSTEM_ACTOR)) .setTime(Clock.systemUTC().millis()); } + + public static AuditStamp createAuditStamp(@Nonnull String actorUrn) throws URISyntaxException { + AuditStamp auditStamp = new AuditStamp(); + auditStamp.setActor(Urn.createFromString(actorUrn)); + auditStamp.setTime(Clock.systemUTC().millis()); + return auditStamp; + } } diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/utils/SearchUtil.java b/metadata-utils/src/main/java/com/linkedin/metadata/utils/SearchUtil.java index eb58bc509838d5..9df708c6e9fdcd 100644 --- a/metadata-utils/src/main/java/com/linkedin/metadata/utils/SearchUtil.java +++ b/metadata-utils/src/main/java/com/linkedin/metadata/utils/SearchUtil.java @@ -7,14 +7,19 @@ import com.linkedin.metadata.query.filter.Criterion; import com.linkedin.metadata.query.filter.CriterionArray; import com.linkedin.metadata.query.filter.Filter; +import com.linkedin.metadata.query.filter.SortCriterion; +import com.linkedin.metadata.query.filter.SortOrder; import com.linkedin.metadata.search.FilterValue; import com.linkedin.metadata.utils.elasticsearch.IndexConvention; import java.net.URISyntaxException; +import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.annotation.Nonnull; import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; @@ -142,4 +147,25 @@ public static BoolQueryBuilder filterSoftDeletedByDefault( } return filterQuery; } + + public static SortCriterion sortBy(@Nonnull String field, @Nullable SortOrder direction) { + SortCriterion sortCriterion = new SortCriterion(); + sortCriterion.setField(field); + sortCriterion.setOrder( + com.linkedin.metadata.query.filter.SortOrder.valueOf( + Optional.ofNullable(direction).orElse(SortOrder.ASCENDING).toString())); + return sortCriterion; + } + + public static Filter andFilter(Criterion... criteria) { + Filter filter = new Filter(); + filter.setOr(andCriterion(Arrays.stream(criteria))); + return filter; + } + + public static ConjunctiveCriterionArray andCriterion(Stream criteria) { + return new ConjunctiveCriterionArray( + new ConjunctiveCriterion() + .setAnd(new CriterionArray(criteria.collect(Collectors.toList())))); + } }