Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

datacube: query store improvements #3298

Merged
merged 1 commit into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,26 +118,6 @@ public Response getQuery(@PathParam("queryId") String queryId)
}
}

@GET
@Path("/stats")
@ApiOperation(value = "Get Query Store Stats")
@Consumes({MediaType.APPLICATION_JSON})
public Response getQueryCount()
{
try
{
return Response.ok(this.queryStoreManager.getQueryStoreStats()).build();
}
catch (Exception e)
{
if (e instanceof ApplicationQueryException)
{
return ((ApplicationQueryException) e).toResponse();
}
return ExceptionTool.exceptionManager(e, LoggingEventType.GET_QUERY_STATS_ERROR, null);
}
}

@POST
@ApiOperation(value = "Create a new query")
@Consumes({MediaType.APPLICATION_JSON})
Expand Down Expand Up @@ -251,6 +231,27 @@ public Response getQueryEvents(@QueryParam("queryId") @ApiParam("The query ID th
}
}

@GET
@Path("/stats")
@ApiOperation(value = "Get query store statistics")
@Consumes({MediaType.APPLICATION_JSON})
public Response getQueryStoreStats()
{
try
{
return Response.ok(this.queryStoreManager.getQueryStoreStats()).build();
}
catch (Exception e)
{
if (e instanceof ApplicationQueryException)
{
return ((ApplicationQueryException) e).toResponse();
}
return ExceptionTool.exceptionManager(e, LoggingEventType.GET_QUERY_STATS_ERROR, null);
}
}

// --------------------------------------------- DataCube Query ---------------------------------------------

@POST
@Path("dataCube/search")
Expand Down Expand Up @@ -378,4 +379,49 @@ public Response deleteDataCubeQuery(@PathParam("queryId") String queryId, @ApiPa
return ExceptionTool.exceptionManager(e, LoggingEventType.DELETE_QUERY_ERROR, identity.getName());
}
}

@GET
@Path("dataCube/events")
@ApiOperation(value = "Get DataCube query events")
@Consumes({MediaType.APPLICATION_JSON})
public Response getDataCubeQueryEvents(@QueryParam("queryId") @ApiParam("The query ID the event is associated with") String queryId,
@QueryParam("eventType") @ApiParam("The type of event") QueryEvent.QueryEventType eventType,
@QueryParam("since") @ApiParam("Lower limit on the UNIX timestamp for the event creation time") Long since,
@QueryParam("until") @ApiParam("Upper limit on the UNIX timestamp for the event creation time") Long until,
@QueryParam("limit") @ApiParam("Limit the number of events returned") Integer limit,
@ApiParam(hidden = true) @Pac4JProfileManager ProfileManager<CommonProfile> profileManager)
{
try
{
return Response.ok().entity(this.dataCubeQueryStoreManager.getQueryEvents(queryId, eventType, since, until, limit)).build();
}
catch (Exception e)
{
if (e instanceof ApplicationQueryException)
{
return ((ApplicationQueryException) e).toResponse();
}
return ExceptionTool.exceptionManager(e, LoggingEventType.GET_QUERY_EVENTS_ERROR, null);
}
}

@GET
@Path("dataCube/stats")
@ApiOperation(value = "Get DataCube query store statistics")
@Consumes({MediaType.APPLICATION_JSON})
public Response getDataCubeQueryStoreStats()
{
try
{
return Response.ok(this.dataCubeQueryStoreManager.getQueryStoreStats()).build();
}
catch (Exception e)
{
if (e instanceof ApplicationQueryException)
{
return ((ApplicationQueryException) e).toResponse();
}
return ExceptionTool.exceptionManager(e, LoggingEventType.GET_QUERY_STATS_ERROR, null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@
import org.eclipse.collections.api.list.MutableList;
import org.eclipse.collections.api.set.sorted.MutableSortedSet;
import org.eclipse.collections.impl.utility.LazyIterate;
import org.finos.legend.engine.application.query.model.DataCubeQuery;
import org.finos.legend.engine.application.query.model.QuerySearchSortBy;
import org.finos.legend.engine.application.query.model.QuerySearchSpecification;
import org.finos.legend.engine.application.query.model.QuerySearchTermSpecification;
import org.finos.legend.engine.application.query.model.*;
import org.finos.legend.engine.protocol.pure.v1.model.context.EngineErrorType;
import org.finos.legend.engine.shared.core.operational.errorManagement.EngineException;
import org.finos.legend.engine.shared.core.vault.Vault;
Expand All @@ -45,11 +42,12 @@
public class DataCubeQueryStoreManager
{
private static final int MAX_NUMBER_OF_QUERIES = 100;
private static final int MAX_NUMBER_OF_EVENTS = 1000;

private final ObjectMapper objectMapper = new ObjectMapper();
private static final Document EMPTY_FILTER = Document.parse("{}");

private static final List<String> LIGHT_QUERY_PROJECTION = Arrays.asList("id", "name", "createdAt", "lastUpdatedAt", "lastOpenAt");
private static final List<String> LIGHT_QUERY_PROJECTION = Arrays.asList("id", "name", "owner", "createdAt", "lastUpdatedAt", "lastOpenAt");
private static final int GET_QUERIES_LIMIT = 50;

private final MongoClient mongoClient;
Expand All @@ -59,24 +57,33 @@ public DataCubeQueryStoreManager(MongoClient mongoClient)
this.mongoClient = mongoClient;
}

private MongoDatabase getDataCubeQueryDatabase()
private MongoDatabase getQueryDatabase()
{
if (Vault.INSTANCE.hasValue("query.mongo.database"))
{
return this.mongoClient.getDatabase(Vault.INSTANCE.getValue("query.mongo.database"));
}
throw new RuntimeException("DataCube Query MongoDB database has not been configured properly");
throw new RuntimeException("DataCube query MongoDB database has not been configured properly");
}

private MongoCollection<Document> getQueryCollection()
{
if (Vault.INSTANCE.hasValue("query.mongo.collection.dataCube"))
{
return this.getDataCubeQueryDatabase().getCollection(Vault.INSTANCE.getValue("query.mongo.collection.dataCube"));
return this.getQueryDatabase().getCollection(Vault.INSTANCE.getValue("query.mongo.collection.dataCube"));
}
throw new RuntimeException("DataCube Query MongoDB collection has not been configured properly");
}

private MongoCollection<Document> getQueryEventCollection()
{
if (Vault.INSTANCE.hasValue("query.mongo.collection.dataCubeEvent"))
{
return this.getQueryDatabase().getCollection(Vault.INSTANCE.getValue("query.mongo.collection.dataCubeEvent"));
}
throw new RuntimeException("Query event MongoDB collection has not been configured properly");
}

private <T> T documentToClass(Document document, Class<T> _class)
{
try
Expand All @@ -99,6 +106,36 @@ private Document queryToDocument(DataCubeQuery query) throws JsonProcessingExcep
return Document.parse(objectMapper.writeValueAsString(query));
}

private static QueryEvent createEvent(String queryId, QueryEvent.QueryEventType eventType)
{
QueryEvent event = new QueryEvent();
event.queryId = queryId;
event.timestamp = Instant.now().toEpochMilli();
event.eventType = eventType;
return event;
}

private static QueryEvent documentToQueryEvent(Document document)
{
QueryEvent event = new QueryEvent();
event.queryId = document.getString("queryId");
try
{
event.timestamp = document.getLong("timestamp");
}
catch (ClassCastException e)
{
event.timestamp = Long.valueOf(document.getInteger("timestamp"));
}
event.eventType = QueryEvent.QueryEventType.valueOf(document.getString("eventType"));
return event;
}

private Document queryEventToDocument(QueryEvent event) throws JsonProcessingException
{
return Document.parse(objectMapper.writeValueAsString((event)));
}

private static void validate(boolean predicate, String message)
{
if (!predicate)
Expand All @@ -112,17 +149,11 @@ private static void validateNonEmptyQueryField(String fieldValue, String message
validate(fieldValue != null && !fieldValue.isEmpty(), message);
}

private static void validateNonNullQueryField(Object fieldValue, String message)
{
validate(fieldValue != null, message);
}

public static void validateQuery(DataCubeQuery query)
{
validateNonEmptyQueryField(query.id, "Query ID is missing or empty");
validateNonEmptyQueryField(query.name, "Query name is missing or empty");
validateNonNullQueryField(query.query, "Query is missing");
validateNonNullQueryField(query.source, "Query source is missing");
validate(query.content != null, "Query content is missing");
}

public List<DataCubeQuery> searchQueries(QuerySearchSpecification searchSpecification, String currentUser)
Expand All @@ -138,20 +169,30 @@ public List<DataCubeQuery> searchQueries(QuerySearchSpecification searchSpecific
if (querySearchTermSpecification.exactMatchName != null && querySearchTermSpecification.exactMatchName)
{
Bson filter = Filters.eq("name", querySearchTermSpecification.searchTerm);
if (querySearchTermSpecification.includeOwner != null && querySearchTermSpecification.includeOwner)
{
filter = Filters.or(filter, Filters.eq("owner", querySearchTermSpecification.searchTerm));
}
filters.add(filter);
}
else
{
Bson idFilter = Filters.eq("id", querySearchTermSpecification.searchTerm);
Bson nameFilter = Filters.regex("name", Pattern.quote(querySearchTermSpecification.searchTerm), "i");
Bson filter = Filters.or(idFilter, nameFilter);
if (querySearchTermSpecification.includeOwner != null && querySearchTermSpecification.includeOwner)
{
filter = Filters.or(idFilter, nameFilter, Filters.regex("owner", Pattern.quote(querySearchTermSpecification.searchTerm), "i"));
}
filters.add(filter);
}
}

List<DataCubeQuery> queries = new ArrayList<>();
List<Bson> aggregateLists = new ArrayList<>();
aggregateLists.add(Aggregates.addFields(new Field<>("isCurrentUser", new Document("$eq", Arrays.asList("$owner", currentUser)))));
aggregateLists.add(Aggregates.match(filters.isEmpty() ? EMPTY_FILTER : Filters.and(filters)));
aggregateLists.add(Aggregates.sort(Sorts.descending("isCurrentUser")));
if (searchSpecification.sortByOption != null)
{
aggregateLists.add(Aggregates.sort(Sorts.descending(getSortByField(searchSpecification.sortByOption))));
Expand Down Expand Up @@ -224,7 +265,7 @@ else if (count == 0)
return matchingQueries;
}

public DataCubeQuery getQuery(String queryId) throws JsonProcessingException
public DataCubeQuery getQuery(String queryId)
{
List<DataCubeQuery> matchingQueries = LazyIterate.collect(this.getQueryCollection().find(Filters.eq("id", queryId)), this::documentToQuery).toList();
if (matchingQueries.size() > 1)
Expand All @@ -248,7 +289,9 @@ else if (matchingQueries.isEmpty())
public DataCubeQuery createQuery(DataCubeQuery query, String currentUser) throws JsonProcessingException
{
validateQuery(query);
// TODO: store ownership information

// Force the current user as owner regardless of user input
query.owner = currentUser;

List<DataCubeQuery> matchingQueries = LazyIterate.collect(this.getQueryCollection().find(Filters.eq("id", query.id)), this::documentToQuery).toList();
if (!matchingQueries.isEmpty())
Expand All @@ -259,6 +302,9 @@ public DataCubeQuery createQuery(DataCubeQuery query, String currentUser) throws
query.lastUpdatedAt = query.createdAt;
query.lastOpenAt = query.createdAt;
this.getQueryCollection().insertOne(queryToDocument(query));
QueryEvent createdEvent = createEvent(query.id, QueryEvent.QueryEventType.CREATED);
createdEvent.timestamp = query.createdAt;
this.getQueryEventCollection().insertOne(queryEventToDocument(createdEvent));
return query;
}

Expand All @@ -281,11 +327,20 @@ else if (matchingQueries.isEmpty())
}
DataCubeQuery currentQuery = matchingQueries.get(0);

// TODO: check ownership
// Make sure only the owner can update the query
// NOTE: if the query is created by an anonymous user previously, set the current user as the owner
if (currentQuery.owner != null && !currentQuery.owner.equals(currentUser))
{
throw new ApplicationQueryException("Only owner can update the query", Response.Status.FORBIDDEN);
}
query.owner = currentUser;
query.createdAt = currentQuery.createdAt;
query.lastUpdatedAt = Instant.now().toEpochMilli();
query.lastOpenAt = Instant.now().toEpochMilli();
this.getQueryCollection().findOneAndReplace(Filters.eq("id", queryId), queryToDocument(query));
QueryEvent updatedEvent = createEvent(query.id, QueryEvent.QueryEventType.UPDATED);
updatedEvent.timestamp = query.lastUpdatedAt;
this.getQueryEventCollection().insertOne(queryEventToDocument(updatedEvent));
return query;
}

Expand All @@ -302,7 +357,45 @@ else if (matchingQueries.isEmpty())
}
DataCubeQuery currentQuery = matchingQueries.get(0);

// TODO: check ownership
// Make sure only the owner can delete the query
if (currentQuery.owner != null && !currentQuery.owner.equals(currentUser))
{
throw new ApplicationQueryException("Only owner can delete the query", Response.Status.FORBIDDEN);
}
this.getQueryCollection().findOneAndDelete(Filters.eq("id", queryId));
this.getQueryEventCollection().insertOne(queryEventToDocument(createEvent(queryId, QueryEvent.QueryEventType.DELETED)));
}

public List<QueryEvent> getQueryEvents(String queryId, QueryEvent.QueryEventType eventType, Long since, Long until, Integer limit)
{
List<Bson> filters = new ArrayList<>();
if (queryId != null)
{
filters.add(Filters.eq("queryId", queryId));
}
if (eventType != null)
{
filters.add(Filters.eq("eventType", eventType.toString()));
}
if (since != null)
{
filters.add(Filters.gte("timestamp", since));
}
if (until != null)
{
filters.add(Filters.lte("timestamp", until));
}
return LazyIterate.collect(this.getQueryEventCollection()
.find(filters.isEmpty() ? EMPTY_FILTER : Filters.and(filters))
.limit(Math.min(MAX_NUMBER_OF_EVENTS, limit == null ? Integer.MAX_VALUE : limit)), DataCubeQueryStoreManager::documentToQueryEvent).toList();
}

public QueryStoreStats getQueryStoreStats()
{
Long count = this.getQueryCollection().countDocuments();
QueryStoreStats storeStats = new QueryStoreStats();
storeStats.setQueryCount(count);
storeStats.setQueryCreatedFromDataSpaceCount(0L);
return storeStats;
}
}
Loading
Loading