Flink: RANGE distribution for IcebergSink (new sinkV2 interface)
rodmeneses committed Jan 23, 2025
1 parent ce2af52 commit c44ba98
Showing 3 changed files with 583 additions and 13 deletions.
@@ -64,13 +64,19 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.shuffle.DataStatisticsOperatorFactory;
+import org.apache.iceberg.flink.sink.shuffle.RangePartitioner;
+import org.apache.iceberg.flink.sink.shuffle.StatisticsOrRecord;
+import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -375,22 +381,43 @@ public Builder flinkConf(ReadableConfig config) {

    /**
     * Configure the write {@link DistributionMode} that the IcebergSink will use. Currently, flink
-     * support {@link DistributionMode#NONE} and {@link DistributionMode#HASH}.
+     * supports {@link DistributionMode#NONE}, {@link DistributionMode#HASH}, and {@link
+     * DistributionMode#RANGE}.
     *
     * @param mode to specify the write distribution mode.
     * @return {@link IcebergSink.Builder} to connect the iceberg table.
     */
    @Override
    public Builder distributionMode(DistributionMode mode) {
-      Preconditions.checkArgument(
-          !DistributionMode.RANGE.equals(mode),
-          "Flink does not support 'range' write distribution mode now.");
      if (mode != null) {
        writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName());
      }
      return this;
    }
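// A minimal usage sketch (illustrative only, not part of this diff; `dataStream`
// is an assumed DataStream<RowData> and `tableLoader` an assumed configured
// TableLoader): with the Preconditions check above removed, RANGE now flows
// through the builder like the other modes.
//
//   IcebergSink.forRowData(dataStream)
//       .tableLoader(tableLoader)
//       .distributionMode(DistributionMode.RANGE)
//       .append();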

+    /**
+     * Range distribution needs to collect statistics about data distribution to properly shuffle
+     * the records in a relatively balanced way. In general, low cardinality should use {@link
+     * StatisticsType#Map} and high cardinality should use {@link StatisticsType#Sketch}. Refer to
+     * the {@link StatisticsType} Javadoc for more details.
+     *
+     * <p>Default is {@link StatisticsType#Auto}, where Map statistics is used initially. If the
+     * cardinality is higher than the threshold (currently 10K) defined in {@code
+     * SketchUtil#OPERATOR_SKETCH_SWITCH_THRESHOLD}, statistics collection automatically switches to
+     * sketch reservoir sampling.
+     *
+     * <p>Explicitly set the statistics type if the default behavior doesn't work.
+     *
+     * @param type to specify the statistics type for range distribution.
+     * @return {@link IcebergSink.Builder} to connect the iceberg table.
+     */
+    public IcebergSink.Builder rangeDistributionStatisticsType(StatisticsType type) {
+      if (type != null) {
+        writeOptions.put(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.key(), type.name());
+      }
+      return this;
+    }
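// Usage sketch (illustrative; the stream and loader names are placeholders):
// for a high-cardinality sort key, Sketch statistics can be requested up front
// instead of waiting for Auto to cross the 10K switch threshold.
//
//   IcebergSink.forRowData(dataStream)
//       .tableLoader(tableLoader)
//       .distributionMode(DistributionMode.RANGE)
//       .rangeDistributionStatisticsType(StatisticsType.Sketch)
//       .append();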

/**
* Configuring the write parallel number for iceberg stream writer.
*
@@ -559,6 +586,10 @@ public DataStreamSink<RowData> append() {
}
}

+    private String operatorName(String suffix) {
+      return uidSuffix != null ? suffix + "-" + uidSuffix : suffix;
+    }
+
private static String defaultSuffix(String uidSuffix, String defaultSuffix) {
if (uidSuffix == null || uidSuffix.isEmpty()) {
return defaultSuffix;
@@ -645,6 +676,12 @@ private DataStream<RowData> distributeDataStream(DataStream<RowData> input) {
DistributionMode mode = flinkWriteConf.distributionMode();
Schema schema = table.schema();
PartitionSpec spec = table.spec();
+    SortOrder sortOrder = table.sortOrder();
+
+    int writerParallelism =
+        flinkWriteConf.writeParallelism() == null
+            ? input.getParallelism()
+            : flinkWriteConf.writeParallelism();
LOG.info("Write distribution mode is '{}'", mode.modeName());
switch (mode) {
case NONE:
@@ -692,20 +729,51 @@ private DataStream<RowData> distributeDataStream(DataStream<RowData> input) {
}

case RANGE:
-        if (equalityFieldIds.isEmpty()) {
-          LOG.warn(
-              "Fallback to use 'none' distribution mode, because there are no equality fields set "
-                  + "and {}=range is not supported yet in flink",
-              WRITE_DISTRIBUTION_MODE);
-          return input;
-        } else {
+        // Ideally, exception should be thrown in the combination of range distribution and
+        // equality fields. Primary key case should use hash distribution mode.
+        // Keep the current behavior of falling back to keyBy for backward compatibility.
+        if (!equalityFieldIds.isEmpty()) {
          LOG.info(
-              "Distribute rows by equality fields, because there are equality fields set "
-                  + "and{}=range is not supported yet in flink",
+              "Hash distribute rows by equality fields, even though {}=range is set. "
+                  + "Range distribution for primary keys are not always safe in "
+                  + "Flink streaming writer.",
              WRITE_DISTRIBUTION_MODE);
          return input.keyBy(new EqualityFieldKeySelector(schema, flinkRowType, equalityFieldIds));
        }
+
+        // range distribute by partition key or sort key if table has an SortOrder
+        Preconditions.checkState(
+            sortOrder.isSorted() || spec.isPartitioned(),
+            "Invalid write distribution mode: range. Need to define sort order or partition spec.");
+        if (sortOrder.isUnsorted()) {
+          sortOrder = Partitioning.sortOrderFor(spec);
+          LOG.info("Construct sort order from partition spec");
+        }
+
+        LOG.info("Range distribute rows by sort order: {}", sortOrder);
+        StatisticsType statisticsType = flinkWriteConf.rangeDistributionStatisticsType();
+        SingleOutputStreamOperator<StatisticsOrRecord> shuffleStream =
+            input
+                .transform(
+                    operatorName("range-shuffle"),
+                    TypeInformation.of(StatisticsOrRecord.class),
+                    new DataStatisticsOperatorFactory(
+                        schema,
+                        sortOrder,
+                        writerParallelism,
+                        statisticsType,
+                        flinkWriteConf.rangeDistributionSortKeyBaseWeight()))
+                // Set the parallelism same as input operator to encourage chaining
+                .setParallelism(input.getParallelism());
+        if (uidSuffix != null) {
+          shuffleStream = shuffleStream.uid("shuffle-" + uidSuffix);
+        }
+
+        return shuffleStream
+            .partitionCustom(new RangePartitioner(schema, sortOrder), r -> r)
+            .filter(StatisticsOrRecord::hasRecord)
+            .map(StatisticsOrRecord::record);

default:
throw new RuntimeException("Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + mode);
}
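The RANGE branch above wires the shuffle in two steps: a pass-through statistics operator that samples the sort key and emits records wrapped as StatisticsOrRecord, then partitionCustom with a RangePartitioner that routes each record to a writer subtask by key range (the trailing filter/map unwraps the records again). The snippet below is a self-contained sketch of the same partitionCustom pattern using a toy partitioner over integer keys; the class and the Event type are invented for illustration and are not Iceberg's RangePartitioner, which additionally consumes the broadcast statistics.

import org.apache.flink.api.common.functions.Partitioner;

// Toy range partitioner: pick a subtask by binary-searching the record's key
// against precomputed, sorted upper bounds (one bound per range boundary).
public class ToyRangePartitioner implements Partitioner<Integer> {
  private final int[] sortedUpperBounds;

  public ToyRangePartitioner(int[] sortedUpperBounds) {
    this.sortedUpperBounds = sortedUpperBounds;
  }

  @Override
  public int partition(Integer key, int numPartitions) {
    int pos = java.util.Arrays.binarySearch(sortedUpperBounds, key);
    int index = pos >= 0 ? pos : -pos - 1; // index of first bound >= key
    return Math.min(index, numPartitions - 1); // clamp keys past the last bound
  }
}

// Hypothetical usage, so each downstream subtask sees one contiguous key range:
//   DataStream<Event> ranged =
//       events.partitionCustom(
//           new ToyRangePartitioner(new int[] {100, 200, 300}),
//           (Event e) -> e.key());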