From cfb7e5301928d8981d192a75d3d25f6f8e58620a Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Thu, 26 Sep 2024 17:58:34 +0800 Subject: [PATCH 1/7] setup --- .../event/common/tablet/PipeRawTabletInsertionEvent.java | 7 +++++++ .../pipe/agent/task/progress/PipeEventCommitManager.java | 2 +- .../org/apache/iotdb/commons/pipe/event/EnrichedEvent.java | 4 ++++ 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 1b79b9f27d58..2398fdca42e2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -190,6 +190,13 @@ public boolean isGeneratedByPipe() { throw new UnsupportedOperationException("isGeneratedByPipe() is not supported!"); } + @Override + public boolean needToCommitRate() { + // When computing the commit rate, only consider events where needToReport is true to avoid + // counting unparsed source events that influence remaining time calculation. + return needToReport; + } + @Override public boolean mayEventTimeOverlappedWithTimeRange() { final long[] timestamps = tablet.timestamps; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java index 4d70679c6fef..cb8e5b063572 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java @@ -102,7 +102,7 @@ public void commit(final EnrichedEvent event, final CommitterKey committerKey) { || event.getCreationTime() == 0) { return; } - if (Objects.nonNull(commitRateMarker)) { + if (Objects.nonNull(commitRateMarker) && event.needToCommitRate()) { try { commitRateMarker.accept( event.getPipeName() + '_' + event.getCreationTime(), event.isDataRegionEvent()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java index 39c1174f8ef0..2db00755c218 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java @@ -365,6 +365,10 @@ public boolean needToCommit() { return true; } + public boolean needToCommitRate() { + return true; + } + public abstract boolean mayEventTimeOverlappedWithTimeRange(); public abstract boolean mayEventPathsOverlappedWithPattern(); From a995a80004a778ce587f63f18516b9a065a2a4f3 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Thu, 26 Sep 2024 23:04:28 +0800 Subject: [PATCH 2/7] improve --- .../connector/PipeConnectorSubtask.java | 5 ++-- ...DataNodeRemainingEventAndTimeOperator.java | 4 +-- .../agent/SubscriptionBrokerAgent.java | 5 ++-- .../SubscriptionBlockingPendingQueue.java | 7 +++++ .../broker/SubscriptionBroker.java | 4 +-- .../broker/SubscriptionPrefetchingQueue.java | 30 +++++++++++++++---- .../subscription/event/SubscriptionEvent.java | 4 +-- .../batch/SubscriptionPipeEventBatch.java | 9 ++++++ .../SubscriptionPipeTabletEventBatch.java | 7 ----- .../SubscriptionPipeTsFileEventBatch.java | 8 ----- .../pipe/SubscriptionPipeEmptyEvent.java | 2 +- .../event/pipe/SubscriptionPipeEvents.java | 2 +- .../SubscriptionPipeTabletBatchEvents.java | 4 +-- .../SubscriptionPipeTsFileBatchEvents.java | 4 +-- .../SubscriptionPipeTsFilePlainEvent.java | 2 +- .../subtask/SubscriptionConnectorSubtask.java | 9 ++---- 16 files changed, 62 insertions(+), 44 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java index 3f5529b02a16..943d078e5e12 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java @@ -297,13 +297,14 @@ public int getAsyncConnectorRetryEventQueueSize() { // For performance, this will not acquire lock and does not guarantee the correct // result. However, this shall not cause any exceptions when concurrently read & written. - public int getEventCount(final String pipeName) { + public int getEventCount(final String pipeName, final boolean forCommitRate) { final AtomicInteger count = new AtomicInteger(0); try { inputPendingQueue.forEach( event -> { if (event instanceof EnrichedEvent - && pipeName.equals(((EnrichedEvent) event).getPipeName())) { + && pipeName.equals(((EnrichedEvent) event).getPipeName()) + && (!forCommitRate || ((EnrichedEvent) event).needToCommitRate())) { count.incrementAndGet(); } }); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java index bf0c4ba2386e..b6995e519797 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java @@ -69,7 +69,7 @@ long getRemainingEvents() { .reduce(Integer::sum) .orElse(0) + dataRegionConnectors.stream() - .map(connectorSubtask -> connectorSubtask.getEventCount(pipeName)) + .map(connectorSubtask -> connectorSubtask.getEventCount(pipeName, false)) .reduce(Integer::sum) .orElse(0) + schemaRegionExtractors.stream() @@ -106,7 +106,7 @@ long getRemainingEvents() { .reduce(Integer::sum) .orElse(0) + dataRegionConnectors.stream() - .map(connectorSubtask -> connectorSubtask.getEventCount(pipeName)) + .map(connectorSubtask -> connectorSubtask.getEventCount(pipeName, true)) .reduce(Integer::sum) .orElse(0) - dataRegionConnectors.stream() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java index 13770fdbf7c2..8097422af2e7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java @@ -204,13 +204,14 @@ public void executePrefetch(final String consumerGroupId, final String topicName broker.executePrefetch(topicName); } - public int getPipeEventCount(final String consumerGroupId, final String topicName) { + public int getPipeEventCount( + final String consumerGroupId, final String topicName, final boolean forCommitRate) { final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId); if (Objects.isNull(broker)) { LOGGER.warn( "Subscription: broker bound to consumer group [{}] does not exist", consumerGroupId); return 0; } - return broker.getPipeEventCount(topicName); + return broker.getPipeEventCount(topicName, forCommitRate); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBlockingPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBlockingPendingQueue.java index df3791cc3270..31b2d81815a9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBlockingPendingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBlockingPendingQueue.java @@ -22,6 +22,8 @@ import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; import org.apache.iotdb.pipe.api.event.Event; +import java.util.function.Consumer; + public abstract class SubscriptionBlockingPendingQueue { protected final UnboundedBlockingPendingQueue inputPendingQueue; @@ -40,4 +42,9 @@ public int size() { public boolean isEmpty() { return inputPendingQueue.isEmpty(); } + + /** DO NOT FORGET to set eventCounter to new value after invoking this method. */ + public void forEach(final Consumer action) { + inputPendingQueue.forEach(action); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java index 420b571103a8..4129f4ff85b1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java @@ -369,7 +369,7 @@ public void executePrefetch(final String topicName) { prefetchingQueue.executePrefetch(); } - public int getPipeEventCount(final String topicName) { + public int getPipeEventCount(final String topicName, final boolean forCommitRate) { final SubscriptionPrefetchingQueue prefetchingQueue = topicNameToPrefetchingQueue.get(topicName); if (Objects.isNull(prefetchingQueue)) { @@ -386,6 +386,6 @@ public int getPipeEventCount(final String topicName) { brokerId); return 0; } - return prefetchingQueue.getPipeEventCount(); + return prefetchingQueue.getPipeEventCount(forCommitRate); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java index ac5b3fad279c..416f4ca3b3eb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.subscription.event.SubscriptionEvent; import org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatches; import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent; +import org.apache.iotdb.db.utils.ErrorHandlingUtils; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; @@ -48,6 +49,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -539,14 +541,32 @@ public long getCurrentCommitId() { return commitIdGenerator.get(); } - public int getPipeEventCount() { - return inputPendingQueue.size() + public int getPipeEventCount(final boolean forCommitRate) { + final AtomicInteger count = new AtomicInteger(0); + try { + inputPendingQueue.forEach( + event -> { + if (event instanceof EnrichedEvent + && (!forCommitRate || ((EnrichedEvent) event).needToCommitRate())) { + count.incrementAndGet(); + } + }); + } catch (final Exception e) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Exception occurred when counting event of input pending queue of {}, root cause: {}", + this, + ErrorHandlingUtils.getRootCause(e).getMessage(), + e); + } + } + return count.get() + prefetchingQueue.stream() - .map(SubscriptionEvent::getPipeEventCount) + .map(event -> event.getPipeEventCount(forCommitRate)) .reduce(Integer::sum) .orElse(0) - + +inFlightEvents.values().stream() - .map(SubscriptionEvent::getPipeEventCount) + + inFlightEvents.values().stream() + .map(event -> event.getPipeEventCount(forCommitRate)) .reduce(Integer::sum) .orElse(0); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java index ad274023f6a8..4ce7f8bf66ef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java @@ -417,8 +417,8 @@ public String getFileName() { /////////////////////////////// APIs provided for metric framework /////////////////////////////// - public int getPipeEventCount() { - return pipeEvents.getPipeEventCount(); + public int getPipeEventCount(final boolean forCommitRate) { + return pipeEvents.getPipeEventCount(forCommitRate); } /////////////////////////////// object /////////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java index 758d9b315675..a6af831a903b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java @@ -25,6 +25,7 @@ import org.checkerframework.checker.nullness.qual.NonNull; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -39,6 +40,7 @@ public abstract class SubscriptionPipeEventBatch { protected final int maxDelayInMs; protected final long maxBatchSizeInBytes; + protected final List enrichedEvents = new ArrayList<>(); protected volatile List events = null; protected SubscriptionPipeEventBatch( @@ -84,4 +86,11 @@ protected Map coreReportMessage() { result.put("maxBatchSizeInBytes", String.valueOf(maxBatchSizeInBytes)); return result; } + + //////////////////////////// APIs provided for metric framework //////////////////////////// + + public int getPipeEventCount(final boolean forCommitRate) { + return (int) + enrichedEvents.stream().filter(event -> !forCommitRate || event.needToCommitRate()).count(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java index 871b1c12587f..e9776307ef4d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java @@ -54,7 +54,6 @@ public class SubscriptionPipeTabletEventBatch extends SubscriptionPipeEventBatch private static final long READ_TABLET_BUFFER_SIZE = SubscriptionConfig.getInstance().getSubscriptionReadTabletBufferSize(); - private final List enrichedEvents = new ArrayList<>(); private final List tablets = new ArrayList<>(); private long firstEventProcessingTime = Long.MIN_VALUE; @@ -223,10 +222,4 @@ private static String formatEnrichedEvents( } return eventMessageList.toString(); } - - //////////////////////////// APIs provided for metric framework //////////////////////////// - - public int getPipeEventCount() { - return enrichedEvents.size(); - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java index e38887e19f2f..a70d8948ddfa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java @@ -43,7 +43,6 @@ public class SubscriptionPipeTsFileEventBatch extends SubscriptionPipeEventBatch { private final PipeTabletEventTsFileBatch batch; - private final List enrichedEvents; public SubscriptionPipeTsFileEventBatch( final int regionId, @@ -52,7 +51,6 @@ public SubscriptionPipeTsFileEventBatch( final long maxBatchSizeInBytes) { super(regionId, prefetchingQueue, maxDelayInMs, maxBatchSizeInBytes); this.batch = new PipeTabletEventTsFileBatch(maxDelayInMs, maxBatchSizeInBytes); - this.enrichedEvents = new ArrayList<>(); } @Override @@ -132,10 +130,4 @@ protected Map coreReportMessage() { coreReportMessage.put("batch", batch.toString()); return coreReportMessage; } - - //////////////////////////// APIs provided for metric framework //////////////////////////// - - public int getPipeEventCount() { - return enrichedEvents.size(); - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java index 3e74e03d8f4c..1077830489cb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java @@ -44,7 +44,7 @@ public String toString() { //////////////////////////// APIs provided for metric framework //////////////////////////// @Override - public int getPipeEventCount() { + public int getPipeEventCount(final boolean forCommitRate) { return 0; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java index 489c4cf8120c..0658ffaaa078 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java @@ -34,5 +34,5 @@ public interface SubscriptionPipeEvents { //////////////////////////// APIs provided for metric framework //////////////////////////// - int getPipeEventCount(); + int getPipeEventCount(final boolean forCommitRate); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java index 226367405eba..ea82dc21f1e5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java @@ -56,7 +56,7 @@ public String toString() { //////////////////////////// APIs provided for metric framework //////////////////////////// @Override - public int getPipeEventCount() { - return batch.getPipeEventCount(); + public int getPipeEventCount(final boolean forCommitRate) { + return batch.getPipeEventCount(forCommitRate); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java index 5cae21f5ac8a..db77b49de411 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java @@ -76,9 +76,9 @@ public String toString() { //////////////////////////// APIs provided for metric framework //////////////////////////// @Override - public int getPipeEventCount() { + public int getPipeEventCount(final boolean forCommitRate) { // Since multiple events will share the same batch, equal division is performed here. // If it is not exact, round up to remain pessimistic. - return (batch.getPipeEventCount() + count - 1) / count; + return (batch.getPipeEventCount(forCommitRate) + count - 1) / count; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java index 111006fa6d32..09707a24fbfd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java @@ -59,7 +59,7 @@ public String toString() { //////////////////////////// APIs provided for metric framework //////////////////////////// @Override - public int getPipeEventCount() { + public int getPipeEventCount(final boolean forCommitRate) { return 1; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java index 3e540254d8e7..c94f1a7533c1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java @@ -25,13 +25,8 @@ import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.event.Event; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class SubscriptionConnectorSubtask extends PipeConnectorSubtask { - private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionConnectorSubtask.class); - private final String topicName; private final String consumerGroupId; @@ -81,9 +76,9 @@ public UnboundedBlockingPendingQueue getInputPendingQueue() { //////////////////////////// APIs provided for metric framework //////////////////////////// @Override - public int getEventCount(final String pipeName) { + public int getEventCount(final String pipeName, final boolean forCommitRate) { // count the number of pipe events in sink queue and prefetching queue, note that can safely // ignore lastEvent - return SubscriptionAgent.broker().getPipeEventCount(consumerGroupId, topicName); + return SubscriptionAgent.broker().getPipeEventCount(consumerGroupId, topicName, forCommitRate); } } From 85f2a7d9339c1a6e6bfa90e5ed94e3fcb05d9bee Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Thu, 26 Sep 2024 23:06:29 +0800 Subject: [PATCH 3/7] minor improve --- .../event/pipe/SubscriptionPipeTsFilePlainEvent.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java index 09707a24fbfd..1153bd65ace6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java @@ -60,6 +60,6 @@ public String toString() { @Override public int getPipeEventCount(final boolean forCommitRate) { - return 1; + return forCommitRate ? (tsFileInsertionEvent.needToCommitRate() ? 1 : 0) : 1; } } From 3ed6907cce202680ec072085e8e7cf1ae7e31bd9 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Fri, 27 Sep 2024 14:35:34 +0800 Subject: [PATCH 4/7] rename to forRemainingTime --- .../task/subtask/connector/PipeConnectorSubtask.java | 4 ++-- .../db/subscription/agent/SubscriptionBrokerAgent.java | 4 ++-- .../iotdb/db/subscription/broker/SubscriptionBroker.java | 4 ++-- .../subscription/broker/SubscriptionPrefetchingQueue.java | 8 ++++---- .../iotdb/db/subscription/event/SubscriptionEvent.java | 4 ++-- .../event/batch/SubscriptionPipeEventBatch.java | 6 ++++-- .../event/pipe/SubscriptionPipeEmptyEvent.java | 2 +- .../subscription/event/pipe/SubscriptionPipeEvents.java | 2 +- .../event/pipe/SubscriptionPipeTabletBatchEvents.java | 4 ++-- .../event/pipe/SubscriptionPipeTsFileBatchEvents.java | 4 ++-- .../event/pipe/SubscriptionPipeTsFilePlainEvent.java | 4 ++-- .../task/subtask/SubscriptionConnectorSubtask.java | 5 +++-- 12 files changed, 27 insertions(+), 24 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java index 943d078e5e12..a87809e3ce8b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java @@ -297,14 +297,14 @@ public int getAsyncConnectorRetryEventQueueSize() { // For performance, this will not acquire lock and does not guarantee the correct // result. However, this shall not cause any exceptions when concurrently read & written. - public int getEventCount(final String pipeName, final boolean forCommitRate) { + public int getEventCount(final String pipeName, final boolean forRemainingTime) { final AtomicInteger count = new AtomicInteger(0); try { inputPendingQueue.forEach( event -> { if (event instanceof EnrichedEvent && pipeName.equals(((EnrichedEvent) event).getPipeName()) - && (!forCommitRate || ((EnrichedEvent) event).needToCommitRate())) { + && (!forRemainingTime || ((EnrichedEvent) event).needToCommitRate())) { count.incrementAndGet(); } }); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java index 8097422af2e7..091a7fdf3d72 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java @@ -205,13 +205,13 @@ public void executePrefetch(final String consumerGroupId, final String topicName } public int getPipeEventCount( - final String consumerGroupId, final String topicName, final boolean forCommitRate) { + final String consumerGroupId, final String topicName, final boolean forRemainingTime) { final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId); if (Objects.isNull(broker)) { LOGGER.warn( "Subscription: broker bound to consumer group [{}] does not exist", consumerGroupId); return 0; } - return broker.getPipeEventCount(topicName, forCommitRate); + return broker.getPipeEventCount(topicName, forRemainingTime); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java index 4129f4ff85b1..a0ae4d0118a3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java @@ -369,7 +369,7 @@ public void executePrefetch(final String topicName) { prefetchingQueue.executePrefetch(); } - public int getPipeEventCount(final String topicName, final boolean forCommitRate) { + public int getPipeEventCount(final String topicName, final boolean forRemainingTime) { final SubscriptionPrefetchingQueue prefetchingQueue = topicNameToPrefetchingQueue.get(topicName); if (Objects.isNull(prefetchingQueue)) { @@ -386,6 +386,6 @@ public int getPipeEventCount(final String topicName, final boolean forCommitRate brokerId); return 0; } - return prefetchingQueue.getPipeEventCount(forCommitRate); + return prefetchingQueue.getPipeEventCount(forRemainingTime); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java index 416f4ca3b3eb..0595a23a1dc0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java @@ -541,13 +541,13 @@ public long getCurrentCommitId() { return commitIdGenerator.get(); } - public int getPipeEventCount(final boolean forCommitRate) { + public int getPipeEventCount(final boolean forRemainingTime) { final AtomicInteger count = new AtomicInteger(0); try { inputPendingQueue.forEach( event -> { if (event instanceof EnrichedEvent - && (!forCommitRate || ((EnrichedEvent) event).needToCommitRate())) { + && (!forRemainingTime || ((EnrichedEvent) event).needToCommitRate())) { count.incrementAndGet(); } }); @@ -562,11 +562,11 @@ public int getPipeEventCount(final boolean forCommitRate) { } return count.get() + prefetchingQueue.stream() - .map(event -> event.getPipeEventCount(forCommitRate)) + .map(event -> event.getPipeEventCount(forRemainingTime)) .reduce(Integer::sum) .orElse(0) + inFlightEvents.values().stream() - .map(event -> event.getPipeEventCount(forCommitRate)) + .map(event -> event.getPipeEventCount(forRemainingTime)) .reduce(Integer::sum) .orElse(0); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java index 4ce7f8bf66ef..3bb95bc77422 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java @@ -417,8 +417,8 @@ public String getFileName() { /////////////////////////////// APIs provided for metric framework /////////////////////////////// - public int getPipeEventCount(final boolean forCommitRate) { - return pipeEvents.getPipeEventCount(forCommitRate); + public int getPipeEventCount(final boolean forRemainingTime) { + return pipeEvents.getPipeEventCount(forRemainingTime); } /////////////////////////////// object /////////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java index a6af831a903b..018c0eb781c4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java @@ -89,8 +89,10 @@ protected Map coreReportMessage() { //////////////////////////// APIs provided for metric framework //////////////////////////// - public int getPipeEventCount(final boolean forCommitRate) { + public int getPipeEventCount(final boolean forRemainingTime) { return (int) - enrichedEvents.stream().filter(event -> !forCommitRate || event.needToCommitRate()).count(); + enrichedEvents.stream() + .filter(event -> !forRemainingTime || event.needToCommitRate()) + .count(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java index 1077830489cb..8dcf673e2aa8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java @@ -44,7 +44,7 @@ public String toString() { //////////////////////////// APIs provided for metric framework //////////////////////////// @Override - public int getPipeEventCount(final boolean forCommitRate) { + public int getPipeEventCount(final boolean forRemainingTime) { return 0; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java index 0658ffaaa078..6e772bd075af 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java @@ -34,5 +34,5 @@ public interface SubscriptionPipeEvents { //////////////////////////// APIs provided for metric framework //////////////////////////// - int getPipeEventCount(final boolean forCommitRate); + int getPipeEventCount(final boolean forRemainingTime); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java index ea82dc21f1e5..ea96ca5ebc5f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java @@ -56,7 +56,7 @@ public String toString() { //////////////////////////// APIs provided for metric framework //////////////////////////// @Override - public int getPipeEventCount(final boolean forCommitRate) { - return batch.getPipeEventCount(forCommitRate); + public int getPipeEventCount(final boolean forRemainingTime) { + return batch.getPipeEventCount(forRemainingTime); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java index db77b49de411..1f838fbba5f3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java @@ -76,9 +76,9 @@ public String toString() { //////////////////////////// APIs provided for metric framework //////////////////////////// @Override - public int getPipeEventCount(final boolean forCommitRate) { + public int getPipeEventCount(final boolean forRemainingTime) { // Since multiple events will share the same batch, equal division is performed here. // If it is not exact, round up to remain pessimistic. - return (batch.getPipeEventCount(forCommitRate) + count - 1) / count; + return (batch.getPipeEventCount(forRemainingTime) + count - 1) / count; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java index 1153bd65ace6..7fab066ce1cd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java @@ -59,7 +59,7 @@ public String toString() { //////////////////////////// APIs provided for metric framework //////////////////////////// @Override - public int getPipeEventCount(final boolean forCommitRate) { - return forCommitRate ? (tsFileInsertionEvent.needToCommitRate() ? 1 : 0) : 1; + public int getPipeEventCount(final boolean forRemainingTime) { + return forRemainingTime ? (tsFileInsertionEvent.needToCommitRate() ? 1 : 0) : 1; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java index c94f1a7533c1..027423683090 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java @@ -76,9 +76,10 @@ public UnboundedBlockingPendingQueue getInputPendingQueue() { //////////////////////////// APIs provided for metric framework //////////////////////////// @Override - public int getEventCount(final String pipeName, final boolean forCommitRate) { + public int getEventCount(final String pipeName, final boolean forRemainingTime) { // count the number of pipe events in sink queue and prefetching queue, note that can safely // ignore lastEvent - return SubscriptionAgent.broker().getPipeEventCount(consumerGroupId, topicName, forCommitRate); + return SubscriptionAgent.broker() + .getPipeEventCount(consumerGroupId, topicName, forRemainingTime); } } From c03921dc10123acd3326dc67e30191294398c390 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Tue, 1 Oct 2024 17:13:26 +0800 Subject: [PATCH 5/7] Use predicate unified interface & remove marking collect invocation count --- .../connector/PipeConnectorSubtask.java | 40 +++++------ .../processor/PipeProcessorSubtask.java | 11 +--- .../async/IoTDBDataRegionAsyncConnector.java | 25 +++---- .../tablet/PipeRawTabletInsertionEvent.java | 2 +- ...eDataNodeRemainingEventAndTimeMetrics.java | 13 ---- ...DataNodeRemainingEventAndTimeOperator.java | 66 +++++++++++-------- .../agent/SubscriptionBrokerAgent.java | 8 ++- .../broker/SubscriptionBroker.java | 6 +- .../broker/SubscriptionPrefetchingQueue.java | 40 ++++------- .../subscription/event/SubscriptionEvent.java | 6 +- .../batch/SubscriptionPipeEventBatch.java | 8 +-- .../pipe/SubscriptionPipeEmptyEvent.java | 5 +- .../event/pipe/SubscriptionPipeEvents.java | 5 +- .../SubscriptionPipeTabletBatchEvents.java | 6 +- .../SubscriptionPipeTsFileBatchEvents.java | 6 +- .../SubscriptionPipeTsFilePlainEvent.java | 6 +- .../subtask/SubscriptionConnectorSubtask.java | 8 ++- .../commons/pipe/event/EnrichedEvent.java | 2 +- 18 files changed, 122 insertions(+), 141 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java index 46ccf85c2b42..dfe496f8920f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java @@ -46,6 +46,7 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask { @@ -297,34 +298,23 @@ public int getAsyncConnectorRetryEventQueueSize() { // For performance, this will not acquire lock and does not guarantee the correct // result. However, this shall not cause any exceptions when concurrently read & written. - public int getEventCount(final String pipeName, final boolean forRemainingTime) { - final AtomicInteger count = new AtomicInteger(0); - try { - inputPendingQueue.forEach( - event -> { - if (event instanceof EnrichedEvent - && pipeName.equals(((EnrichedEvent) event).getPipeName()) - && (!forRemainingTime || ((EnrichedEvent) event).needToCommitRate())) { - count.incrementAndGet(); - } - }); - } catch (final Exception e) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "Exception occurred when counting event of pipe {}, root cause: {}", - pipeName, - ErrorHandlingUtils.getRootCause(e).getMessage(), - e); - } - } + public int getEventCount(final Predicate predicate) { + final AtomicInteger inputPendingQueuePipeEventCount = new AtomicInteger(0); + inputPendingQueue.forEach( + event -> { + if (event instanceof EnrichedEvent && predicate.test((EnrichedEvent) event)) { + inputPendingQueuePipeEventCount.incrementAndGet(); + } + }); + final int retryEventQueuePipeEventCount = + outputPipeConnector instanceof IoTDBDataRegionAsyncConnector + ? ((IoTDBDataRegionAsyncConnector) outputPipeConnector).getRetryEventCount(predicate) + : 0; // Avoid potential NPE in "getPipeName" final EnrichedEvent event = lastEvent instanceof EnrichedEvent ? (EnrichedEvent) lastEvent : null; - return count.get() - + (outputPipeConnector instanceof IoTDBDataRegionAsyncConnector - ? ((IoTDBDataRegionAsyncConnector) outputPipeConnector).getRetryEventCount(pipeName) - : 0) - + (Objects.nonNull(event) && pipeName.equals(event.getPipeName()) ? 1 : 0); + final int lastEventCount = (Objects.nonNull(event) && predicate.test(event)) ? 1 : 0; + return inputPendingQueuePipeEventCount.get() + retryEventQueuePipeEventCount + lastEventCount; } //////////////////////////// Error report //////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java index be7e23bed0bd..8e805ff5d83d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java @@ -49,6 +49,7 @@ import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; public class PipeProcessorSubtask extends PipeReportableSubtask { @@ -136,13 +137,9 @@ protected boolean executeOnce() throws Exception { if (event instanceof TabletInsertionEvent) { pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector); PipeProcessorMetrics.getInstance().markTabletEvent(taskID); - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() - .markCollectInvocationCount(taskID, outputEventCollector.getCollectInvocationCount()); } else if (event instanceof TsFileInsertionEvent) { pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector); PipeProcessorMetrics.getInstance().markTsFileEvent(taskID); - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() - .markCollectInvocationCount(taskID, outputEventCollector.getCollectInvocationCount()); } else if (event instanceof PipeHeartbeatEvent) { pipeProcessor.process(event, outputEventCollector); ((PipeHeartbeatEvent) event).onProcessed(); @@ -276,13 +273,11 @@ public int getRegionId() { return regionId; } - public int getEventCount(final boolean ignoreHeartbeat) { + public int getEventCount(final Predicate predicate) { // Avoid potential NPE in "getPipeName" final EnrichedEvent event = lastEvent instanceof EnrichedEvent ? (EnrichedEvent) lastEvent : null; - return Objects.nonNull(event) && !(ignoreHeartbeat && event instanceof PipeHeartbeatEvent) - ? 1 - : 0; + return (Objects.nonNull(event) && predicate.test(event)) ? 1 : 0; } //////////////////////////// Error report //////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index 5aa4324a2f39..b981aee0fd14 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -70,6 +70,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY; @@ -559,22 +560,14 @@ public int getRetryEventQueueSize() { // For performance, this will not acquire lock and does not guarantee the correct // result. However, this shall not cause any exceptions when concurrently read & written. - public int getRetryEventCount(final String pipeName) { + public int getRetryEventCount(final Predicate predicate) { final AtomicInteger count = new AtomicInteger(0); - try { - retryEventQueue.forEach( - event -> { - if (event instanceof EnrichedEvent - && pipeName.equals(((EnrichedEvent) event).getPipeName())) { - count.incrementAndGet(); - } - }); - return count.get(); - } catch (final Exception e) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Failed to get retry event count for pipe {}.", pipeName, e); - } - return count.get(); - } + retryEventQueue.forEach( + event -> { + if (event instanceof EnrichedEvent && predicate.test((EnrichedEvent) event)) { + count.incrementAndGet(); + } + }); + return count.get(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 2398fdca42e2..70526663980c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -194,7 +194,7 @@ public boolean isGeneratedByPipe() { public boolean needToCommitRate() { // When computing the commit rate, only consider events where needToReport is true to avoid // counting unparsed source events that influence remaining time calculation. - return needToReport; + return super.needToCommitRate() && needToReport; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java index 597c62cb7da1..7bcfdde8704b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java @@ -218,19 +218,6 @@ public void markRegionCommit(final String pipeID, final boolean isDataRegion) { } } - public void markCollectInvocationCount(final String pipeID, final long collectInvocationCount) { - if (Objects.isNull(metricService)) { - return; - } - final PipeDataNodeRemainingEventAndTimeOperator operator = - remainingEventAndTimeOperatorMap.get(pipeID); - if (Objects.isNull(operator)) { - return; - } - - operator.markCollectInvocationCount(collectInvocationCount); - } - //////////////////////////// Show pipes //////////////////////////// public Pair getRemainingEventAndTime( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java index b6995e519797..67c17cead99e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java @@ -21,26 +21,32 @@ import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.metric.PipeRemainingOperator; import org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtask; import org.apache.iotdb.db.pipe.agent.task.subtask.processor.PipeProcessorSubtask; import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor; import org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor; -import org.apache.iotdb.metrics.core.IoTDBMetricManager; -import org.apache.iotdb.metrics.core.type.IoTDBHistogram; import org.apache.iotdb.pipe.api.event.Event; import com.codahale.metrics.Clock; import com.codahale.metrics.ExponentialMovingAverages; import com.codahale.metrics.Meter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator { + + private static final Logger LOGGER = + LoggerFactory.getLogger(PipeDataNodeRemainingEventAndTimeOperator.class); + private final Set dataRegionExtractors = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final Set dataRegionProcessors = @@ -51,25 +57,41 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator { Collections.newSetFromMap(new ConcurrentHashMap<>()); private final AtomicReference dataRegionCommitMeter = new AtomicReference<>(null); private final AtomicReference schemaRegionCommitMeter = new AtomicReference<>(null); - private final IoTDBHistogram collectInvocationHistogram = - (IoTDBHistogram) IoTDBMetricManager.getInstance().createHistogram(null); private double lastDataRegionCommitSmoothingValue = Long.MAX_VALUE; private double lastSchemaRegionCommitSmoothingValue = Long.MAX_VALUE; //////////////////////////// Remaining event & time calculation //////////////////////////// + private static Predicate pipeNameFilter(final String pipeName) { + return event -> { + if (Objects.isNull(event)) { + return false; + } + return Objects.equals(event.getPipeName(), pipeName); + }; + } + + private static Predicate pipeNameAndCommitRateFilter(final String pipeName) { + return event -> { + if (Objects.isNull(event)) { + return false; + } + return Objects.equals(event.getPipeName(), pipeName) && event.needToCommitRate(); + }; + } + long getRemainingEvents() { return dataRegionExtractors.stream() .map(IoTDBDataRegionExtractor::getEventCount) .reduce(Integer::sum) .orElse(0) + dataRegionProcessors.stream() - .map(processorSubtask -> processorSubtask.getEventCount(false)) + .map(processorSubtask -> processorSubtask.getEventCount(pipeNameFilter(pipeName))) .reduce(Integer::sum) .orElse(0) + dataRegionConnectors.stream() - .map(connectorSubtask -> connectorSubtask.getEventCount(pipeName, false)) + .map(connectorSubtask -> connectorSubtask.getEventCount(pipeNameFilter(pipeName))) .reduce(Integer::sum) .orElse(0) + schemaRegionExtractors.stream() @@ -89,28 +111,21 @@ long getRemainingEvents() { final PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime = PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime(); - final double invocationValue = collectInvocationHistogram.getMean(); - // Do not take heartbeat event into account final double totalDataRegionWriteEventCount = - (dataRegionExtractors.stream() - .map(IoTDBDataRegionExtractor::getEventCount) - .reduce(Integer::sum) - .orElse(0) - - dataRegionExtractors.stream() - .map(IoTDBDataRegionExtractor::getPipeHeartbeatEventCount) - .reduce(Integer::sum) - .orElse(0)) - * Math.max(invocationValue, 1) - + dataRegionProcessors.stream() - .map(processorSubtask -> processorSubtask.getEventCount(true)) + dataRegionExtractors.stream() + .map(IoTDBDataRegionExtractor::getEventCount) .reduce(Integer::sum) .orElse(0) - + dataRegionConnectors.stream() - .map(connectorSubtask -> connectorSubtask.getEventCount(pipeName, true)) + + dataRegionProcessors.stream() + .map( + processorSubtask -> + processorSubtask.getEventCount(pipeNameAndCommitRateFilter(pipeName))) .reduce(Integer::sum) .orElse(0) - - dataRegionConnectors.stream() - .map(PipeConnectorSubtask::getPipeHeartbeatEventCount) + + dataRegionConnectors.stream() + .map( + connectorSubtask -> + connectorSubtask.getEventCount(pipeNameAndCommitRateFilter(pipeName))) .reduce(Integer::sum) .orElse(0); @@ -211,11 +226,6 @@ void markSchemaRegionCommit() { }); } - void markCollectInvocationCount(final long collectInvocationCount) { - // If collectInvocationCount == 0, the event will still be committed once - collectInvocationHistogram.update(Math.max(collectInvocationCount, 1)); - } - //////////////////////////// Switch //////////////////////////// // Thread-safe & Idempotent diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java index 091a7fdf3d72..525e0dd1af59 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.subscription.agent; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMetaKeeper; import org.apache.iotdb.db.subscription.broker.SubscriptionBroker; import org.apache.iotdb.db.subscription.event.SubscriptionEvent; @@ -35,6 +36,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; public class SubscriptionBrokerAgent { @@ -205,13 +207,15 @@ public void executePrefetch(final String consumerGroupId, final String topicName } public int getPipeEventCount( - final String consumerGroupId, final String topicName, final boolean forRemainingTime) { + final String consumerGroupId, + final String topicName, + final Predicate predicate) { final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId); if (Objects.isNull(broker)) { LOGGER.warn( "Subscription: broker bound to consumer group [{}] does not exist", consumerGroupId); return 0; } - return broker.getPipeEventCount(topicName, forRemainingTime); + return broker.getPipeEventCount(topicName, predicate); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java index a0ae4d0118a3..db9c15b0f932 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.subscription.broker; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; @@ -49,6 +50,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; import java.util.stream.Collectors; import static org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext.INVALID_COMMIT_ID; @@ -369,7 +371,7 @@ public void executePrefetch(final String topicName) { prefetchingQueue.executePrefetch(); } - public int getPipeEventCount(final String topicName, final boolean forRemainingTime) { + public int getPipeEventCount(final String topicName, final Predicate predicate) { final SubscriptionPrefetchingQueue prefetchingQueue = topicNameToPrefetchingQueue.get(topicName); if (Objects.isNull(prefetchingQueue)) { @@ -386,6 +388,6 @@ public int getPipeEventCount(final String topicName, final boolean forRemainingT brokerId); return 0; } - return prefetchingQueue.getPipeEventCount(forRemainingTime); + return prefetchingQueue.getPipeEventCount(predicate); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java index 0595a23a1dc0..5ca05844296b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java @@ -29,7 +29,6 @@ import org.apache.iotdb.db.subscription.event.SubscriptionEvent; import org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatches; import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent; -import org.apache.iotdb.db.utils.ErrorHandlingUtils; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; @@ -52,6 +51,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; import static org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext.INVALID_COMMIT_ID; @@ -541,34 +541,20 @@ public long getCurrentCommitId() { return commitIdGenerator.get(); } - public int getPipeEventCount(final boolean forRemainingTime) { - final AtomicInteger count = new AtomicInteger(0); - try { - inputPendingQueue.forEach( - event -> { - if (event instanceof EnrichedEvent - && (!forRemainingTime || ((EnrichedEvent) event).needToCommitRate())) { - count.incrementAndGet(); - } - }); - } catch (final Exception e) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "Exception occurred when counting event of input pending queue of {}, root cause: {}", - this, - ErrorHandlingUtils.getRootCause(e).getMessage(), - e); - } - } - return count.get() - + prefetchingQueue.stream() - .map(event -> event.getPipeEventCount(forRemainingTime)) - .reduce(Integer::sum) - .orElse(0) - + inFlightEvents.values().stream() - .map(event -> event.getPipeEventCount(forRemainingTime)) + public int getPipeEventCount(final Predicate predicate) { + final AtomicInteger inputPendingQueuePipeEventCount = new AtomicInteger(0); + inputPendingQueue.forEach( + event -> { + if (event instanceof EnrichedEvent && predicate.test((EnrichedEvent) event)) { + inputPendingQueuePipeEventCount.incrementAndGet(); + } + }); + final int prefetchingQueuePipeEventCount = + prefetchingQueue.stream() + .map(event -> event.getPipeEventCount(predicate)) .reduce(Integer::sum) .orElse(0); + return inputPendingQueuePipeEventCount.get() + prefetchingQueuePipeEventCount; } /////////////////////////////// close & termination /////////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java index 3bb95bc77422..77a596a7defe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.subscription.event; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingQueue; import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEvents; @@ -47,6 +48,7 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; import java.util.stream.Collectors; import static org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext.INVALID_COMMIT_ID; @@ -417,8 +419,8 @@ public String getFileName() { /////////////////////////////// APIs provided for metric framework /////////////////////////////// - public int getPipeEventCount(final boolean forRemainingTime) { - return pipeEvents.getPipeEventCount(forRemainingTime); + public int getPipeEventCount(final Predicate predicate) { + return pipeEvents.getPipeEventCount(predicate); } /////////////////////////////// object /////////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java index 018c0eb781c4..a8e959d6695b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Objects; import java.util.function.Consumer; +import java.util.function.Predicate; public abstract class SubscriptionPipeEventBatch { @@ -89,10 +90,7 @@ protected Map coreReportMessage() { //////////////////////////// APIs provided for metric framework //////////////////////////// - public int getPipeEventCount(final boolean forRemainingTime) { - return (int) - enrichedEvents.stream() - .filter(event -> !forRemainingTime || event.needToCommitRate()) - .count(); + public int getPipeEventCount(final Predicate predicate) { + return (int) enrichedEvents.stream().filter(predicate).count(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java index 8dcf673e2aa8..8b1a3f80fd30 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java @@ -19,7 +19,10 @@ package org.apache.iotdb.db.subscription.event.pipe; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; + import java.io.File; +import java.util.function.Predicate; public class SubscriptionPipeEmptyEvent implements SubscriptionPipeEvents { @@ -44,7 +47,7 @@ public String toString() { //////////////////////////// APIs provided for metric framework //////////////////////////// @Override - public int getPipeEventCount(final boolean forRemainingTime) { + public int getPipeEventCount(final Predicate predicate) { return 0; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java index 6e772bd075af..9100d6d5a10c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java @@ -19,7 +19,10 @@ package org.apache.iotdb.db.subscription.event.pipe; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; + import java.io.File; +import java.util.function.Predicate; public interface SubscriptionPipeEvents { @@ -34,5 +37,5 @@ public interface SubscriptionPipeEvents { //////////////////////////// APIs provided for metric framework //////////////////////////// - int getPipeEventCount(final boolean forRemainingTime); + int getPipeEventCount(final Predicate predicate); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java index ea96ca5ebc5f..30765e652bdf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java @@ -19,9 +19,11 @@ package org.apache.iotdb.db.subscription.event.pipe; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletEventBatch; import java.io.File; +import java.util.function.Predicate; public class SubscriptionPipeTabletBatchEvents implements SubscriptionPipeEvents { @@ -56,7 +58,7 @@ public String toString() { //////////////////////////// APIs provided for metric framework //////////////////////////// @Override - public int getPipeEventCount(final boolean forRemainingTime) { - return batch.getPipeEventCount(forRemainingTime); + public int getPipeEventCount(final Predicate predicate) { + return batch.getPipeEventCount(predicate); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java index 1f838fbba5f3..51d8528866b9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java @@ -19,10 +19,12 @@ package org.apache.iotdb.db.subscription.event.pipe; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTsFileEventBatch; import java.io.File; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; public class SubscriptionPipeTsFileBatchEvents implements SubscriptionPipeEvents { @@ -76,9 +78,9 @@ public String toString() { //////////////////////////// APIs provided for metric framework //////////////////////////// @Override - public int getPipeEventCount(final boolean forRemainingTime) { + public int getPipeEventCount(final Predicate predicate) { // Since multiple events will share the same batch, equal division is performed here. // If it is not exact, round up to remain pessimistic. - return (batch.getPipeEventCount(forRemainingTime) + count - 1) / count; + return (batch.getPipeEventCount(predicate) + count - 1) / count; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java index 7fab066ce1cd..8337338ca117 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java @@ -19,9 +19,11 @@ package org.apache.iotdb.db.subscription.event.pipe; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import java.io.File; +import java.util.function.Predicate; public class SubscriptionPipeTsFilePlainEvent implements SubscriptionPipeEvents { @@ -59,7 +61,7 @@ public String toString() { //////////////////////////// APIs provided for metric framework //////////////////////////// @Override - public int getPipeEventCount(final boolean forRemainingTime) { - return forRemainingTime ? (tsFileInsertionEvent.needToCommitRate() ? 1 : 0) : 1; + public int getPipeEventCount(final Predicate predicate) { + return predicate.test(tsFileInsertionEvent) ? 1 : 0; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java index 027423683090..8576723857bf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java @@ -20,11 +20,14 @@ package org.apache.iotdb.db.subscription.task.subtask; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtask; import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.event.Event; +import java.util.function.Predicate; + public class SubscriptionConnectorSubtask extends PipeConnectorSubtask { private final String topicName; @@ -76,10 +79,9 @@ public UnboundedBlockingPendingQueue getInputPendingQueue() { //////////////////////////// APIs provided for metric framework //////////////////////////// @Override - public int getEventCount(final String pipeName, final boolean forRemainingTime) { + public int getEventCount(final Predicate predicate) { // count the number of pipe events in sink queue and prefetching queue, note that can safely // ignore lastEvent - return SubscriptionAgent.broker() - .getPipeEventCount(consumerGroupId, topicName, forRemainingTime); + return SubscriptionAgent.broker().getPipeEventCount(consumerGroupId, topicName, predicate); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java index 2db00755c218..403734c2f83c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java @@ -366,7 +366,7 @@ public boolean needToCommit() { } public boolean needToCommitRate() { - return true; + return this.needToCommit() && shouldReportOnCommit; } public abstract boolean mayEventTimeOverlappedWithTimeRange(); From c8c0dbf9585c272785919d53ae742bda84286963 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Tue, 1 Oct 2024 18:13:30 +0800 Subject: [PATCH 6/7] fixup --- iotdb-core/datanode/pom.xml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml index b85da1476721..222141b8e89d 100644 --- a/iotdb-core/datanode/pom.xml +++ b/iotdb-core/datanode/pom.xml @@ -540,6 +540,8 @@ io.jsonwebtoken:jjwt-impl io.jsonwebtoken:jjwt-jackson + + org.apache.iotdb:metrics-core org.glassfish.jersey.inject:jersey-hk2 From 1dc6789d7a198724490318c60ca4a03a4c644c5a Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Tue, 1 Oct 2024 19:33:31 +0800 Subject: [PATCH 7/7] Separately count the historical and real-time data of the data region --- .../PipeConfigNodeRemainingTimeMetrics.java | 3 +- .../connector/PipeConnectorSubtask.java | 6 +- .../processor/PipeProcessorSubtask.java | 3 +- .../tablet/PipeRawTabletInsertionEvent.java | 8 + .../tsfile/PipeTsFileInsertionEvent.java | 5 + .../dataregion/IoTDBDataRegionExtractor.java | 8 +- .../PipeRealtimeDataRegionExtractor.java | 13 +- ...eDataNodeRemainingEventAndTimeMetrics.java | 5 +- ...DataNodeRemainingEventAndTimeOperator.java | 166 +++++++++++++----- .../broker/SubscriptionPrefetchingQueue.java | 3 + .../agent/task/progress/CommitRateMarker.java | 26 +++ .../task/progress/PipeEventCommitManager.java | 9 +- .../commons/pipe/event/EnrichedEvent.java | 4 + 13 files changed, 203 insertions(+), 56 deletions(-) create mode 100644 iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/CommitRateMarker.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java index ab5440eab2db..289caa42158e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java @@ -138,7 +138,8 @@ public void deregister(final String pipeID) { } } - public void markRegionCommit(final String pipeID, final boolean isDataRegion) { + public void markRegionCommit( + final String pipeID, final boolean isDataRegion, final boolean isDataRegionRealtimeEvent) { if (Objects.isNull(metricService)) { return; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java index dfe496f8920f..eb581a9c8252 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java @@ -219,7 +219,7 @@ public void close() { * When a pipe is dropped, the connector maybe reused and will not be closed. So we just discard * its queued events in the output pipe connector. */ - public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) { + public void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { // Try to remove the events as much as possible inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId); @@ -299,6 +299,7 @@ public int getAsyncConnectorRetryEventQueueSize() { // For performance, this will not acquire lock and does not guarantee the correct // result. However, this shall not cause any exceptions when concurrently read & written. public int getEventCount(final Predicate predicate) { + // 1. events in inputPendingQueue final AtomicInteger inputPendingQueuePipeEventCount = new AtomicInteger(0); inputPendingQueue.forEach( event -> { @@ -306,11 +307,12 @@ public int getEventCount(final Predicate predicate) { inputPendingQueuePipeEventCount.incrementAndGet(); } }); + // 2. events in specific connector final int retryEventQueuePipeEventCount = outputPipeConnector instanceof IoTDBDataRegionAsyncConnector ? ((IoTDBDataRegionAsyncConnector) outputPipeConnector).getRetryEventCount(predicate) : 0; - // Avoid potential NPE in "getPipeName" + // 3. lastEvent: avoid potential NPE in "getPipeName" final EnrichedEvent event = lastEvent instanceof EnrichedEvent ? (EnrichedEvent) lastEvent : null; final int lastEventCount = (Objects.nonNull(event) && predicate.test(event)) ? 1 : 0; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java index 8e805ff5d83d..0845d886b463 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java @@ -274,9 +274,10 @@ public int getRegionId() { } public int getEventCount(final Predicate predicate) { - // Avoid potential NPE in "getPipeName" + // lastEvent: avoid potential NPE in "getPipeName" final EnrichedEvent event = lastEvent instanceof EnrichedEvent ? (EnrichedEvent) lastEvent : null; + // TODO: consider events in specific processor return (Objects.nonNull(event) && predicate.test(event)) ? 1 : 0; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 70526663980c..cdd409f5a94c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -164,6 +164,14 @@ public ProgressIndex getProgressIndex() { return sourceEvent != null ? sourceEvent.getProgressIndex() : MinimumProgressIndex.INSTANCE; } + @Override + public boolean isDataRegionRealtimeEvent() { + if (!(sourceEvent instanceof PipeTsFileInsertionEvent)) { + return false; + } + return sourceEvent.isDataRegionRealtimeEvent(); + } + @Override public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( final String pipeName, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index d026a65e0a81..ee427027678c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -315,6 +315,11 @@ public void eliminateProgressIndex() { } } + @Override + public boolean isDataRegionRealtimeEvent() { + return !isGeneratedByHistoricalExtractor; + } + @Override public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( final String pipeName, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java index f05bcf2bea5e..9a6ca7223932 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.extractor.IoTDBExtractor; import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -53,6 +54,7 @@ import java.util.Arrays; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE; @@ -506,9 +508,7 @@ public int getPipeHeartbeatEventCount() { return hasBeenStarted.get() ? realtimeExtractor.getPipeHeartbeatEventCount() : 0; } - public int getEventCount() { - return hasBeenStarted.get() - ? (historicalExtractor.getPendingQueueSize() + realtimeExtractor.getEventCount()) - : 0; + public int getRealtimeEventCount(final Predicate predicate) { + return hasBeenStarted.get() ? realtimeExtractor.getEventCount(predicate) : 0; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java index 40c6bdc906f7..88653328bcb3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java @@ -57,7 +57,9 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import java.util.stream.Collectors; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY; @@ -520,8 +522,15 @@ public int getPipeHeartbeatEventCount() { return pendingQueue.getPipeHeartbeatEventCount(); } - public int getEventCount() { - return pendingQueue.size(); + public int getEventCount(final Predicate predicate) { + final AtomicInteger pendingQueueEventCount = new AtomicInteger(0); + pendingQueue.forEach( + event -> { + if (event instanceof EnrichedEvent && predicate.test((EnrichedEvent) event)) { + pendingQueueEventCount.incrementAndGet(); + } + }); + return pendingQueueEventCount.get(); } public String getTaskID() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java index 7bcfdde8704b..52137e66b99a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java @@ -198,7 +198,8 @@ public void deregister(final String pipeID) { } } - public void markRegionCommit(final String pipeID, final boolean isDataRegion) { + public void markRegionCommit( + final String pipeID, final boolean isDataRegion, final boolean isDataRegionRealtimeEvent) { if (Objects.isNull(metricService)) { return; } @@ -212,7 +213,7 @@ public void markRegionCommit(final String pipeID, final boolean isDataRegion) { } if (isDataRegion) { - operator.markDataRegionCommit(); + operator.markDataRegionCommit(isDataRegionRealtimeEvent); } else { operator.markSchemaRegionCommit(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java index 67c17cead99e..d5f06bd75959 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java @@ -55,43 +55,60 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator { Collections.newSetFromMap(new ConcurrentHashMap<>()); private final Set schemaRegionExtractors = Collections.newSetFromMap(new ConcurrentHashMap<>()); - private final AtomicReference dataRegionCommitMeter = new AtomicReference<>(null); + + private final AtomicReference dataRegionHistoricalEventCommitMeter = + new AtomicReference<>(null); + private final AtomicReference dataRegionRealtimeEventCommitMeter = + new AtomicReference<>(null); private final AtomicReference schemaRegionCommitMeter = new AtomicReference<>(null); - private double lastDataRegionCommitSmoothingValue = Long.MAX_VALUE; + private double lastDataRegionHistoricalEventCommitSmoothingValue = Long.MAX_VALUE; + private double lastDataRegionRealtimeEventCommitSmoothingValue = Long.MAX_VALUE; private double lastSchemaRegionCommitSmoothingValue = Long.MAX_VALUE; //////////////////////////// Remaining event & time calculation //////////////////////////// - private static Predicate pipeNameFilter(final String pipeName) { + private static final Predicate NEED_TO_COMMIT_RATE = + EnrichedEvent::needToCommitRate; + private static final Predicate IS_DATA_REGION_REALTIME_EVENT = + EnrichedEvent::isDataRegionRealtimeEvent; + private static final Predicate IS_DATA_REGION_HISTORICAL_EVENT = + event -> !event.isDataRegionRealtimeEvent(); + + @SafeVarargs + private static Predicate filter( + final String pipeName, final Predicate... predicates) { return event -> { if (Objects.isNull(event)) { return false; } - return Objects.equals(event.getPipeName(), pipeName); - }; - } - - private static Predicate pipeNameAndCommitRateFilter(final String pipeName) { - return event -> { - if (Objects.isNull(event)) { + if (!Objects.equals(event.getPipeName(), pipeName)) { return false; } - return Objects.equals(event.getPipeName(), pipeName) && event.needToCommitRate(); + for (final Predicate predicate : predicates) { + if (!predicate.test(event)) { + return false; + } + } + return true; }; } long getRemainingEvents() { return dataRegionExtractors.stream() - .map(IoTDBDataRegionExtractor::getEventCount) + .map(IoTDBDataRegionExtractor::getHistoricalTsFileInsertionEventCount) + .reduce(Integer::sum) + .orElse(0) + + dataRegionExtractors.stream() + .map(extractor -> extractor.getRealtimeEventCount(filter(pipeName))) .reduce(Integer::sum) .orElse(0) + dataRegionProcessors.stream() - .map(processorSubtask -> processorSubtask.getEventCount(pipeNameFilter(pipeName))) + .map(processor -> processor.getEventCount(filter(pipeName))) .reduce(Integer::sum) .orElse(0) + dataRegionConnectors.stream() - .map(connectorSubtask -> connectorSubtask.getEventCount(pipeNameFilter(pipeName))) + .map(connector -> connector.getEventCount(filter(pipeName))) .reduce(Integer::sum) .orElse(0) + schemaRegionExtractors.stream() @@ -111,42 +128,91 @@ long getRemainingEvents() { final PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime = PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime(); - final double totalDataRegionWriteEventCount = + // data region historical event + final double totalDataRegionWriteHistoricalEventCount = dataRegionExtractors.stream() - .map(IoTDBDataRegionExtractor::getEventCount) + .map(IoTDBDataRegionExtractor::getHistoricalTsFileInsertionEventCount) .reduce(Integer::sum) .orElse(0) + dataRegionProcessors.stream() .map( - processorSubtask -> - processorSubtask.getEventCount(pipeNameAndCommitRateFilter(pipeName))) + processor -> + processor.getEventCount( + filter(pipeName, NEED_TO_COMMIT_RATE, IS_DATA_REGION_HISTORICAL_EVENT))) .reduce(Integer::sum) .orElse(0) + dataRegionConnectors.stream() .map( - connectorSubtask -> - connectorSubtask.getEventCount(pipeNameAndCommitRateFilter(pipeName))) + connector -> + connector.getEventCount( + filter(pipeName, NEED_TO_COMMIT_RATE, IS_DATA_REGION_HISTORICAL_EVENT))) .reduce(Integer::sum) .orElse(0); - dataRegionCommitMeter.updateAndGet( + dataRegionHistoricalEventCommitMeter.updateAndGet( meter -> { if (Objects.nonNull(meter)) { - lastDataRegionCommitSmoothingValue = + lastDataRegionHistoricalEventCommitSmoothingValue = pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter); } return meter; }); - final double dataRegionRemainingTime; - if (totalDataRegionWriteEventCount <= 0) { - dataRegionRemainingTime = 0; + + final double dataRegionHistoricalEventRemainingTime; + if (totalDataRegionWriteHistoricalEventCount <= 0) { + dataRegionHistoricalEventRemainingTime = 0; } else { - dataRegionRemainingTime = - lastDataRegionCommitSmoothingValue <= 0 + dataRegionHistoricalEventRemainingTime = + lastDataRegionHistoricalEventCommitSmoothingValue <= 0 ? Double.MAX_VALUE - : totalDataRegionWriteEventCount / lastDataRegionCommitSmoothingValue; + : totalDataRegionWriteHistoricalEventCount + / lastDataRegionHistoricalEventCommitSmoothingValue; + } + + // data region realtime event + final double totalDataRegionWriteRealtimeEventCount = + dataRegionExtractors.stream() + .map( + extractor -> + extractor.getRealtimeEventCount(filter(pipeName, NEED_TO_COMMIT_RATE))) + .reduce(Integer::sum) + .orElse(0) + + dataRegionProcessors.stream() + .map( + processor -> + processor.getEventCount( + filter(pipeName, NEED_TO_COMMIT_RATE, IS_DATA_REGION_REALTIME_EVENT))) + .reduce(Integer::sum) + .orElse(0) + + dataRegionConnectors.stream() + .map( + connector -> + connector.getEventCount( + filter(pipeName, NEED_TO_COMMIT_RATE, IS_DATA_REGION_REALTIME_EVENT))) + .reduce(Integer::sum) + .orElse(0); + + dataRegionRealtimeEventCommitMeter.updateAndGet( + meter -> { + if (Objects.nonNull(meter)) { + lastDataRegionRealtimeEventCommitSmoothingValue = + pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter); + } + return meter; + }); + + final double dataRegionRealtimeEventRemainingTime; + if (totalDataRegionWriteRealtimeEventCount <= 0) { + dataRegionRealtimeEventRemainingTime = 0; + } else { + dataRegionRealtimeEventRemainingTime = + lastDataRegionRealtimeEventCommitSmoothingValue <= 0 + ? 0 // NOTE HERE + : totalDataRegionWriteRealtimeEventCount + / lastDataRegionRealtimeEventCommitSmoothingValue; } + // schema region event final long totalSchemaRegionWriteEventCount = schemaRegionExtractors.stream() .map(IoTDBSchemaRegionExtractor::getUnTransferredEventCount) @@ -161,6 +227,7 @@ long getRemainingEvents() { } return meter; }); + final double schemaRegionRemainingTime; if (totalSchemaRegionWriteEventCount <= 0) { schemaRegionRemainingTime = 0; @@ -171,13 +238,19 @@ long getRemainingEvents() { : totalSchemaRegionWriteEventCount / lastSchemaRegionCommitSmoothingValue; } - if (totalDataRegionWriteEventCount + totalSchemaRegionWriteEventCount == 0) { + if (totalDataRegionWriteHistoricalEventCount + + totalDataRegionWriteRealtimeEventCount + + totalSchemaRegionWriteEventCount + == 0) { notifyEmpty(); } else { notifyNonEmpty(); } - final double result = Math.max(dataRegionRemainingTime, schemaRegionRemainingTime); + final double result = + Math.max( + dataRegionHistoricalEventRemainingTime + dataRegionRealtimeEventRemainingTime, + schemaRegionRemainingTime); return result >= REMAINING_MAX_SECONDS ? REMAINING_MAX_SECONDS : result; } @@ -206,14 +279,24 @@ void register(final IoTDBSchemaRegionExtractor extractor) { //////////////////////////// Rate //////////////////////////// - void markDataRegionCommit() { - dataRegionCommitMeter.updateAndGet( - meter -> { - if (Objects.nonNull(meter)) { - meter.mark(); - } - return meter; - }); + void markDataRegionCommit(final boolean isDataRegionRealtimeEvent) { + if (isDataRegionRealtimeEvent) { + dataRegionRealtimeEventCommitMeter.updateAndGet( + meter -> { + if (Objects.nonNull(meter)) { + meter.mark(); + } + return meter; + }); + } else { + dataRegionHistoricalEventCommitMeter.updateAndGet( + meter -> { + if (Objects.nonNull(meter)) { + meter.mark(); + } + return meter; + }); + } } void markSchemaRegionCommit() { @@ -236,7 +319,9 @@ public synchronized void thawRate(final boolean isStartPipe) { if (isStopped) { return; } - dataRegionCommitMeter.compareAndSet( + dataRegionHistoricalEventCommitMeter.compareAndSet( + null, new Meter(new ExponentialMovingAverages(), Clock.defaultClock())); + dataRegionRealtimeEventCommitMeter.compareAndSet( null, new Meter(new ExponentialMovingAverages(), Clock.defaultClock())); schemaRegionCommitMeter.compareAndSet( null, new Meter(new ExponentialMovingAverages(), Clock.defaultClock())); @@ -246,7 +331,8 @@ public synchronized void thawRate(final boolean isStartPipe) { @Override public synchronized void freezeRate(final boolean isStopPipe) { super.freezeRate(isStopPipe); - dataRegionCommitMeter.set(null); + dataRegionHistoricalEventCommitMeter.set(null); + dataRegionRealtimeEventCommitMeter.set(null); schemaRegionCommitMeter.set(null); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java index 5ca05844296b..3ef6cc5fc7b4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java @@ -542,6 +542,7 @@ public long getCurrentCommitId() { } public int getPipeEventCount(final Predicate predicate) { + // 1. events in inputPendingQueue final AtomicInteger inputPendingQueuePipeEventCount = new AtomicInteger(0); inputPendingQueue.forEach( event -> { @@ -549,11 +550,13 @@ public int getPipeEventCount(final Predicate predicate) { inputPendingQueuePipeEventCount.incrementAndGet(); } }); + // 2. events in prefetchingQueue final int prefetchingQueuePipeEventCount = prefetchingQueue.stream() .map(event -> event.getPipeEventCount(predicate)) .reduce(Integer::sum) .orElse(0); + // do not consider events on the fly return inputPendingQueuePipeEventCount.get() + prefetchingQueuePipeEventCount; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/CommitRateMarker.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/CommitRateMarker.java new file mode 100644 index 000000000000..bf27a74f2cf4 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/CommitRateMarker.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.agent.task.progress; + +@FunctionalInterface +public interface CommitRateMarker { + void accept( + final String pipeID, final boolean isDataRegion, final boolean isDataRegionRealtimeEvent); +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java index cb8e5b063572..caa1030dbb44 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java @@ -28,7 +28,6 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.BiConsumer; public class PipeEventCommitManager { @@ -40,7 +39,7 @@ public class PipeEventCommitManager { private final Map eventCommitterRestartTimesMap = new ConcurrentHashMap<>(); - private BiConsumer commitRateMarker; + private CommitRateMarker commitRateMarker; public void register( final String pipeName, @@ -105,7 +104,9 @@ public void commit(final EnrichedEvent event, final CommitterKey committerKey) { if (Objects.nonNull(commitRateMarker) && event.needToCommitRate()) { try { commitRateMarker.accept( - event.getPipeName() + '_' + event.getCreationTime(), event.isDataRegionEvent()); + event.getPipeName() + '_' + event.getCreationTime(), + event.isDataRegionEvent(), + event.isDataRegionRealtimeEvent()); } catch (final Exception e) { if (LOGGER.isDebugEnabled()) { LOGGER.debug( @@ -163,7 +164,7 @@ private static CommitterKey generateCommitterRestartTimesKey(final CommitterKey committerKey.getPipeName(), committerKey.getCreationTime(), committerKey.getRegionId()); } - public void setCommitRateMarker(final BiConsumer commitRateMarker) { + public void setCommitRateMarker(final CommitRateMarker commitRateMarker) { this.commitRateMarker = commitRateMarker; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java index 403734c2f83c..deb56635a1c5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java @@ -301,6 +301,10 @@ public final boolean isDataRegionEvent() { return !(this instanceof PipeWritePlanEvent) && !(this instanceof PipeSnapshotEvent); } + public boolean isDataRegionRealtimeEvent() { + return true; + } + /** * Get the pattern string of this {@link EnrichedEvent}. *