From 357072543e443d96af0fa9fa8ac99f6d4f6ed988 Mon Sep 17 00:00:00 2001 From: V_Galaxy Date: Sat, 12 Oct 2024 15:27:57 +0800 Subject: [PATCH] Pipe: omit region id when marking collect invocation count for remaining time calculation (#13673) --- .../task/stage/PipeTaskProcessorStage.java | 2 +- .../processor/PipeProcessorSubtask.java | 20 +++++++++++-------- .../PipeProcessorSubtaskExecutorTest.java | 2 +- .../task/progress/PipeEventCommitManager.java | 3 +-- .../commons/pipe/event/EnrichedEvent.java | 7 +++++++ 5 files changed, 22 insertions(+), 12 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java index 7767792ce52d..22cd5ce3094b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java @@ -104,8 +104,8 @@ public PipeTaskProcessorStage( this.pipeProcessorSubtask = new PipeProcessorSubtask( taskId, - creationTime, pipeName, + creationTime, regionId, pipeExtractorInputEventSupplier, pipeProcessor, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java index be7e23bed0bd..2b44a34ae99c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java @@ -57,33 +57,35 @@ public class PipeProcessorSubtask extends PipeReportableSubtask { private static final AtomicReference subtaskWorkerManager = new AtomicReference<>(); - private final EventSupplier inputEventSupplier; - private final PipeProcessor pipeProcessor; - private final PipeEventCollector outputEventCollector; - // Record these variables to provide corresponding value to tag key of monitoring metrics private final String pipeName; + private final String pipeNameWithCreationTime; // cache for better performance private final int regionId; + private final EventSupplier inputEventSupplier; + private final PipeProcessor pipeProcessor; + private final PipeEventCollector outputEventCollector; + // This variable is used to distinguish between old and new subtasks before and after stuck // restart. private final long subtaskCreationTime; public PipeProcessorSubtask( final String taskID, - final long creationTime, final String pipeName, + final long creationTime, final int regionId, final EventSupplier inputEventSupplier, final PipeProcessor pipeProcessor, final PipeEventCollector outputEventCollector) { super(taskID, creationTime); - this.subtaskCreationTime = System.currentTimeMillis(); this.pipeName = pipeName; + this.pipeNameWithCreationTime = pipeName + "_" + creationTime; this.regionId = regionId; this.inputEventSupplier = inputEventSupplier; this.pipeProcessor = pipeProcessor; this.outputEventCollector = outputEventCollector; + this.subtaskCreationTime = System.currentTimeMillis(); // Only register dataRegions if (StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId))) { @@ -137,12 +139,14 @@ protected boolean executeOnce() throws Exception { pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector); PipeProcessorMetrics.getInstance().markTabletEvent(taskID); PipeDataNodeRemainingEventAndTimeMetrics.getInstance() - .markCollectInvocationCount(taskID, outputEventCollector.getCollectInvocationCount()); + .markCollectInvocationCount( + pipeNameWithCreationTime, outputEventCollector.getCollectInvocationCount()); } else if (event instanceof TsFileInsertionEvent) { pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector); PipeProcessorMetrics.getInstance().markTsFileEvent(taskID); PipeDataNodeRemainingEventAndTimeMetrics.getInstance() - .markCollectInvocationCount(taskID, outputEventCollector.getCollectInvocationCount()); + .markCollectInvocationCount( + pipeNameWithCreationTime, outputEventCollector.getCollectInvocationCount()); } else if (event instanceof PipeHeartbeatEvent) { pipeProcessor.process(event, outputEventCollector); ((PipeHeartbeatEvent) event).onProcessed(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeProcessorSubtaskExecutorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeProcessorSubtaskExecutorTest.java index 9b001f66e7fd..4d3cf4c49d83 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeProcessorSubtaskExecutorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeProcessorSubtaskExecutorTest.java @@ -40,8 +40,8 @@ public void setUp() throws Exception { Mockito.spy( new PipeProcessorSubtask( "PipeProcessorSubtaskExecutorTest", - System.currentTimeMillis(), "TestPipe", + System.currentTimeMillis(), 0, mock(EventSupplier.class), mock(PipeProcessor.class), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java index 4d70679c6fef..c429f3155c00 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java @@ -104,8 +104,7 @@ public void commit(final EnrichedEvent event, final CommitterKey committerKey) { } if (Objects.nonNull(commitRateMarker)) { try { - commitRateMarker.accept( - event.getPipeName() + '_' + event.getCreationTime(), event.isDataRegionEvent()); + commitRateMarker.accept(event.getPipeNameWithCreationTime(), event.isDataRegionEvent()); } catch (final Exception e) { if (LOGGER.isDebugEnabled()) { LOGGER.debug( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java index afed8d5f3bb7..ee70ff5286c5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java @@ -54,6 +54,8 @@ public abstract class EnrichedEvent implements Event { protected final String pipeName; protected final long creationTime; + private final String pipeNameWithCreationTime; // cache for better performance + protected final PipeTaskMeta pipeTaskMeta; protected CommitterKey committerKey; @@ -86,6 +88,7 @@ protected EnrichedEvent( this.pipeName = pipeName; this.creationTime = creationTime; + this.pipeNameWithCreationTime = pipeName + "_" + creationTime; this.pipeTaskMeta = pipeTaskMeta; this.treePattern = treePattern; this.tablePattern = tablePattern; @@ -302,6 +305,10 @@ public final long getCreationTime() { return creationTime; } + public String getPipeNameWithCreationTime() { + return pipeNameWithCreationTime; + } + public final int getRegionId() { // TODO: persist regionId in EnrichedEvent return committerKey == null ? -1 : committerKey.getRegionId();