diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditEntityMapper.java index 6f1a31dd9a6..83321345bc9 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditEntityMapper.java @@ -62,6 +62,7 @@ List> sumByLogTsAndIp(@Param(value = "ip") String ip, List> sumGroupByIp(@Param(value = "groupId") String groupId, @Param(value = "streamId") String streamId, + @Param(value = "ip") String ip, @Param(value = "auditId") String auditId, @Param(value = "startDate") String startDate, @Param(value = "endData") String endData); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java index 6615e3c2403..51381ac373d 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java @@ -44,14 +44,6 @@ import org.apache.inlong.manager.pojo.audit.AuditSourceRequest; import org.apache.inlong.manager.pojo.audit.AuditSourceResponse; import org.apache.inlong.manager.pojo.audit.AuditVO; -import org.apache.inlong.manager.pojo.node.es.ElasticsearchAggregationsTermsInfo; -import org.apache.inlong.manager.pojo.node.es.ElasticsearchAggregationsTermsInfo.Field; -import org.apache.inlong.manager.pojo.node.es.ElasticsearchAggregationsTermsInfo.Sum; -import org.apache.inlong.manager.pojo.node.es.ElasticsearchQueryInfo; -import org.apache.inlong.manager.pojo.node.es.ElasticsearchQueryInfo.QueryBool; -import org.apache.inlong.manager.pojo.node.es.ElasticsearchQuerySortInfo; -import org.apache.inlong.manager.pojo.node.es.ElasticsearchQuerySortInfo.SortValue; -import org.apache.inlong.manager.pojo.node.es.ElasticsearchRequest; import org.apache.inlong.manager.pojo.user.LoginUserUtils; import org.apache.inlong.manager.pojo.user.UserRoleCode; import org.apache.inlong.manager.service.audit.AuditRunnable; @@ -61,11 +53,8 @@ import org.apache.inlong.manager.service.resource.sink.ck.ClickHouseConfig; import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchApi; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import com.google.gson.JsonObject; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.ibatis.jdbc.SQL; @@ -369,9 +358,8 @@ public List listAll(AuditRequest request) throws Exception { // Support min agg at now DateTime endDate = SECOND_DATE_FORMATTER.parseDateTime(request.getEndDate()); String endDateStr = endDate.plusDays(1).toString(SECOND_DATE_FORMATTER); - List> sumList = auditEntityMapper.sumGroupByIp( - request.getInlongGroupId(), request.getInlongStreamId(), auditId, request.getStartDate(), - endDateStr); + List> sumList = auditEntityMapper.sumGroupByIp(request.getInlongGroupId(), + request.getInlongStreamId(), request.getIp(), auditId, request.getStartDate(), endDateStr); List auditSet = sumList.stream().map(s -> { AuditInfo vo = new AuditInfo(); vo.setInlongGroupId((String) s.get("inlongGroupId")); @@ -433,64 +421,6 @@ private List getAuditIds(String groupId, String streamId, String sourceN return new ArrayList<>(auditSet); } - /** - * Convert to elasticsearch search request json - * - * @param groupId The groupId of inlong - * @param streamId The streamId of inlong - * @return The search request of elasticsearch json - */ - public static JsonObject toAuditSearchRequestJson(String groupId, String streamId) { - Map groupIdMap = Maps.newHashMap(); - groupIdMap.put(INLONG_GROUP_ID, new ElasticsearchQueryInfo.TermValue(groupId, DEFAULT_BOOST)); - ElasticsearchQueryInfo.QueryTerm groupIdTerm = ElasticsearchQueryInfo.QueryTerm.builder().term(groupIdMap) - .build(); - Map streamIdMap = Maps.newHashMap(); - streamIdMap.put(INLONG_STREAM_ID, new ElasticsearchQueryInfo.TermValue(streamId, DEFAULT_BOOST)); - ElasticsearchQueryInfo.QueryTerm streamIdTerm = ElasticsearchQueryInfo.QueryTerm.builder().term(streamIdMap) - .build(); - QueryBool boolInfo = QueryBool.builder() - .must(Lists.newArrayList(groupIdTerm, streamIdTerm)) - .boost(DEFAULT_BOOST) - .adjustPureNegative(ADJUST_PURE_NEGATIVE) - .build(); - ElasticsearchQueryInfo queryInfo = ElasticsearchQueryInfo.builder().bool(boolInfo).build(); - - Map termValueInfoMap = Maps.newHashMap(); - termValueInfoMap.put(TERM_FILED, new SortValue(SORT_ORDER)); - List> list = Lists.newArrayList(termValueInfoMap); - ElasticsearchQuerySortInfo sortInfo = ElasticsearchQuerySortInfo.builder().sort(list).build(); - - Sum countSum = Sum.builder() - .sum(new Field(COUNT)) - .build(); - Sum delaySum = Sum.builder() - .sum(new Field(DELAY)) - .build(); - Map aggregations = Maps.newHashMap(); - aggregations.put(COUNT, countSum); - aggregations.put(DELAY, delaySum); - ElasticsearchAggregationsTermsInfo termsInfo = ElasticsearchAggregationsTermsInfo.builder() - .field(TERM_FILED) - .size(Integer.MAX_VALUE) - .aggregations(aggregations) - .build(); - Map terms = Maps.newHashMap(); - terms.put(TERMS, termsInfo); - Map> logTs = Maps.newHashMap(); - logTs.put(TERM_FILED, terms); - - ElasticsearchRequest request = ElasticsearchRequest.builder() - .from(QUERY_FROM) - .size(QUERY_SIZE) - .query(queryInfo) - .sort(sortInfo) - .aggregations(logTs) - .build(); - - return GSON.toJsonTree(request).getAsJsonObject(); - } - /** * Get clickhouse Statement *