Skip to content

Commit

Permalink
datacube: rework query store
Browse files Browse the repository at this point in the history
  • Loading branch information
akphi committed Dec 12, 2024
1 parent 127e82b commit 671a006
Show file tree
Hide file tree
Showing 7 changed files with 726 additions and 440 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,26 +118,6 @@ public Response getQuery(@PathParam("queryId") String queryId)
}
}

@GET
@Path("/stats")
@ApiOperation(value = "Get Query Store Stats")
@Consumes({MediaType.APPLICATION_JSON})
public Response getQueryCount()
{
try
{
return Response.ok(this.queryStoreManager.getQueryStoreStats()).build();
}
catch (Exception e)
{
if (e instanceof ApplicationQueryException)
{
return ((ApplicationQueryException) e).toResponse();
}
return ExceptionTool.exceptionManager(e, LoggingEventType.GET_QUERY_STATS_ERROR, null);
}
}

@POST
@ApiOperation(value = "Create a new query")
@Consumes({MediaType.APPLICATION_JSON})
Expand Down Expand Up @@ -251,6 +231,27 @@ public Response getQueryEvents(@QueryParam("queryId") @ApiParam("The query ID th
}
}

@GET
@Path("/stats")
@ApiOperation(value = "Get query store statistics")
@Consumes({MediaType.APPLICATION_JSON})
public Response getQueryStoreStats()
{
try
{
return Response.ok(this.queryStoreManager.getQueryStoreStats()).build();
}
catch (Exception e)
{
if (e instanceof ApplicationQueryException)
{
return ((ApplicationQueryException) e).toResponse();
}
return ExceptionTool.exceptionManager(e, LoggingEventType.GET_QUERY_STATS_ERROR, null);
}
}

// --------------------------------------------- DataCube Query ---------------------------------------------

@POST
@Path("dataCube/search")
Expand Down Expand Up @@ -378,4 +379,49 @@ public Response deleteDataCubeQuery(@PathParam("queryId") String queryId, @ApiPa
return ExceptionTool.exceptionManager(e, LoggingEventType.DELETE_QUERY_ERROR, identity.getName());
}
}

@GET
@Path("dataCube/events")
@ApiOperation(value = "Get DataCube query events")
@Consumes({MediaType.APPLICATION_JSON})
public Response getDataCubeQueryEvents(@QueryParam("queryId") @ApiParam("The query ID the event is associated with") String queryId,
@QueryParam("eventType") @ApiParam("The type of event") QueryEvent.QueryEventType eventType,
@QueryParam("since") @ApiParam("Lower limit on the UNIX timestamp for the event creation time") Long since,
@QueryParam("until") @ApiParam("Upper limit on the UNIX timestamp for the event creation time") Long until,
@QueryParam("limit") @ApiParam("Limit the number of events returned") Integer limit,
@ApiParam(hidden = true) @Pac4JProfileManager ProfileManager<CommonProfile> profileManager)
{
try
{
return Response.ok().entity(this.dataCubeQueryStoreManager.getQueryEvents(queryId, eventType, since, until, limit)).build();
}
catch (Exception e)
{
if (e instanceof ApplicationQueryException)
{
return ((ApplicationQueryException) e).toResponse();
}
return ExceptionTool.exceptionManager(e, LoggingEventType.GET_QUERY_EVENTS_ERROR, null);
}
}

@GET
@Path("dataCube/stats")
@ApiOperation(value = "Get DataCube query store statistics")
@Consumes({MediaType.APPLICATION_JSON})
public Response getDataCubeQueryStoreStats()
{
try
{
return Response.ok(this.dataCubeQueryStoreManager.getQueryStoreStats()).build();
}
catch (Exception e)
{
if (e instanceof ApplicationQueryException)
{
return ((ApplicationQueryException) e).toResponse();
}
return ExceptionTool.exceptionManager(e, LoggingEventType.GET_QUERY_STATS_ERROR, null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@
import org.eclipse.collections.api.list.MutableList;
import org.eclipse.collections.api.set.sorted.MutableSortedSet;
import org.eclipse.collections.impl.utility.LazyIterate;
import org.finos.legend.engine.application.query.model.DataCubeQuery;
import org.finos.legend.engine.application.query.model.QuerySearchSortBy;
import org.finos.legend.engine.application.query.model.QuerySearchSpecification;
import org.finos.legend.engine.application.query.model.QuerySearchTermSpecification;
import org.finos.legend.engine.application.query.model.*;
import org.finos.legend.engine.protocol.pure.v1.model.context.EngineErrorType;
import org.finos.legend.engine.shared.core.operational.errorManagement.EngineException;
import org.finos.legend.engine.shared.core.vault.Vault;
Expand All @@ -45,11 +42,12 @@
public class DataCubeQueryStoreManager
{
private static final int MAX_NUMBER_OF_QUERIES = 100;
private static final int MAX_NUMBER_OF_EVENTS = 1000;

private final ObjectMapper objectMapper = new ObjectMapper();
private static final Document EMPTY_FILTER = Document.parse("{}");

private static final List<String> LIGHT_QUERY_PROJECTION = Arrays.asList("id", "name", "createdAt", "lastUpdatedAt", "lastOpenAt");
private static final List<String> LIGHT_QUERY_PROJECTION = Arrays.asList("id", "name", "owner", "createdAt", "lastUpdatedAt", "lastOpenAt");
private static final int GET_QUERIES_LIMIT = 50;

private final MongoClient mongoClient;
Expand All @@ -59,24 +57,33 @@ public DataCubeQueryStoreManager(MongoClient mongoClient)
this.mongoClient = mongoClient;
}

private MongoDatabase getDataCubeQueryDatabase()
private MongoDatabase getQueryDatabase()
{
if (Vault.INSTANCE.hasValue("query.mongo.database"))
{
return this.mongoClient.getDatabase(Vault.INSTANCE.getValue("query.mongo.database"));
}
throw new RuntimeException("DataCube Query MongoDB database has not been configured properly");
throw new RuntimeException("DataCube query MongoDB database has not been configured properly");
}

private MongoCollection<Document> getQueryCollection()
{
if (Vault.INSTANCE.hasValue("query.mongo.collection.dataCube"))
{
return this.getDataCubeQueryDatabase().getCollection(Vault.INSTANCE.getValue("query.mongo.collection.dataCube"));
return this.getQueryDatabase().getCollection(Vault.INSTANCE.getValue("query.mongo.collection.dataCube"));
}
throw new RuntimeException("DataCube Query MongoDB collection has not been configured properly");
}

private MongoCollection<Document> getQueryEventCollection()
{
if (Vault.INSTANCE.hasValue("query.mongo.collection.dataCubeEvent"))
{
return this.getQueryDatabase().getCollection(Vault.INSTANCE.getValue("query.mongo.collection.dataCubeEvent"));
}
throw new RuntimeException("Query event MongoDB collection has not been configured properly");
}

private <T> T documentToClass(Document document, Class<T> _class)
{
try
Expand All @@ -99,6 +106,36 @@ private Document queryToDocument(DataCubeQuery query) throws JsonProcessingExcep
return Document.parse(objectMapper.writeValueAsString(query));
}

private static QueryEvent createEvent(String queryId, QueryEvent.QueryEventType eventType)
{
QueryEvent event = new QueryEvent();
event.queryId = queryId;
event.timestamp = Instant.now().toEpochMilli();
event.eventType = eventType;
return event;
}

private static QueryEvent documentToQueryEvent(Document document)
{
QueryEvent event = new QueryEvent();
event.queryId = document.getString("queryId");
try
{
event.timestamp = document.getLong("timestamp");
}
catch (ClassCastException e)
{
event.timestamp = Long.valueOf(document.getInteger("timestamp"));
}
event.eventType = QueryEvent.QueryEventType.valueOf(document.getString("eventType"));
return event;
}

private Document queryEventToDocument(QueryEvent event) throws JsonProcessingException
{
return Document.parse(objectMapper.writeValueAsString((event)));
}

private static void validate(boolean predicate, String message)
{
if (!predicate)
Expand All @@ -112,17 +149,11 @@ private static void validateNonEmptyQueryField(String fieldValue, String message
validate(fieldValue != null && !fieldValue.isEmpty(), message);
}

private static void validateNonNullQueryField(Object fieldValue, String message)
{
validate(fieldValue != null, message);
}

public static void validateQuery(DataCubeQuery query)
{
validateNonEmptyQueryField(query.id, "Query ID is missing or empty");
validateNonEmptyQueryField(query.name, "Query name is missing or empty");
validateNonNullQueryField(query.query, "Query is missing");
validateNonNullQueryField(query.source, "Query source is missing");
validate(query.content != null, "Query content is missing");
}

public List<DataCubeQuery> searchQueries(QuerySearchSpecification searchSpecification, String currentUser)
Expand All @@ -138,20 +169,30 @@ public List<DataCubeQuery> searchQueries(QuerySearchSpecification searchSpecific
if (querySearchTermSpecification.exactMatchName != null && querySearchTermSpecification.exactMatchName)
{
Bson filter = Filters.eq("name", querySearchTermSpecification.searchTerm);
if (querySearchTermSpecification.includeOwner != null && querySearchTermSpecification.includeOwner)
{
filter = Filters.or(filter, Filters.eq("owner", querySearchTermSpecification.searchTerm));
}
filters.add(filter);
}
else
{
Bson idFilter = Filters.eq("id", querySearchTermSpecification.searchTerm);
Bson nameFilter = Filters.regex("name", Pattern.quote(querySearchTermSpecification.searchTerm), "i");
Bson filter = Filters.or(idFilter, nameFilter);
if (querySearchTermSpecification.includeOwner != null && querySearchTermSpecification.includeOwner)
{
filter = Filters.or(idFilter, nameFilter, Filters.regex("owner", Pattern.quote(querySearchTermSpecification.searchTerm), "i"));
}
filters.add(filter);
}
}

List<DataCubeQuery> queries = new ArrayList<>();
List<Bson> aggregateLists = new ArrayList<>();
aggregateLists.add(Aggregates.addFields(new Field<>("isCurrentUser", new Document("$eq", Arrays.asList("$owner", currentUser)))));
aggregateLists.add(Aggregates.match(filters.isEmpty() ? EMPTY_FILTER : Filters.and(filters)));
aggregateLists.add(Aggregates.sort(Sorts.descending("isCurrentUser")));
if (searchSpecification.sortByOption != null)
{
aggregateLists.add(Aggregates.sort(Sorts.descending(getSortByField(searchSpecification.sortByOption))));
Expand Down Expand Up @@ -224,7 +265,7 @@ else if (count == 0)
return matchingQueries;
}

public DataCubeQuery getQuery(String queryId) throws JsonProcessingException
public DataCubeQuery getQuery(String queryId)
{
List<DataCubeQuery> matchingQueries = LazyIterate.collect(this.getQueryCollection().find(Filters.eq("id", queryId)), this::documentToQuery).toList();
if (matchingQueries.size() > 1)
Expand All @@ -248,7 +289,9 @@ else if (matchingQueries.isEmpty())
public DataCubeQuery createQuery(DataCubeQuery query, String currentUser) throws JsonProcessingException
{
validateQuery(query);
// TODO: store ownership information

// Force the current user as owner regardless of user input
query.owner = currentUser;

List<DataCubeQuery> matchingQueries = LazyIterate.collect(this.getQueryCollection().find(Filters.eq("id", query.id)), this::documentToQuery).toList();
if (!matchingQueries.isEmpty())
Expand All @@ -259,6 +302,9 @@ public DataCubeQuery createQuery(DataCubeQuery query, String currentUser) throws
query.lastUpdatedAt = query.createdAt;
query.lastOpenAt = query.createdAt;
this.getQueryCollection().insertOne(queryToDocument(query));
QueryEvent createdEvent = createEvent(query.id, QueryEvent.QueryEventType.CREATED);
createdEvent.timestamp = query.createdAt;
this.getQueryEventCollection().insertOne(queryEventToDocument(createdEvent));
return query;
}

Expand All @@ -281,11 +327,20 @@ else if (matchingQueries.isEmpty())
}
DataCubeQuery currentQuery = matchingQueries.get(0);

// TODO: check ownership
// Make sure only the owner can update the query
// NOTE: if the query is created by an anonymous user previously, set the current user as the owner
if (currentQuery.owner != null && !currentQuery.owner.equals(currentUser))
{
throw new ApplicationQueryException("Only owner can update the query", Response.Status.FORBIDDEN);
}
query.owner = currentUser;
query.createdAt = currentQuery.createdAt;
query.lastUpdatedAt = Instant.now().toEpochMilli();
query.lastOpenAt = Instant.now().toEpochMilli();
this.getQueryCollection().findOneAndReplace(Filters.eq("id", queryId), queryToDocument(query));
QueryEvent updatedEvent = createEvent(query.id, QueryEvent.QueryEventType.UPDATED);
updatedEvent.timestamp = query.lastUpdatedAt;
this.getQueryEventCollection().insertOne(queryEventToDocument(updatedEvent));
return query;
}

Expand All @@ -302,7 +357,45 @@ else if (matchingQueries.isEmpty())
}
DataCubeQuery currentQuery = matchingQueries.get(0);

// TODO: check ownership
// Make sure only the owner can delete the query
if (currentQuery.owner != null && !currentQuery.owner.equals(currentUser))
{
throw new ApplicationQueryException("Only owner can delete the query", Response.Status.FORBIDDEN);
}
this.getQueryCollection().findOneAndDelete(Filters.eq("id", queryId));
this.getQueryEventCollection().insertOne(queryEventToDocument(createEvent(queryId, QueryEvent.QueryEventType.DELETED)));
}

public List<QueryEvent> getQueryEvents(String queryId, QueryEvent.QueryEventType eventType, Long since, Long until, Integer limit)
{
List<Bson> filters = new ArrayList<>();
if (queryId != null)
{
filters.add(Filters.eq("queryId", queryId));
}
if (eventType != null)
{
filters.add(Filters.eq("eventType", eventType.toString()));
}
if (since != null)
{
filters.add(Filters.gte("timestamp", since));
}
if (until != null)
{
filters.add(Filters.lte("timestamp", until));
}
return LazyIterate.collect(this.getQueryEventCollection()
.find(filters.isEmpty() ? EMPTY_FILTER : Filters.and(filters))
.limit(Math.min(MAX_NUMBER_OF_EVENTS, limit == null ? Integer.MAX_VALUE : limit)), DataCubeQueryStoreManager::documentToQueryEvent).toList();
}

public QueryStoreStats getQueryStoreStats()
{
Long count = this.getQueryCollection().countDocuments();
QueryStoreStats storeStats = new QueryStoreStats();
storeStats.setQueryCount(count);
storeStats.setQueryCreatedFromDataSpaceCount(0L);
return storeStats;
}
}
Loading

0 comments on commit 671a006

Please sign in to comment.