From b14bd8cb7004ed6e6f70eb672f73d94921d6cb7a Mon Sep 17 00:00:00 2001
From: Kaituo Li
Date: Thu, 8 Jun 2023 14:21:37 -0700
Subject: [PATCH] address Sudipto and Amit's comments

Signed-off-by: Kaituo Li
---
 .../ad/AnomalyDetectorProfileRunner.java      |  6 +--
 .../ad/ExecuteADResultResponseRecorder.java   |  2 +-
 .../opensearch/ad/cluster/ADDataMigrator.java |  4 +-
 .../ad/feature/SearchFeatureDao.java          |  2 +-
 .../opensearch/ad/model/AnomalyDetector.java  |  9 ++---
 .../handler/ModelValidationActionHandler.java |  6 +--
 .../opensearch/ad/task/ADBatchTaskRunner.java | 14 +++----
 .../org/opensearch/ad/task/ADTaskManager.java | 16 ++++----
 .../AnomalyResultTransportAction.java         |  4 +-
 .../ForwardADTaskTransportAction.java         |  4 +-
 .../opensearch/forecast/model/Forecaster.java | 37 ++++++++-----------
 .../opensearch/timeseries/model/Config.java   | 16 +++++++-
 .../timeseries/util/ParseUtils.java           |  2 +-
 .../NoPowermockSearchFeatureDaoTests.java     |  2 +-
 .../opensearch/timeseries/TestHelpers.java    |  2 +-
 15 files changed, 66 insertions(+), 60 deletions(-)

diff --git a/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java b/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java
index efc2b1e0e..b5e56ba51 100644
--- a/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java
+++ b/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java
@@ -162,7 +162,7 @@ private void prepareProfile(
                 AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
                 long enabledTimeMs = job.getEnabledTime().toEpochMilli();

-                boolean isMultiEntityDetector = detector.isHC();
+                boolean isMultiEntityDetector = detector.isHighCardinality();

                 int totalResponsesToWait = 0;
                 if (profilesToCollect.contains(DetectorProfileName.ERROR)) {
@@ -285,7 +285,7 @@ private void prepareProfile(

     private void profileEntityStats(MultiResponsesDelegateActionListener listener, AnomalyDetector detector) {
         List categoryField = detector.getCategoryFields();
-        if (!detector.isHC() || categoryField.size() > ADNumericSetting.maxCategoricalFields()) {
+        if (!detector.isHighCardinality() || categoryField.size() > ADNumericSetting.maxCategoricalFields()) {
             listener.onResponse(new DetectorProfile.Builder().build());
         } else {
             if (categoryField.size() == 1) {
@@ -433,7 +433,7 @@ private ActionListener onModelResponse(
         AnomalyDetectorJob job,
         MultiResponsesDelegateActionListener listener
     ) {
-        boolean isMultientityDetector = detector.isHC();
+        boolean isMultientityDetector = detector.isHighCardinality();
         return ActionListener.wrap(profileResponse -> {
             DetectorProfile.Builder profile = new DetectorProfile.Builder();
             if (profilesToCollect.contains(DetectorProfileName.COORDINATING_NODE)) {
diff --git a/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java b/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java
index 107349fa1..aad944277 100644
--- a/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java
+++ b/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java
@@ -295,7 +295,7 @@ public void indexAnomalyResultException(
             anomalyResultHandler.index(anomalyResult, detectorId, resultIndex);
         }

-        if (errorMessage.contains(ADCommonMessages.NO_MODEL_ERR_MSG) && !detector.isHC()) {
+        if (errorMessage.contains(ADCommonMessages.NO_MODEL_ERR_MSG) && !detector.isHighCardinality()) {
             // single stream detector raises ResourceNotFoundException containing CommonErrorMessages.NO_CHECKPOINT_ERR_MSG
             // when there is no checkpoint.
             // Delay real time cache update by one minute so we will have trained models by then and update the state
diff --git a/src/main/java/org/opensearch/ad/cluster/ADDataMigrator.java b/src/main/java/org/opensearch/ad/cluster/ADDataMigrator.java
index 5da862585..f74d237df 100644
--- a/src/main/java/org/opensearch/ad/cluster/ADDataMigrator.java
+++ b/src/main/java/org/opensearch/ad/cluster/ADDataMigrator.java
@@ -244,7 +244,9 @@ private void createRealtimeADTask(
         try (XContentParser parser = createXContentParserFromRegistry(xContentRegistry, r.getSourceAsBytesRef())) {
             ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
             AnomalyDetector detector = AnomalyDetector.parse(parser, r.getId());
-            ADTaskType taskType = detector.isHC() ? ADTaskType.REALTIME_HC_DETECTOR : ADTaskType.REALTIME_SINGLE_ENTITY;
+            ADTaskType taskType = detector.isHighCardinality()
+                ? ADTaskType.REALTIME_HC_DETECTOR
+                : ADTaskType.REALTIME_SINGLE_ENTITY;
             Instant now = Instant.now();
             String userName = job.getUser() != null ? job.getUser().getName() : null;
             ADTask adTask = new ADTask.Builder()
diff --git a/src/main/java/org/opensearch/ad/feature/SearchFeatureDao.java b/src/main/java/org/opensearch/ad/feature/SearchFeatureDao.java
index f3ad37a5f..cf9f0912b 100644
--- a/src/main/java/org/opensearch/ad/feature/SearchFeatureDao.java
+++ b/src/main/java/org/opensearch/ad/feature/SearchFeatureDao.java
@@ -216,7 +216,7 @@ public void getHighestCountEntities(
         int pageSize,
         ActionListener> listener
     ) {
-        if (!detector.isHC()) {
+        if (!detector.isHighCardinality()) {
             listener.onResponse(null);
             return;
         }
diff --git a/src/main/java/org/opensearch/ad/model/AnomalyDetector.java b/src/main/java/org/opensearch/ad/model/AnomalyDetector.java
index d5f126dd3..008f21e4b 100644
--- a/src/main/java/org/opensearch/ad/model/AnomalyDetector.java
+++ b/src/main/java/org/opensearch/ad/model/AnomalyDetector.java
@@ -39,7 +39,6 @@
 import org.opensearch.core.xcontent.XContentParser;
 import org.opensearch.index.query.QueryBuilder;
 import org.opensearch.index.query.QueryBuilders;
-import org.opensearch.timeseries.common.exception.TimeSeriesException;
 import org.opensearch.timeseries.common.exception.ValidationException;
 import org.opensearch.timeseries.constant.CommonMessages;
 import org.opensearch.timeseries.constant.CommonValue;
@@ -148,6 +147,8 @@ public AnomalyDetector(
             imputationOption
         );

+        checkAndThrowValidationErrors(ValidationAspect.DETECTOR);
+
         if (detectionInterval == null) {
             errorMessage = ADCommonMessages.NULL_DETECTION_INTERVAL;
             issueType = ValidationIssueType.DETECTION_INTERVAL;
@@ -162,11 +163,7 @@ public AnomalyDetector(
             issueType = ValidationIssueType.CATEGORY;
         }

-        if (errorMessage != null && issueType != null) {
-            throw new ValidationException(errorMessage, issueType, ValidationAspect.DETECTOR);
-        } else if (errorMessage != null || issueType != null) {
-            throw new TimeSeriesException(CommonMessages.FAIL_TO_VALIDATE);
-        }
+        checkAndThrowValidationErrors(ValidationAspect.DETECTOR);

         this.detectorType = isHC(categoryFields) ? MULTI_ENTITY.name() : SINGLE_ENTITY.name();
     }
diff --git a/src/main/java/org/opensearch/ad/rest/handler/ModelValidationActionHandler.java b/src/main/java/org/opensearch/ad/rest/handler/ModelValidationActionHandler.java
index e74f2e1cb..5be7e9534 100644
--- a/src/main/java/org/opensearch/ad/rest/handler/ModelValidationActionHandler.java
+++ b/src/main/java/org/opensearch/ad/rest/handler/ModelValidationActionHandler.java
@@ -158,7 +158,7 @@ public void checkIfMultiEntityDetector() {
             listener.onFailure(exception);
             logger.error("Failed to get top entity for categorical field", exception);
         });
-        if (anomalyDetector.isHC()) {
+        if (anomalyDetector.isHighCardinality()) {
             getTopEntity(recommendationListener);
         } else {
             recommendationListener.onResponse(Collections.emptyMap());
@@ -298,7 +298,7 @@ private void getBucketAggregates(
     ) throws IOException {
         AggregationBuilder aggregation = getBucketAggregation(latestTime, (IntervalTimeConfiguration) anomalyDetector.getInterval());
         BoolQueryBuilder query = QueryBuilders.boolQuery().filter(anomalyDetector.getFilterQuery());
-        if (anomalyDetector.isHC()) {
+        if (anomalyDetector.isHighCardinality()) {
             if (topEntity.isEmpty()) {
                 listener
                     .onFailure(
@@ -653,7 +653,7 @@ private void processDataFilterResults(SearchResponse response, long latestTime)
             // blocks below are executed if data is dense enough with filter query applied.
             // If HCAD then category fields will be added to bucket aggregation to see if they
             // are the root cause of the issues and if not the feature queries will be checked for sparsity
-        } else if (anomalyDetector.isHC()) {
+        } else if (anomalyDetector.isHighCardinality()) {
             getTopEntityForCategoryField(latestTime);
         } else {
             try {
diff --git a/src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java b/src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java
index 134514c3d..dd1987b2e 100644
--- a/src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java
+++ b/src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java
@@ -219,7 +219,7 @@ public ADBatchTaskRunner(
      * @param listener action listener
      */
    public void run(ADTask adTask, TransportService transportService, ActionListener listener) {
-        boolean isHCDetector = adTask.getDetector().isHC();
+        boolean isHCDetector = adTask.getDetector().isHighCardinality();
         if (isHCDetector && !adTaskCacheManager.topEntityInited(adTask.getId())) {
             // Initialize top entities for HC detector
             threadPool.executor(AD_BATCH_TASK_THREAD_POOL_NAME).execute(() -> {
@@ -513,7 +513,7 @@ public void forwardOrExecuteADTask(
             checkIfADTaskCancelledAndCleanupCache(adTask);
             String detectorId = adTask.getId();
             AnomalyDetector detector = adTask.getDetector();
-            boolean isHCDetector = detector.isHC();
+            boolean isHCDetector = detector.isHighCardinality();
             if (isHCDetector) {
                 String entityString = adTaskCacheManager.pollEntity(detectorId);
                 logger.debug("Start to run entity: {} of detector {}", entityString, detectorId);
@@ -646,7 +646,7 @@ private ActionListener workerNodeResponseListener(
             listener.onFailure(e);
             handleException(adTask, e);

-            if (adTask.getDetector().isHC()) {
+            if (adTask.getDetector().isHighCardinality()) {
                 // Entity task done on worker node. Send entity task done message to coordinating node to poll next entity.
                 adTaskManager.entityTaskDone(adTask, e, transportService);
                 if (adTaskCacheManager.getAvailableNewEntityTaskLanes(adTask.getId()) > 0) {
@@ -698,7 +698,7 @@ private void forwardOrExecuteEntityTask(

     // start new entity task lane
     private synchronized void startNewEntityTaskLane(ADTask adTask, TransportService transportService) {
-        if (adTask.getDetector().isHC() && adTaskCacheManager.getAndDecreaseEntityTaskLanes(adTask.getId()) > 0) {
+        if (adTask.getDetector().isHighCardinality() && adTaskCacheManager.getAndDecreaseEntityTaskLanes(adTask.getId()) > 0) {
             logger.debug("start new task lane for detector {}", adTask.getId());
             forwardOrExecuteADTask(adTask, transportService, getInternalHCDelegatedListener(adTask));
         }
@@ -803,7 +803,7 @@ private ActionListener internalBatchTaskListener(ADTask adTask, Transpor
             // If batch task finished normally, remove task from cache and decrease executing task count by 1.
             adTaskCacheManager.remove(taskId, detectorId, detectorTaskId);
             adStats.getStat(AD_EXECUTING_BATCH_TASK_COUNT.getName()).decrement();
-            if (!adTask.getDetector().isHC()) {
+            if (!adTask.getDetector().isHighCardinality()) {
                 // Set single-entity detector task as FINISHED here
                 adTaskManager
                     .cleanDetectorCache(
@@ -820,7 +820,7 @@ private ActionListener internalBatchTaskListener(ADTask adTask, Transpor
             // If batch task failed, remove task from cache and decrease executing task count by 1.
             adTaskCacheManager.remove(taskId, detectorId, detectorTaskId);
             adStats.getStat(AD_EXECUTING_BATCH_TASK_COUNT.getName()).decrement();
-            if (!adTask.getDetector().isHC()) {
+            if (!adTask.getDetector().isHighCardinality()) {
                 adTaskManager.cleanDetectorCache(adTask, transportService, () -> handleException(adTask, e));
             } else {
                 adTaskManager.entityTaskDone(adTask, e, transportService);
@@ -1362,7 +1362,7 @@ private void checkIfADTaskCancelledAndCleanupCache(ADTask adTask) {
         String detectorTaskId = adTask.getDetectorLevelTaskId();
         // refresh latest HC task run time
         adTaskCacheManager.refreshLatestHCTaskRunTime(detectorId);
-        if (adTask.getDetector().isHC()
+        if (adTask.getDetector().isHighCardinality()
             && adTaskCacheManager.isHCTaskCoordinatingNode(detectorId)
             && adTaskCacheManager.isHistoricalAnalysisCancelledForHC(detectorId, detectorTaskId)) {
             // clean up pending and running entity on coordinating node
diff --git a/src/main/java/org/opensearch/ad/task/ADTaskManager.java b/src/main/java/org/opensearch/ad/task/ADTaskManager.java
index f7bc2c042..5d6fd86b1 100644
--- a/src/main/java/org/opensearch/ad/task/ADTaskManager.java
+++ b/src/main/java/org/opensearch/ad/task/ADTaskManager.java
@@ -631,7 +631,7 @@ public void checkTaskSlots(
             // then we will assign 4 tasks slots to this HC detector (4 is less than 8). The data index
             // only has 2 entities. So we assign 2 more task slots than actual need. But it's ok as we
             // will auto tune task slot when historical analysis task starts.
-            int approvedTaskSlots = detector.isHC() ? Math.min(maxRunningEntitiesPerDetector, availableAdTaskSlots) : 1;
+            int approvedTaskSlots = detector.isHighCardinality() ? Math.min(maxRunningEntitiesPerDetector, availableAdTaskSlots) : 1;
             forwardToCoordinatingNode(
                 adTask,
                 detector,
@@ -774,9 +774,9 @@ public void startDetector(

     private ADTaskType getADTaskType(AnomalyDetector detector, DateRange detectionDateRange) {
         if (detectionDateRange == null) {
-            return detector.isHC() ? ADTaskType.REALTIME_HC_DETECTOR : ADTaskType.REALTIME_SINGLE_ENTITY;
+            return detector.isHighCardinality() ? ADTaskType.REALTIME_HC_DETECTOR : ADTaskType.REALTIME_SINGLE_ENTITY;
         } else {
-            return detector.isHC() ? ADTaskType.HISTORICAL_HC_DETECTOR : ADTaskType.HISTORICAL_SINGLE_ENTITY;
+            return detector.isHighCardinality() ? ADTaskType.HISTORICAL_HC_DETECTOR : ADTaskType.HISTORICAL_SINGLE_ENTITY;
         }
     }

@@ -1139,7 +1139,7 @@ private void resetHistoricalDetectorTaskState(
             if (taskStopped) {
                 logger.debug("Reset task state as stopped, task id: {}", adTask.getTaskId());
                 if (taskProfile.getTaskId() == null // This means coordinating node doesn't have HC detector cache
-                    && detector.isHC()
+                    && detector.isHighCardinality()
                     && !isNullOrEmpty(taskProfile.getEntityTaskProfiles())) {
                     // If coordinating node restarted, HC detector cache on it will be gone. But worker node still
                     // runs entity tasks, we'd better stop these entity tasks to clean up resource earlier.
@@ -1198,11 +1198,11 @@ private boolean isTaskStopped(String taskId, AnomalyDetector detector, ADTaskPro
             // If no node is running this task, reset it as STOPPED.
             return true;
         }
-        if (!detector.isHC() && taskProfile.getNodeId() == null) {
+        if (!detector.isHighCardinality() && taskProfile.getNodeId() == null) {
             logger.debug("AD task not running for single entity detector {}, task {}", detectorId, taskId);
             return true;
         }
-        if (detector.isHC()
+        if (detector.isHighCardinality()
             && taskProfile.getTotalEntitiesInited()
             && isNullOrEmpty(taskProfile.getRunningEntities())
             && isNullOrEmpty(taskProfile.getEntityTaskProfiles())
@@ -2844,7 +2844,7 @@ public String convertEntityToString(Entity entity, AnomalyDetector detector) {
                 throw new TimeSeriesException(error);
             }
         }
-        if (detector.isHC()) {
+        if (detector.isHighCardinality()) {
             String categoryField = detector.getCategoryFields().get(0);
             return entity.getAttributes().get(categoryField);
         }
@@ -2871,7 +2871,7 @@ public Entity parseEntityFromString(String entityValue, ADTask adTask) {
                 logger.debug(error, e);
                 throw new TimeSeriesException(error);
             }
-        } else if (detector.isHC()) {
+        } else if (detector.isHighCardinality()) {
             return Entity.createSingleAttributeEntity(detector.getCategoryFields().get(0), entityValue);
         }
         throw new IllegalArgumentException("Fail to parse to Entity for single flow detector");
diff --git a/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java b/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java
index 1f9e1a01f..84a80b912 100644
--- a/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java
+++ b/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java
@@ -402,7 +402,7 @@ private ActionListener> onGetDetector(
             }

             AnomalyDetector anomalyDetector = detectorOptional.get();
-            if (anomalyDetector.isHC()) {
+            if (anomalyDetector.isHighCardinality()) {
                 hcDetectors.add(adID);
                 adStats.getStat(StatNames.AD_HC_EXECUTE_REQUEST_COUNT.getName()).increment();
             }
@@ -445,7 +445,7 @@ private void executeAnomalyDetection(
         long dataEndTime
     ) {
         // HC logic starts here
-        if (anomalyDetector.isHC()) {
+        if (anomalyDetector.isHighCardinality()) {
             Optional previousException = stateManager.fetchExceptionAndClear(adID);
             if (previousException.isPresent()) {
                 Exception exception = previousException.get();
diff --git a/src/main/java/org/opensearch/ad/transport/ForwardADTaskTransportAction.java b/src/main/java/org/opensearch/ad/transport/ForwardADTaskTransportAction.java
index 31eb50afc..be3b45c99 100644
--- a/src/main/java/org/opensearch/ad/transport/ForwardADTaskTransportAction.java
+++ b/src/main/java/org/opensearch/ad/transport/ForwardADTaskTransportAction.java
@@ -115,7 +115,7 @@ protected void doExecute(Task task, ForwardADTaskRequest request, ActionListener
             case NEXT_ENTITY:
                 logger.debug("Received NEXT_ENTITY action for detector {}, task {}", detectorId, adTask.getTaskId());
                 // Run next entity for HC detector historical analysis.
-                if (detector.isHC()) { // AD task could be HC detector level task or entity task
+                if (detector.isHighCardinality()) { // AD task could be HC detector level task or entity task
                     adTaskCacheManager.removeRunningEntity(detectorId, entityValue);
                     if (!adTaskCacheManager.hasEntity(detectorId)) {
                         adTaskCacheManager.setDetectorTaskSlots(detectorId, 0);
@@ -200,7 +200,7 @@ protected void doExecute(Task task, ForwardADTaskRequest request, ActionListener
                 // Cancel HC detector's historical analysis.
                 // Don't support single detector for this action as single entity task will be cancelled directly
                 // on worker node.
-                if (detector.isHC()) {
+                if (detector.isHighCardinality()) {
                     adTaskCacheManager.clearPendingEntities(detectorId);
                     adTaskCacheManager.removeRunningEntity(detectorId, entityValue);
                     if (!adTaskCacheManager.hasEntity(detectorId) || !adTask.isEntityTask()) {
diff --git a/src/main/java/org/opensearch/forecast/model/Forecaster.java b/src/main/java/org/opensearch/forecast/model/Forecaster.java
index c23bcc8c8..cd17bf573 100644
--- a/src/main/java/org/opensearch/forecast/model/Forecaster.java
+++ b/src/main/java/org/opensearch/forecast/model/Forecaster.java
@@ -30,7 +30,6 @@
 import org.opensearch.forecast.settings.ForecastNumericSetting;
 import org.opensearch.index.query.QueryBuilder;
 import org.opensearch.index.query.QueryBuilders;
-import org.opensearch.timeseries.common.exception.TimeSeriesException;
 import org.opensearch.timeseries.common.exception.ValidationException;
 import org.opensearch.timeseries.constant.CommonMessages;
 import org.opensearch.timeseries.constant.CommonName;
@@ -108,6 +107,9 @@ public Forecaster(
             forecastInterval,
             imputationOption
         );
+
+        checkAndThrowValidationErrors(ValidationAspect.FORECASTER);
+
         if (forecastInterval == null) {
             errorMessage = ForecastCommonMessages.NULL_FORECAST_INTERVAL;
             issueType = ValidationIssueType.FORECAST_INTERVAL;
@@ -122,22 +124,16 @@ public Forecaster(
             issueType = ValidationIssueType.CATEGORY;
         }

-        if (errorMessage != null && issueType != null) {
-            throw new ValidationException(errorMessage, issueType, ValidationAspect.FORECASTER);
-        } else if (errorMessage != null || issueType != null) {
-            throw new TimeSeriesException(CommonMessages.FAIL_TO_VALIDATE);
-        }
-
         if (invalidHorizon(horizon)) {
-            throw new ValidationException(
-                "Horizon size must be a positive integer no larger than "
-                    + TimeSeriesSettings.MAX_SHINGLE_SIZE * DEFAULT_HORIZON_SHINGLE_RATIO
-                    + ". Got "
-                    + horizon,
-                ValidationIssueType.SHINGLE_SIZE_FIELD,
-                ValidationAspect.FORECASTER
-            );
+            errorMessage = "Horizon size must be a positive integer no larger than "
+                + TimeSeriesSettings.MAX_SHINGLE_SIZE * DEFAULT_HORIZON_SHINGLE_RATIO
+                + ". Got "
+                + horizon;
+            issueType = ValidationIssueType.SHINGLE_SIZE_FIELD;
         }
+
+        checkAndThrowValidationErrors(ValidationAspect.FORECASTER);
+
         this.horizon = horizon;
     }

@@ -261,7 +257,7 @@ public static Forecaster parse(
                         throw new ValidationException(
                             "Custom query error in data filter: " + e.getMessage(),
                             ValidationIssueType.FILTER_QUERY,
-                            ValidationAspect.DETECTOR
+                            ValidationAspect.FORECASTER
                         );
                     } catch (IllegalArgumentException e) {
                         if (!e.getMessage().contains("empty clause")) {
@@ -275,9 +271,9 @@ public static Forecaster parse(
                     } catch (Exception e) {
                         if (e instanceof IllegalArgumentException && e.getMessage().contains(CommonMessages.NEGATIVE_TIME_CONFIGURATION)) {
                             throw new ValidationException(
-                                "Detection interval must be a positive integer",
+                                "Forecasting interval must be a positive integer",
                                 ValidationIssueType.FORECAST_INTERVAL,
-                                ValidationAspect.DETECTOR
+                                ValidationAspect.FORECASTER
                             );
                         }
                         throw e;
@@ -294,7 +290,7 @@ public static Forecaster parse(
                         throw new ValidationException(
                             "Custom query error: " + e.getMessage(),
                             ValidationIssueType.FEATURE_ATTRIBUTES,
-                            ValidationAspect.DETECTOR
+                            ValidationAspect.FORECASTER
                         );
                     }
                     throw e;
@@ -308,7 +304,7 @@ public static Forecaster parse(
                         throw new ValidationException(
                             "Window delay interval must be a positive integer",
                             ValidationIssueType.WINDOW_DELAY,
-                            ValidationAspect.DETECTOR
+                            ValidationAspect.FORECASTER
                         );
                     }
                     throw e;
@@ -364,7 +360,6 @@ public static Forecaster parse(
         return forecaster;
     }

-    // TODO: test if this method works
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         XContentBuilder xContentBuilder = builder.startObject();
diff --git a/src/main/java/org/opensearch/timeseries/model/Config.java b/src/main/java/org/opensearch/timeseries/model/Config.java
index 4ae600407..ae32e59d3 100644
--- a/src/main/java/org/opensearch/timeseries/model/Config.java
+++ b/src/main/java/org/opensearch/timeseries/model/Config.java
@@ -25,6 +25,8 @@
 import org.opensearch.core.xcontent.XContentBuilder;
 import org.opensearch.index.query.QueryBuilder;
 import org.opensearch.timeseries.annotation.Generated;
+import org.opensearch.timeseries.common.exception.TimeSeriesException;
+import org.opensearch.timeseries.common.exception.ValidationException;
 import org.opensearch.timeseries.constant.CommonMessages;
 import org.opensearch.timeseries.constant.CommonName;
 import org.opensearch.timeseries.dataprocessor.FixedValueImputer;
@@ -186,6 +188,8 @@ protected Config(
         this.customResultIndex = Strings.trimToNull(resultIndex);
         this.imputationOption = imputationOption;
         this.imputer = createImputer();
+        this.issueType = null;
+        this.errorMessage = null;
     }

     public Config(StreamInput input) throws IOException {
@@ -271,7 +275,7 @@ public void writeTo(StreamOutput output) throws IOException {
     }

     /**
-     * If the given shingle size is null, return default based on the kind of detector;
+     * If the given shingle size is null, return default;
      * otherwise, return the given shingle size.
      *
      * @param customShingleSize Given shingle size
@@ -477,7 +481,7 @@ public String getCustomResultIndex() {
         return customResultIndex;
     }

-    public boolean isHC() {
+    public boolean isHighCardinality() {
         return Config.isHC(getCategoryFields());
     }

@@ -547,4 +551,12 @@ protected Imputer createImputer() {
         }
         return imputer;
     }
+
+    protected void checkAndThrowValidationErrors(ValidationAspect validationAspect) {
+        if (errorMessage != null && issueType != null) {
+            throw new ValidationException(errorMessage, issueType, validationAspect);
+        } else if (errorMessage != null || issueType != null) {
+            throw new TimeSeriesException(CommonMessages.FAIL_TO_VALIDATE);
+        }
+    }
 }
diff --git a/src/main/java/org/opensearch/timeseries/util/ParseUtils.java b/src/main/java/org/opensearch/timeseries/util/ParseUtils.java
index c76388a70..b83753040 100644
--- a/src/main/java/org/opensearch/timeseries/util/ParseUtils.java
+++ b/src/main/java/org/opensearch/timeseries/util/ParseUtils.java
@@ -642,7 +642,7 @@ public static SearchSourceBuilder batchFeatureQuery(

         BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery().must(rangeQuery).must(detector.getFilterQuery());

-        if (detector.isHC() && entity != null && entity.getAttributes().size() > 0) {
+        if (detector.isHighCardinality() && entity != null && entity.getAttributes().size() > 0) {
             entity
                 .getAttributes()
                 .entrySet()
diff --git a/src/test/java/org/opensearch/ad/feature/NoPowermockSearchFeatureDaoTests.java b/src/test/java/org/opensearch/ad/feature/NoPowermockSearchFeatureDaoTests.java
index f2e6e8bb1..2e628ee1a 100644
--- a/src/test/java/org/opensearch/ad/feature/NoPowermockSearchFeatureDaoTests.java
+++ b/src/test/java/org/opensearch/ad/feature/NoPowermockSearchFeatureDaoTests.java
@@ -141,7 +141,7 @@ public void setUp() throws Exception {
         hostField = "host";

         detector = mock(AnomalyDetector.class);
-        when(detector.isHC()).thenReturn(true);
+        when(detector.isHighCardinality()).thenReturn(true);
         when(detector.getCategoryFields()).thenReturn(Arrays.asList(new String[] { serviceField, hostField }));
         detectorId = "123";
         when(detector.getId()).thenReturn(detectorId);
diff --git a/src/test/java/org/opensearch/timeseries/TestHelpers.java b/src/test/java/org/opensearch/timeseries/TestHelpers.java
index 1ddcf64ff..8da392c21 100644
--- a/src/test/java/org/opensearch/timeseries/TestHelpers.java
+++ b/src/test/java/org/opensearch/timeseries/TestHelpers.java
@@ -1402,7 +1402,7 @@ public static ADTask randomAdTask(
             Map attrMap = new HashMap<>();
             detector.getCategoryFields().stream().forEach(f -> attrMap.put(f, randomAlphaOfLength(5)));
             entity = Entity.createEntityByReordering(attrMap);
-        } else if (detector.isHC()) {
+        } else if (detector.isHighCardinality()) {
             entity = Entity.createEntityByReordering(ImmutableMap.of(detector.getCategoryFields().get(0), randomAlphaOfLength(5)));
         }
     }