Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe: refact pipe remaining metrics calculation #13640

Draft
wants to merge 11 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ public void deregister(final String pipeID) {
}
}

public void markRegionCommit(final String pipeID, final boolean isDataRegion) {
public void markRegionCommit(
final String pipeID, final boolean isDataRegion, final boolean isDataRegionRealtimeEvent) {
if (Objects.isNull(metricService)) {
return;
}
Expand Down
2 changes: 2 additions & 0 deletions iotdb-core/datanode/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,8 @@
<!-- These are used at runtime in tests -->
<usedDependency>io.jsonwebtoken:jjwt-impl</usedDependency>
<usedDependency>io.jsonwebtoken:jjwt-jackson</usedDependency>
<!-- We need this dependency as it provides the metric managers used in tests -->
<usedDependency>org.apache.iotdb:metrics-core</usedDependency>
<!-- This dependency is required at runtime, when enabling the rest service -->
<usedDependency>org.glassfish.jersey.inject:jersey-hk2</usedDependency>
</usedDependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask {

Expand Down Expand Up @@ -218,7 +219,7 @@ public void close() {
* When a pipe is dropped, the connector maybe reused and will not be closed. So we just discard
* its queued events in the output pipe connector.
*/
public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) {
public void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) {
// Try to remove the events as much as possible
inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId);

Expand Down Expand Up @@ -297,33 +298,25 @@ public int getAsyncConnectorRetryEventQueueSize() {

// For performance, this will not acquire lock and does not guarantee the correct
// result. However, this shall not cause any exceptions when concurrently read & written.
public int getEventCount(final String pipeName) {
final AtomicInteger count = new AtomicInteger(0);
try {
inputPendingQueue.forEach(
event -> {
if (event instanceof EnrichedEvent
&& pipeName.equals(((EnrichedEvent) event).getPipeName())) {
count.incrementAndGet();
}
});
} catch (final Exception e) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Exception occurred when counting event of pipe {}, root cause: {}",
pipeName,
ErrorHandlingUtils.getRootCause(e).getMessage(),
e);
}
}
// Avoid potential NPE in "getPipeName"
public int getEventCount(final Predicate<EnrichedEvent> predicate) {
// 1. events in inputPendingQueue
final AtomicInteger inputPendingQueuePipeEventCount = new AtomicInteger(0);
inputPendingQueue.forEach(
event -> {
if (event instanceof EnrichedEvent && predicate.test((EnrichedEvent) event)) {
inputPendingQueuePipeEventCount.incrementAndGet();
}
});
// 2. events in specific connector
final int retryEventQueuePipeEventCount =
outputPipeConnector instanceof IoTDBDataRegionAsyncConnector
? ((IoTDBDataRegionAsyncConnector) outputPipeConnector).getRetryEventCount(predicate)
: 0;
// 3. lastEvent: avoid potential NPE in "getPipeName"
final EnrichedEvent event =
lastEvent instanceof EnrichedEvent ? (EnrichedEvent) lastEvent : null;
return count.get()
+ (outputPipeConnector instanceof IoTDBDataRegionAsyncConnector
? ((IoTDBDataRegionAsyncConnector) outputPipeConnector).getRetryEventCount(pipeName)
: 0)
+ (Objects.nonNull(event) && pipeName.equals(event.getPipeName()) ? 1 : 0);
final int lastEventCount = (Objects.nonNull(event) && predicate.test(event)) ? 1 : 0;
return inputPendingQueuePipeEventCount.get() + retryEventQueuePipeEventCount + lastEventCount;
}

//////////////////////////// Error report ////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

public class PipeProcessorSubtask extends PipeReportableSubtask {

Expand All @@ -59,7 +60,6 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {

// Record these variables to provide corresponding value to tag key of monitoring metrics
private final String pipeName;
private final String pipeNameWithCreationTime; // cache for better performance
private final int regionId;

private final EventSupplier inputEventSupplier;
Expand All @@ -80,7 +80,6 @@ public PipeProcessorSubtask(
final PipeEventCollector outputEventCollector) {
super(taskID, creationTime);
this.pipeName = pipeName;
this.pipeNameWithCreationTime = pipeName + "_" + creationTime;
this.regionId = regionId;
this.inputEventSupplier = inputEventSupplier;
this.pipeProcessor = pipeProcessor;
Expand Down Expand Up @@ -138,15 +137,9 @@ protected boolean executeOnce() throws Exception {
if (event instanceof TabletInsertionEvent) {
pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector);
PipeProcessorMetrics.getInstance().markTabletEvent(taskID);
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.markCollectInvocationCount(
pipeNameWithCreationTime, outputEventCollector.getCollectInvocationCount());
} else if (event instanceof TsFileInsertionEvent) {
pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
PipeProcessorMetrics.getInstance().markTsFileEvent(taskID);
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.markCollectInvocationCount(
pipeNameWithCreationTime, outputEventCollector.getCollectInvocationCount());
} else if (event instanceof PipeHeartbeatEvent) {
pipeProcessor.process(event, outputEventCollector);
((PipeHeartbeatEvent) event).onProcessed();
Expand Down Expand Up @@ -280,13 +273,12 @@ public int getRegionId() {
return regionId;
}

public int getEventCount(final boolean ignoreHeartbeat) {
// Avoid potential NPE in "getPipeName"
public int getEventCount(final Predicate<EnrichedEvent> predicate) {
// lastEvent: avoid potential NPE in "getPipeName"
final EnrichedEvent event =
lastEvent instanceof EnrichedEvent ? (EnrichedEvent) lastEvent : null;
return Objects.nonNull(event) && !(ignoreHeartbeat && event instanceof PipeHeartbeatEvent)
? 1
: 0;
// TODO: consider events in specific processor
return (Objects.nonNull(event) && predicate.test(event)) ? 1 : 0;
}

//////////////////////////// Error report ////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY;
Expand Down Expand Up @@ -570,22 +571,14 @@ public int getRetryEventQueueSize() {

// For performance, this will not acquire lock and does not guarantee the correct
// result. However, this shall not cause any exceptions when concurrently read & written.
public int getRetryEventCount(final String pipeName) {
public int getRetryEventCount(final Predicate<EnrichedEvent> predicate) {
final AtomicInteger count = new AtomicInteger(0);
try {
retryEventQueue.forEach(
event -> {
if (event instanceof EnrichedEvent
&& pipeName.equals(((EnrichedEvent) event).getPipeName())) {
count.incrementAndGet();
}
});
return count.get();
} catch (final Exception e) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Failed to get retry event count for pipe {}.", pipeName, e);
}
return count.get();
}
retryEventQueue.forEach(
event -> {
if (event instanceof EnrichedEvent && predicate.test((EnrichedEvent) event)) {
count.incrementAndGet();
}
});
return count.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,14 @@ public ProgressIndex getProgressIndex() {
return sourceEvent != null ? sourceEvent.getProgressIndex() : MinimumProgressIndex.INSTANCE;
}

@Override
public boolean isDataRegionRealtimeEvent() {
if (!(sourceEvent instanceof PipeTsFileInsertionEvent)) {
return false;
}
return sourceEvent.isDataRegionRealtimeEvent();
}

@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
Expand Down Expand Up @@ -246,6 +254,13 @@ public boolean isGeneratedByPipe() {
throw new UnsupportedOperationException("isGeneratedByPipe() is not supported!");
}

@Override
public boolean needToCommitRate() {
// When computing the commit rate, only consider events where needToReport is true to avoid
// counting unparsed source events that influence remaining time calculation.
return super.needToCommitRate() && needToReport;
}

@Override
public boolean mayEventTimeOverlappedWithTimeRange() {
final long[] timestamps = tablet.timestamps;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,11 @@ public void eliminateProgressIndex() {
}
}

@Override
public boolean isDataRegionRealtimeEvent() {
return !isGeneratedByHistoricalExtractor;
}

@Override
public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.extractor.IoTDBExtractor;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
Expand Down Expand Up @@ -56,6 +57,7 @@
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
Expand Down Expand Up @@ -562,9 +564,7 @@ public int getPipeHeartbeatEventCount() {
return hasBeenStarted.get() ? realtimeExtractor.getPipeHeartbeatEventCount() : 0;
}

public int getEventCount() {
return hasBeenStarted.get()
? (historicalExtractor.getPendingQueueSize() + realtimeExtractor.getEventCount())
: 0;
public int getRealtimeEventCount(final Predicate<EnrichedEvent> predicate) {
return hasBeenStarted.get() ? realtimeExtractor.getEventCount(predicate) : 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static com.google.common.base.MoreObjects.toStringHelper;
Expand Down Expand Up @@ -543,8 +545,15 @@ public int getPipeHeartbeatEventCount() {
return pendingQueue.getPipeHeartbeatEventCount();
}

public int getEventCount() {
return pendingQueue.size();
public int getEventCount(final Predicate<EnrichedEvent> predicate) {
final AtomicInteger pendingQueueEventCount = new AtomicInteger(0);
pendingQueue.forEach(
event -> {
if (event instanceof EnrichedEvent && predicate.test((EnrichedEvent) event)) {
pendingQueueEventCount.incrementAndGet();
}
});
return pendingQueueEventCount.get();
}

public String getTaskID() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ public void deregister(final String pipeID) {
}
}

public void markRegionCommit(final String pipeID, final boolean isDataRegion) {
public void markRegionCommit(
final String pipeID, final boolean isDataRegion, final boolean isDataRegionRealtimeEvent) {
if (Objects.isNull(metricService)) {
return;
}
Expand All @@ -212,25 +213,12 @@ public void markRegionCommit(final String pipeID, final boolean isDataRegion) {
}

if (isDataRegion) {
operator.markDataRegionCommit();
operator.markDataRegionCommit(isDataRegionRealtimeEvent);
} else {
operator.markSchemaRegionCommit();
}
}

public void markCollectInvocationCount(final String pipeID, final long collectInvocationCount) {
if (Objects.isNull(metricService)) {
return;
}
final PipeDataNodeRemainingEventAndTimeOperator operator =
remainingEventAndTimeOperatorMap.get(pipeID);
if (Objects.isNull(operator)) {
return;
}

operator.markCollectInvocationCount(collectInvocationCount);
}

//////////////////////////// Show pipes ////////////////////////////

public Pair<Long, Double> getRemainingEventAndTime(
Expand Down
Loading
Loading