Skip to content

Commit

Permalink
Pipe: omit region id when marking collect invocation count for remain…
Browse files Browse the repository at this point in the history
…ing time calculation (#13673)
  • Loading branch information
VGalaxies authored Oct 12, 2024
1 parent ec5650c commit 3570725
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ public PipeTaskProcessorStage(
this.pipeProcessorSubtask =
new PipeProcessorSubtask(
taskId,
creationTime,
pipeName,
creationTime,
regionId,
pipeExtractorInputEventSupplier,
pipeProcessor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,33 +57,35 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {
private static final AtomicReference<PipeProcessorSubtaskWorkerManager> subtaskWorkerManager =
new AtomicReference<>();

private final EventSupplier inputEventSupplier;
private final PipeProcessor pipeProcessor;
private final PipeEventCollector outputEventCollector;

// Record these variables to provide corresponding value to tag key of monitoring metrics
private final String pipeName;
private final String pipeNameWithCreationTime; // cache for better performance
private final int regionId;

private final EventSupplier inputEventSupplier;
private final PipeProcessor pipeProcessor;
private final PipeEventCollector outputEventCollector;

// This variable is used to distinguish between old and new subtasks before and after stuck
// restart.
private final long subtaskCreationTime;

public PipeProcessorSubtask(
final String taskID,
final long creationTime,
final String pipeName,
final long creationTime,
final int regionId,
final EventSupplier inputEventSupplier,
final PipeProcessor pipeProcessor,
final PipeEventCollector outputEventCollector) {
super(taskID, creationTime);
this.subtaskCreationTime = System.currentTimeMillis();
this.pipeName = pipeName;
this.pipeNameWithCreationTime = pipeName + "_" + creationTime;
this.regionId = regionId;
this.inputEventSupplier = inputEventSupplier;
this.pipeProcessor = pipeProcessor;
this.outputEventCollector = outputEventCollector;
this.subtaskCreationTime = System.currentTimeMillis();

// Only register dataRegions
if (StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId))) {
Expand Down Expand Up @@ -137,12 +139,14 @@ protected boolean executeOnce() throws Exception {
pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector);
PipeProcessorMetrics.getInstance().markTabletEvent(taskID);
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.markCollectInvocationCount(taskID, outputEventCollector.getCollectInvocationCount());
.markCollectInvocationCount(
pipeNameWithCreationTime, outputEventCollector.getCollectInvocationCount());
} else if (event instanceof TsFileInsertionEvent) {
pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
PipeProcessorMetrics.getInstance().markTsFileEvent(taskID);
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.markCollectInvocationCount(taskID, outputEventCollector.getCollectInvocationCount());
.markCollectInvocationCount(
pipeNameWithCreationTime, outputEventCollector.getCollectInvocationCount());
} else if (event instanceof PipeHeartbeatEvent) {
pipeProcessor.process(event, outputEventCollector);
((PipeHeartbeatEvent) event).onProcessed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public void setUp() throws Exception {
Mockito.spy(
new PipeProcessorSubtask(
"PipeProcessorSubtaskExecutorTest",
System.currentTimeMillis(),
"TestPipe",
System.currentTimeMillis(),
0,
mock(EventSupplier.class),
mock(PipeProcessor.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ public void commit(final EnrichedEvent event, final CommitterKey committerKey) {
}
if (Objects.nonNull(commitRateMarker)) {
try {
commitRateMarker.accept(
event.getPipeName() + '_' + event.getCreationTime(), event.isDataRegionEvent());
commitRateMarker.accept(event.getPipeNameWithCreationTime(), event.isDataRegionEvent());
} catch (final Exception e) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public abstract class EnrichedEvent implements Event {

protected final String pipeName;
protected final long creationTime;
private final String pipeNameWithCreationTime; // cache for better performance

protected final PipeTaskMeta pipeTaskMeta;

protected CommitterKey committerKey;
Expand Down Expand Up @@ -86,6 +88,7 @@ protected EnrichedEvent(

this.pipeName = pipeName;
this.creationTime = creationTime;
this.pipeNameWithCreationTime = pipeName + "_" + creationTime;
this.pipeTaskMeta = pipeTaskMeta;
this.treePattern = treePattern;
this.tablePattern = tablePattern;
Expand Down Expand Up @@ -302,6 +305,10 @@ public final long getCreationTime() {
return creationTime;
}

public String getPipeNameWithCreationTime() {
return pipeNameWithCreationTime;
}

public final int getRegionId() {
// TODO: persist regionId in EnrichedEvent
return committerKey == null ? -1 : committerKey.getRegionId();
Expand Down

0 comments on commit 3570725

Please sign in to comment.