Skip to content

Commit

Permalink
[FLINK-33316][runtime] Avoid unnecessary heavy getStreamOperatorFactory
Browse files Browse the repository at this point in the history
This closes #23550.
  • Loading branch information
1996fanrui authored Oct 19, 2023
1 parent c2e14ff commit a2681f6
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,12 @@ public class StreamConfig implements Serializable {
// Config Keys
// ------------------------------------------------------------------------

@VisibleForTesting public static final String SERIALIZEDUDF = "serializedUDF";
public static final String SERIALIZED_UDF = "serializedUDF";
/**
* Introduce serializedUdfClassName to avoid unnecessarily heavy {@link
* #getStreamOperatorFactory}.
*/
public static final String SERIALIZED_UDF_CLASS_NAME = "serializedUdfClassName";

private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
private static final String NUMBER_OF_NETWORK_INPUTS = "numberOfNetworkInputs";
Expand Down Expand Up @@ -368,7 +373,8 @@ public void setStreamOperator(StreamOperator<?> operator) {

public void setStreamOperatorFactory(StreamOperatorFactory<?> factory) {
if (factory != null) {
toBeSerializedConfigObjects.put(SERIALIZEDUDF, factory);
toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory);
config.setString(SERIALIZED_UDF_CLASS_NAME, factory.getClass().getName());
}
}

Expand All @@ -380,7 +386,7 @@ public <T extends StreamOperator<?>> T getStreamOperator(ClassLoader cl) {

public <T extends StreamOperatorFactory<?>> T getStreamOperatorFactory(ClassLoader cl) {
try {
return InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl);
return InstantiationUtil.readObjectFromConfig(this.config, SERIALIZED_UDF, cl);
} catch (ClassNotFoundException e) {
String classLoaderInfo = ClassLoaderUtil.getUserCodeClassLoaderInfo(cl);
boolean loadableDoubleCheck = ClassLoaderUtil.validateClassLoadable(e, cl);
Expand All @@ -400,6 +406,10 @@ public <T extends StreamOperatorFactory<?>> T getStreamOperatorFactory(ClassLoad
}
}

public String getStreamOperatorFactoryClassName() {
return config.getString(SERIALIZED_UDF_CLASS_NAME, null);
}

public void setIterationId(String iterationId) {
config.setString(ITERATION_ID, iterationId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,17 +641,15 @@ private Map<StreamConfig.SourceInputConfig, ChainedSource> createChainedSources(
@Nullable
private Counter getOperatorRecordsOutCounter(
StreamTask<?, ?> containingTask, StreamConfig operatorConfig) {
ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
StreamOperatorFactory<?> operatorFactory =
operatorConfig.getStreamOperatorFactory(userCodeClassloader);
String streamOperatorFactoryClassName = operatorConfig.getStreamOperatorFactoryClassName();
// Do not use the numRecordsOut counter on output if this operator is SinkWriterOperator.
//
// Metric "numRecordsOut" is defined as the total number of records written to the
// external system in FLIP-33, but this metric is occupied in AbstractStreamOperator as the
// number of records sent to downstream operators, which is number of Committable batches
// sent to SinkCommitter. So we skip registering this metric on output and leave this metric
// to sink writer implementations to report.
if (operatorFactory instanceof SinkWriterOperatorFactory) {
if (SinkWriterOperatorFactory.class.getName().equals(streamOperatorFactoryClassName)) {
return null;
}

Expand Down

0 comments on commit a2681f6

Please sign in to comment.