Skip to content

Commit

Permalink
address Sudipto and Amit's comments
Browse files Browse the repository at this point in the history
Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Jun 8, 2023
1 parent 580e542 commit b14bd8c
Show file tree
Hide file tree
Showing 15 changed files with 66 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private void prepareProfile(
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
long enabledTimeMs = job.getEnabledTime().toEpochMilli();

boolean isMultiEntityDetector = detector.isHC();
boolean isMultiEntityDetector = detector.isHighCardinality();

int totalResponsesToWait = 0;
if (profilesToCollect.contains(DetectorProfileName.ERROR)) {
Expand Down Expand Up @@ -285,7 +285,7 @@ private void prepareProfile(

private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorProfile> listener, AnomalyDetector detector) {
List<String> categoryField = detector.getCategoryFields();
if (!detector.isHC() || categoryField.size() > ADNumericSetting.maxCategoricalFields()) {
if (!detector.isHighCardinality() || categoryField.size() > ADNumericSetting.maxCategoricalFields()) {
listener.onResponse(new DetectorProfile.Builder().build());
} else {
if (categoryField.size() == 1) {
Expand Down Expand Up @@ -433,7 +433,7 @@ private ActionListener<ProfileResponse> onModelResponse(
AnomalyDetectorJob job,
MultiResponsesDelegateActionListener<DetectorProfile> listener
) {
boolean isMultientityDetector = detector.isHC();
boolean isMultientityDetector = detector.isHighCardinality();
return ActionListener.wrap(profileResponse -> {
DetectorProfile.Builder profile = new DetectorProfile.Builder();
if (profilesToCollect.contains(DetectorProfileName.COORDINATING_NODE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ public void indexAnomalyResultException(
anomalyResultHandler.index(anomalyResult, detectorId, resultIndex);
}

if (errorMessage.contains(ADCommonMessages.NO_MODEL_ERR_MSG) && !detector.isHC()) {
if (errorMessage.contains(ADCommonMessages.NO_MODEL_ERR_MSG) && !detector.isHighCardinality()) {
// single stream detector raises ResourceNotFoundException containing CommonErrorMessages.NO_CHECKPOINT_ERR_MSG
// when there is no checkpoint.
// Delay real time cache update by one minute so we will have trained models by then and update the state
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/opensearch/ad/cluster/ADDataMigrator.java
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,9 @@ private void createRealtimeADTask(
try (XContentParser parser = createXContentParserFromRegistry(xContentRegistry, r.getSourceAsBytesRef())) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetector detector = AnomalyDetector.parse(parser, r.getId());
ADTaskType taskType = detector.isHC() ? ADTaskType.REALTIME_HC_DETECTOR : ADTaskType.REALTIME_SINGLE_ENTITY;
ADTaskType taskType = detector.isHighCardinality()
? ADTaskType.REALTIME_HC_DETECTOR
: ADTaskType.REALTIME_SINGLE_ENTITY;
Instant now = Instant.now();
String userName = job.getUser() != null ? job.getUser().getName() : null;
ADTask adTask = new ADTask.Builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void getHighestCountEntities(
int pageSize,
ActionListener<List<Entity>> listener
) {
if (!detector.isHC()) {
if (!detector.isHighCardinality()) {
listener.onResponse(null);
return;
}
Expand Down
9 changes: 3 additions & 6 deletions src/main/java/org/opensearch/ad/model/AnomalyDetector.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.common.exception.ValidationException;
import org.opensearch.timeseries.constant.CommonMessages;
import org.opensearch.timeseries.constant.CommonValue;
Expand Down Expand Up @@ -148,6 +147,8 @@ public AnomalyDetector(
imputationOption
);

checkAndThrowValidationErrors(ValidationAspect.DETECTOR);

if (detectionInterval == null) {
errorMessage = ADCommonMessages.NULL_DETECTION_INTERVAL;
issueType = ValidationIssueType.DETECTION_INTERVAL;
Expand All @@ -162,11 +163,7 @@ public AnomalyDetector(
issueType = ValidationIssueType.CATEGORY;
}

if (errorMessage != null && issueType != null) {
throw new ValidationException(errorMessage, issueType, ValidationAspect.DETECTOR);
} else if (errorMessage != null || issueType != null) {
throw new TimeSeriesException(CommonMessages.FAIL_TO_VALIDATE);
}
checkAndThrowValidationErrors(ValidationAspect.DETECTOR);

this.detectorType = isHC(categoryFields) ? MULTI_ENTITY.name() : SINGLE_ENTITY.name();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public void checkIfMultiEntityDetector() {
listener.onFailure(exception);
logger.error("Failed to get top entity for categorical field", exception);
});
if (anomalyDetector.isHC()) {
if (anomalyDetector.isHighCardinality()) {
getTopEntity(recommendationListener);
} else {
recommendationListener.onResponse(Collections.emptyMap());
Expand Down Expand Up @@ -298,7 +298,7 @@ private void getBucketAggregates(
) throws IOException {
AggregationBuilder aggregation = getBucketAggregation(latestTime, (IntervalTimeConfiguration) anomalyDetector.getInterval());
BoolQueryBuilder query = QueryBuilders.boolQuery().filter(anomalyDetector.getFilterQuery());
if (anomalyDetector.isHC()) {
if (anomalyDetector.isHighCardinality()) {
if (topEntity.isEmpty()) {
listener
.onFailure(
Expand Down Expand Up @@ -653,7 +653,7 @@ private void processDataFilterResults(SearchResponse response, long latestTime)
// blocks below are executed if data is dense enough with filter query applied.
// If HCAD then category fields will be added to bucket aggregation to see if they
// are the root cause of the issues and if not the feature queries will be checked for sparsity
} else if (anomalyDetector.isHC()) {
} else if (anomalyDetector.isHighCardinality()) {
getTopEntityForCategoryField(latestTime);
} else {
try {
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public ADBatchTaskRunner(
* @param listener action listener
*/
public void run(ADTask adTask, TransportService transportService, ActionListener<ADBatchAnomalyResultResponse> listener) {
boolean isHCDetector = adTask.getDetector().isHC();
boolean isHCDetector = adTask.getDetector().isHighCardinality();
if (isHCDetector && !adTaskCacheManager.topEntityInited(adTask.getId())) {
// Initialize top entities for HC detector
threadPool.executor(AD_BATCH_TASK_THREAD_POOL_NAME).execute(() -> {
Expand Down Expand Up @@ -513,7 +513,7 @@ public void forwardOrExecuteADTask(
checkIfADTaskCancelledAndCleanupCache(adTask);
String detectorId = adTask.getId();
AnomalyDetector detector = adTask.getDetector();
boolean isHCDetector = detector.isHC();
boolean isHCDetector = detector.isHighCardinality();
if (isHCDetector) {
String entityString = adTaskCacheManager.pollEntity(detectorId);
logger.debug("Start to run entity: {} of detector {}", entityString, detectorId);
Expand Down Expand Up @@ -646,7 +646,7 @@ private ActionListener<ADBatchAnomalyResultResponse> workerNodeResponseListener(
listener.onFailure(e);
handleException(adTask, e);

if (adTask.getDetector().isHC()) {
if (adTask.getDetector().isHighCardinality()) {
// Entity task done on worker node. Send entity task done message to coordinating node to poll next entity.
adTaskManager.entityTaskDone(adTask, e, transportService);
if (adTaskCacheManager.getAvailableNewEntityTaskLanes(adTask.getId()) > 0) {
Expand Down Expand Up @@ -698,7 +698,7 @@ private void forwardOrExecuteEntityTask(

// start new entity task lane
private synchronized void startNewEntityTaskLane(ADTask adTask, TransportService transportService) {
if (adTask.getDetector().isHC() && adTaskCacheManager.getAndDecreaseEntityTaskLanes(adTask.getId()) > 0) {
if (adTask.getDetector().isHighCardinality() && adTaskCacheManager.getAndDecreaseEntityTaskLanes(adTask.getId()) > 0) {
logger.debug("start new task lane for detector {}", adTask.getId());
forwardOrExecuteADTask(adTask, transportService, getInternalHCDelegatedListener(adTask));
}
Expand Down Expand Up @@ -803,7 +803,7 @@ private ActionListener<String> internalBatchTaskListener(ADTask adTask, Transpor
// If batch task finished normally, remove task from cache and decrease executing task count by 1.
adTaskCacheManager.remove(taskId, detectorId, detectorTaskId);
adStats.getStat(AD_EXECUTING_BATCH_TASK_COUNT.getName()).decrement();
if (!adTask.getDetector().isHC()) {
if (!adTask.getDetector().isHighCardinality()) {
// Set single-entity detector task as FINISHED here
adTaskManager
.cleanDetectorCache(
Expand All @@ -820,7 +820,7 @@ private ActionListener<String> internalBatchTaskListener(ADTask adTask, Transpor
// If batch task failed, remove task from cache and decrease executing task count by 1.
adTaskCacheManager.remove(taskId, detectorId, detectorTaskId);
adStats.getStat(AD_EXECUTING_BATCH_TASK_COUNT.getName()).decrement();
if (!adTask.getDetector().isHC()) {
if (!adTask.getDetector().isHighCardinality()) {
adTaskManager.cleanDetectorCache(adTask, transportService, () -> handleException(adTask, e));
} else {
adTaskManager.entityTaskDone(adTask, e, transportService);
Expand Down Expand Up @@ -1362,7 +1362,7 @@ private void checkIfADTaskCancelledAndCleanupCache(ADTask adTask) {
String detectorTaskId = adTask.getDetectorLevelTaskId();
// refresh latest HC task run time
adTaskCacheManager.refreshLatestHCTaskRunTime(detectorId);
if (adTask.getDetector().isHC()
if (adTask.getDetector().isHighCardinality()
&& adTaskCacheManager.isHCTaskCoordinatingNode(detectorId)
&& adTaskCacheManager.isHistoricalAnalysisCancelledForHC(detectorId, detectorTaskId)) {
// clean up pending and running entity on coordinating node
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/org/opensearch/ad/task/ADTaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ public void checkTaskSlots(
// then we will assign 4 tasks slots to this HC detector (4 is less than 8). The data index
// only has 2 entities. So we assign 2 more task slots than actual need. But it's ok as we
// will auto tune task slot when historical analysis task starts.
int approvedTaskSlots = detector.isHC() ? Math.min(maxRunningEntitiesPerDetector, availableAdTaskSlots) : 1;
int approvedTaskSlots = detector.isHighCardinality() ? Math.min(maxRunningEntitiesPerDetector, availableAdTaskSlots) : 1;
forwardToCoordinatingNode(
adTask,
detector,
Expand Down Expand Up @@ -774,9 +774,9 @@ public void startDetector(

private ADTaskType getADTaskType(AnomalyDetector detector, DateRange detectionDateRange) {
if (detectionDateRange == null) {
return detector.isHC() ? ADTaskType.REALTIME_HC_DETECTOR : ADTaskType.REALTIME_SINGLE_ENTITY;
return detector.isHighCardinality() ? ADTaskType.REALTIME_HC_DETECTOR : ADTaskType.REALTIME_SINGLE_ENTITY;
} else {
return detector.isHC() ? ADTaskType.HISTORICAL_HC_DETECTOR : ADTaskType.HISTORICAL_SINGLE_ENTITY;
return detector.isHighCardinality() ? ADTaskType.HISTORICAL_HC_DETECTOR : ADTaskType.HISTORICAL_SINGLE_ENTITY;
}
}

Expand Down Expand Up @@ -1139,7 +1139,7 @@ private <T> void resetHistoricalDetectorTaskState(
if (taskStopped) {
logger.debug("Reset task state as stopped, task id: {}", adTask.getTaskId());
if (taskProfile.getTaskId() == null // This means coordinating node doesn't have HC detector cache
&& detector.isHC()
&& detector.isHighCardinality()
&& !isNullOrEmpty(taskProfile.getEntityTaskProfiles())) {
// If coordinating node restarted, HC detector cache on it will be gone. But worker node still
// runs entity tasks, we'd better stop these entity tasks to clean up resource earlier.
Expand Down Expand Up @@ -1198,11 +1198,11 @@ private boolean isTaskStopped(String taskId, AnomalyDetector detector, ADTaskPro
// If no node is running this task, reset it as STOPPED.
return true;
}
if (!detector.isHC() && taskProfile.getNodeId() == null) {
if (!detector.isHighCardinality() && taskProfile.getNodeId() == null) {
logger.debug("AD task not running for single entity detector {}, task {}", detectorId, taskId);
return true;
}
if (detector.isHC()
if (detector.isHighCardinality()
&& taskProfile.getTotalEntitiesInited()
&& isNullOrEmpty(taskProfile.getRunningEntities())
&& isNullOrEmpty(taskProfile.getEntityTaskProfiles())
Expand Down Expand Up @@ -2844,7 +2844,7 @@ public String convertEntityToString(Entity entity, AnomalyDetector detector) {
throw new TimeSeriesException(error);
}
}
if (detector.isHC()) {
if (detector.isHighCardinality()) {
String categoryField = detector.getCategoryFields().get(0);
return entity.getAttributes().get(categoryField);
}
Expand All @@ -2871,7 +2871,7 @@ public Entity parseEntityFromString(String entityValue, ADTask adTask) {
logger.debug(error, e);
throw new TimeSeriesException(error);
}
} else if (detector.isHC()) {
} else if (detector.isHighCardinality()) {
return Entity.createSingleAttributeEntity(detector.getCategoryFields().get(0), entityValue);
}
throw new IllegalArgumentException("Fail to parse to Entity for single flow detector");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ private ActionListener<Optional<AnomalyDetector>> onGetDetector(
}

AnomalyDetector anomalyDetector = detectorOptional.get();
if (anomalyDetector.isHC()) {
if (anomalyDetector.isHighCardinality()) {
hcDetectors.add(adID);
adStats.getStat(StatNames.AD_HC_EXECUTE_REQUEST_COUNT.getName()).increment();
}
Expand Down Expand Up @@ -445,7 +445,7 @@ private void executeAnomalyDetection(
long dataEndTime
) {
// HC logic starts here
if (anomalyDetector.isHC()) {
if (anomalyDetector.isHighCardinality()) {
Optional<Exception> previousException = stateManager.fetchExceptionAndClear(adID);
if (previousException.isPresent()) {
Exception exception = previousException.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ protected void doExecute(Task task, ForwardADTaskRequest request, ActionListener
case NEXT_ENTITY:
logger.debug("Received NEXT_ENTITY action for detector {}, task {}", detectorId, adTask.getTaskId());
// Run next entity for HC detector historical analysis.
if (detector.isHC()) { // AD task could be HC detector level task or entity task
if (detector.isHighCardinality()) { // AD task could be HC detector level task or entity task
adTaskCacheManager.removeRunningEntity(detectorId, entityValue);
if (!adTaskCacheManager.hasEntity(detectorId)) {
adTaskCacheManager.setDetectorTaskSlots(detectorId, 0);
Expand Down Expand Up @@ -200,7 +200,7 @@ protected void doExecute(Task task, ForwardADTaskRequest request, ActionListener
// Cancel HC detector's historical analysis.
// Don't support single detector for this action as single entity task will be cancelled directly
// on worker node.
if (detector.isHC()) {
if (detector.isHighCardinality()) {
adTaskCacheManager.clearPendingEntities(detectorId);
adTaskCacheManager.removeRunningEntity(detectorId, entityValue);
if (!adTaskCacheManager.hasEntity(detectorId) || !adTask.isEntityTask()) {
Expand Down
Loading

0 comments on commit b14bd8c

Please sign in to comment.