[Flink] Fix rolling file and row group size configs (lakesoul-io#519)
* fix totalRows in native arrow writer

Signed-off-by: chenxu <[email protected]>

* add max row group option for flink writer

Signed-off-by: chenxu <[email protected]>

* reduce rolling rows to 1 million

Signed-off-by: chenxu <[email protected]>

---------

Signed-off-by: chenxu <[email protected]>
Co-authored-by: chenxu <[email protected]>
xuchen-plus and dmetasoul01 authored Jul 30, 2024
1 parent 72306ce · commit 7afe8af
Showing 15 changed files with 251 additions and 164 deletions.
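
The commit message above adds a MAX_ROW_GROUP_SIZE option for the Flink writer and reduces the rolling-row threshold; as context for the diff below, here is a minimal sketch of how these options could be set on the sink configuration. The option constants (FILE_ROLLING_SIZE, FILE_ROLLING_TIME, MAX_ROW_GROUP_SIZE) and their value types are taken from the imports and getters in the diff; the helper class name, the concrete values, and the units are illustrative assumptions, not defaults from the repository.

import org.apache.flink.configuration.Configuration;

import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.FILE_ROLLING_SIZE;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.FILE_ROLLING_TIME;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.MAX_ROW_GROUP_SIZE;

// Hypothetical helper, not part of this commit.
public class LakeSoulSinkConfigSketch {

    public static Configuration buildSinkConf() {
        Configuration conf = new Configuration();

        // Rolling thresholds read in LakeSoulMultiTableSinkStreamBuilder and passed to
        // LakeSoulRollingPolicyImpl. The commit message mentions reducing rolling rows
        // to 1 million, which is echoed here (units assumed: rows and milliseconds).
        conf.set(FILE_ROLLING_SIZE, 1_000_000L);
        conf.set(FILE_ROLLING_TIME, 10L * 60 * 1000);

        // New option from this commit: maximum rows per Parquet row group in the
        // native writer; 250_000 mirrors the value previously hard-coded in
        // DynamicPartitionNativeParquetWriter.
        conf.set(MAX_ROW_GROUP_SIZE, 250_000);

        return conf;
    }
}

The resulting Configuration would then be passed to the sink builder exactly as the diff does, e.g. LakeSoulMultiTablesSink.forMultiTablesBulkFormat(conf).
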
@@ -21,7 +21,11 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.*;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.BUCKET_CHECK_INTERVAL;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.BUCKET_PARALLELISM;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.DYNAMIC_BUCKETING;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.FILE_ROLLING_SIZE;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.FILE_ROLLING_TIME;

public class LakeSoulMultiTableSinkStreamBuilder {

@@ -56,33 +60,37 @@ public DataStream<BinarySourceRecord> buildHashPartitionedCDCStream(DataStream<B

public DataStreamSink<BinarySourceRecord> buildLakeSoulDMLSink(DataStream<BinarySourceRecord> stream) {
context.conf.set(DYNAMIC_BUCKETING, false);
LakeSoulRollingPolicyImpl rollingPolicy = new LakeSoulRollingPolicyImpl<RowData>(
LakeSoulRollingPolicyImpl<RowData> rollingPolicy = new LakeSoulRollingPolicyImpl<>(
context.conf.getLong(FILE_ROLLING_SIZE), context.conf.getLong(FILE_ROLLING_TIME));
OutputFileConfig fileNameConfig = OutputFileConfig.builder()
.withPartSuffix(".parquet")
.build();
LakeSoulMultiTablesSink<BinarySourceRecord, RowData> sink = LakeSoulMultiTablesSink.forMultiTablesBulkFormat(context.conf)
.withBucketCheckInterval(context.conf.getLong(BUCKET_CHECK_INTERVAL))
.withRollingPolicy(rollingPolicy)
.withOutputFileConfig(fileNameConfig)
.build();
LakeSoulMultiTablesSink<BinarySourceRecord, RowData>
sink =
LakeSoulMultiTablesSink.forMultiTablesBulkFormat(context.conf)
.withBucketCheckInterval(context.conf.getLong(BUCKET_CHECK_INTERVAL))
.withRollingPolicy(rollingPolicy)
.withOutputFileConfig(fileNameConfig)
.build();
return stream.sinkTo(sink).name("LakeSoul MultiTable DML Sink")
.setParallelism(context.conf.getInteger(BUCKET_PARALLELISM));
}

public static DataStreamSink<LakeSoulArrowWrapper> buildArrowSink(Context context, DataStream<LakeSoulArrowWrapper> stream) {
LakeSoulRollingPolicyImpl rollingPolicy = new LakeSoulRollingPolicyImpl<LakeSoulArrowWrapper>(
public static DataStreamSink<LakeSoulArrowWrapper> buildArrowSink(Context context,
DataStream<LakeSoulArrowWrapper> stream) {
LakeSoulRollingPolicyImpl<LakeSoulArrowWrapper> rollingPolicy = new LakeSoulRollingPolicyImpl<>(
context.conf.getLong(FILE_ROLLING_SIZE), context.conf.getLong(FILE_ROLLING_TIME));
OutputFileConfig fileNameConfig = OutputFileConfig.builder()
.withPartSuffix(".parquet")
.build();
LakeSoulMultiTablesSink<LakeSoulArrowWrapper, LakeSoulArrowWrapper> sink = LakeSoulMultiTablesSink.forMultiTablesArrowFormat(context.conf)
.withBucketCheckInterval(context.conf.getLong(BUCKET_CHECK_INTERVAL))
.withRollingPolicy(rollingPolicy)
.withOutputFileConfig(fileNameConfig)
.build();
return stream.sinkTo(sink).name("LakeSoul MultiTable Arrow Sink")
.setParallelism(context.conf.getInteger(BUCKET_PARALLELISM));
LakeSoulMultiTablesSink<LakeSoulArrowWrapper, LakeSoulArrowWrapper>
sink =
LakeSoulMultiTablesSink.forMultiTablesArrowFormat(context.conf)
.withBucketCheckInterval(context.conf.getLong(BUCKET_CHECK_INTERVAL))
.withRollingPolicy(rollingPolicy)
.withOutputFileConfig(fileNameConfig)
.build();
return stream.sinkTo(sink).name("LakeSoul MultiTable Arrow Sink");
}

public DataStreamSink<BinarySourceRecord> printStream(DataStream<BinarySourceRecord> stream, String name) {
@@ -6,13 +6,9 @@

import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.table.data.RowData;

import java.io.IOException;

import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.DEFAULT_BUCKET_ROLLING_SIZE;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.DEFAULT_BUCKET_ROLLING_TIME;

public class LakeSoulRollingPolicyImpl<In> extends CheckpointRollingPolicy<In, String> {

private boolean rollOnCheckpoint;
@@ -27,12 +23,6 @@ public LakeSoulRollingPolicyImpl(long rollingSize, long rollingTime) {
this.rollingTime = rollingTime;
}

public LakeSoulRollingPolicyImpl(boolean rollOnCheckpoint) {
this.rollingSize = DEFAULT_BUCKET_ROLLING_SIZE;
this.rollingTime = DEFAULT_BUCKET_ROLLING_TIME;
this.rollOnCheckpoint = rollOnCheckpoint;
}

@Override
public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
return this.rollOnCheckpoint;
@@ -16,18 +16,24 @@
import org.apache.flink.table.runtime.arrow.ArrowUtils;
import org.apache.flink.table.runtime.arrow.ArrowWriter;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.DYNAMIC_BUCKET;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.MAX_ROW_GROUP_SIZE;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.SORT_FIELD;

public class DynamicPartitionNativeParquetWriter implements InProgressFileWriter<RowData, String> {

private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionNativeParquetWriter.class);

private final RowType rowType;

private ArrowWriter<RowData> arrowWriter;
@@ -37,7 +43,7 @@ public class DynamicPartitionNativeParquetWriter implements InProgressFileWriter

private NativeIOWriter nativeWriter;

private final int batchSize;
private final int maxRowGroupRows;

private final long creationTime;

@@ -57,7 +63,7 @@ public DynamicPartitionNativeParquetWriter(RowType rowType,
Path path,
long creationTime,
Configuration conf) throws IOException {
this.batchSize = 250000; // keep same with native writer's row group row number
this.maxRowGroupRows = conf.getInteger(MAX_ROW_GROUP_SIZE);
this.creationTime = creationTime;
this.rowsInBatch = 0;
this.rowType = rowType;
@@ -79,7 +85,7 @@ private void initNativeWriter() throws IOException {
}
nativeWriter.setHashBucketNum(conf.getInteger(LakeSoulSinkOptions.HASH_BUCKET_NUM));

nativeWriter.setRowGroupRowNumber(this.batchSize);
nativeWriter.setRowGroupRowNumber(this.maxRowGroupRows);
batch = VectorSchemaRoot.create(arrowSchema, nativeWriter.getAllocator());
arrowWriter = ArrowUtils.createRowDataArrowWriter(batch, rowType);

@@ -89,6 +95,7 @@ private void initNativeWriter() throws IOException {

FlinkUtil.setFSConfigs(conf, nativeWriter);
nativeWriter.initializeWriter();
LOG.info("Initialized DynamicPartitionNativeParquetWriter: {}", this);
}

@Override
Expand All @@ -97,7 +104,7 @@ public void write(RowData element, long currentTime) throws IOException {
this.arrowWriter.write(element);
this.rowsInBatch++;
this.totalRows++;
if (this.rowsInBatch >= this.batchSize) {
if (this.rowsInBatch >= this.maxRowGroupRows) {
this.arrowWriter.finish();
this.nativeWriter.write(this.batch);
// in native writer, batch may be kept in memory for sorting,
@@ -139,13 +146,13 @@ public Map<String, List<PendingFileRecoverable>> closeForCommitWithRecoverableMa
if (this.batch.getRowCount() > 0) {
this.nativeWriter.write(this.batch);
HashMap<String, List<String>> partitionDescAndFilesMap = this.nativeWriter.flush();
// System.out.println(partitionDescAndFilesMap);
for (Map.Entry<String, List<String>> entry : partitionDescAndFilesMap.entrySet()) {
recoverableMap.put(
entry.getKey(),
entry.getValue()
.stream()
.map(path -> new NativeParquetWriter.NativeWriterPendingFileRecoverable(path, creationTime))
.map(path -> new NativeParquetWriter.NativeWriterPendingFileRecoverable(path,
creationTime))
.collect(Collectors.toList())
);
}
@@ -160,6 +167,7 @@ public Map<String, List<PendingFileRecoverable>> closeForCommitWithRecoverableMa
throw new RuntimeException(e);
}
}
LOG.info("CloseForCommitWithRecoverableMap done, recoverableMap={}", recoverableMap);
return recoverableMap;
}

@@ -195,4 +203,18 @@ public long getSize() throws IOException {
public long getLastUpdateTime() {
return this.lastUpdateTime;
}

@Override public String toString() {
return "DynamicPartitionNativeParquetWriter{" +
"rowType=" + rowType +
", primaryKeys=" + primaryKeys +
", rangeColumns=" + rangeColumns +
", maxRowGroupRows=" + maxRowGroupRows +
", creationTime=" + creationTime +
", rowsInBatch=" + rowsInBatch +
", lastUpdateTime=" + lastUpdateTime +
", prefix='" + prefix + '\'' +
", totalRows=" + totalRows +
'}';
}
}
@@ -13,18 +13,26 @@
import org.apache.flink.lakesoul.sink.state.LakeSoulWriterBucketState;
import org.apache.flink.lakesoul.tool.LakeSoulSinkOptions;
import org.apache.flink.lakesoul.types.TableSchemaIdentity;
import org.apache.flink.streaming.api.functions.sink.filesystem.*;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.table.data.RowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.DYNAMIC_BUCKET;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* A bucket is the directory organization of the output of the {@link LakeSoulMultiTablesSink}.
@@ -55,9 +63,6 @@ public class LakeSoulWriterBucket {

private long tsMs;

// private final List<InProgressFileWriter.PendingFileRecoverable> pendingFiles =
// new ArrayList<>();

private final Map<String, List<InProgressFileWriter.PendingFileRecoverable>> pendingFilesMap =
new HashMap<>();

@@ -118,10 +123,10 @@ private LakeSoulWriterBucket(
}

private void restoreState(LakeSoulWriterBucketState state) throws IOException {
for (Map.Entry<String, List<InProgressFileWriter.PendingFileRecoverable>> entry : state.getPendingFileRecoverableMap().entrySet()) {
for (Map.Entry<String, List<InProgressFileWriter.PendingFileRecoverable>> entry : state.getPendingFileRecoverableMap()
.entrySet()) {
pendingFilesMap.computeIfAbsent(entry.getKey(), key -> new ArrayList<>()).addAll(entry.getValue());
}
// pendingFiles.addAll(state.getPendingFileRecoverableList());
}

public String getBucketId() {
@@ -143,10 +148,6 @@ public boolean isActive() {
void merge(final LakeSoulWriterBucket bucket) throws IOException {
checkNotNull(bucket);

// checkState(Objects.equals(bucket.bucketPath, bucketPath));
// pendingFiles.addAll(bucket.pendingFiles);


bucket.closePartFile();
for (Map.Entry<String, List<InProgressFileWriter.PendingFileRecoverable>> entry : bucket.pendingFilesMap.entrySet()) {
pendingFilesMap.computeIfAbsent(entry.getKey(), key -> new ArrayList<>()).addAll(entry.getValue());
@@ -168,7 +169,8 @@ void write(RowData element, long currentTime, long tsMs) throws IOException {
inProgressPartWriter.write(element, currentTime);
}

List<LakeSoulMultiTableSinkCommittable> prepareCommit(boolean flush, String dmlType, String sourcePartitionInfo) throws IOException {
List<LakeSoulMultiTableSinkCommittable> prepareCommit(boolean flush, String dmlType, String sourcePartitionInfo)
throws IOException {
// we always close part file and do not keep in-progress file
// since the native parquet writer doesn't support resume
if (inProgressPartWriter != null) {
@@ -179,7 +181,8 @@ List<LakeSoulMultiTableSinkCommittable> prepareCommit(boolean flush, String dmlT

List<LakeSoulMultiTableSinkCommittable> committables = new ArrayList<>();
long time = pendingFilesMap.isEmpty() ? Long.MIN_VALUE :
((NativeParquetWriter.NativeWriterPendingFileRecoverable) pendingFilesMap.values().stream().findFirst().get().get(0)).creationTime;
((NativeParquetWriter.NativeWriterPendingFileRecoverable) pendingFilesMap.values().stream().findFirst()
.get().get(0)).creationTime;

if (dmlType.equals(LakeSoulSinkOptions.DELETE)) {
List<PartitionInfo> sourcePartitionInfoList = JniWrapper
@@ -192,7 +195,6 @@ List<LakeSoulMultiTableSinkCommittable> prepareCommit(boolean flush, String dmlT
}
}
committables.add(new LakeSoulMultiTableSinkCommittable(
// getBucketId(),
tableId,
new HashMap<>(pendingFilesMap),
time,
@@ -211,13 +213,6 @@ LakeSoulWriterBucketState snapshotState() throws IOException {
closePartFile();
}

// this.pendingFiles would be cleared later, we need to make a copy
// List<InProgressFileWriter.PendingFileRecoverable> tmpPending = new ArrayList<>(pendingFiles);
// return new LakeSoulWriterBucketState(
// tableId,
// getBucketId(),
// bucketPath,
// tmpPending);
return new LakeSoulWriterBucketState(tableId, bucketPath, new HashMap<>(pendingFilesMap));
}

Expand Down Expand Up @@ -289,12 +284,12 @@ private void closePartFile() throws IOException {
Map<String, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverableMap =
((DynamicPartitionNativeParquetWriter) inProgressPartWriter).closeForCommitWithRecoverableMap();
for (Map.Entry<String, List<InProgressFileWriter.PendingFileRecoverable>> entry : pendingFileRecoverableMap.entrySet()) {
pendingFilesMap.computeIfAbsent(entry.getKey(), bucketId -> new ArrayList()).addAll(entry.getValue());
pendingFilesMap.computeIfAbsent(entry.getKey(), bucketId -> new ArrayList())
.addAll(entry.getValue());
}
} else {
InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable =
inProgressPartWriter.closeForCommit();
// pendingFiles.add(pendingFileRecoverable);
pendingFilesMap.computeIfAbsent(bucketId, bucketId -> new ArrayList()).add(pendingFileRecoverable);
inProgressPartWriter = null;
LOG.info("Closed part file {} for {}ms", pendingFileRecoverable.getPath(),
@@ -352,6 +347,7 @@ static LakeSoulWriterBucket restore(
final RollingPolicy<RowData, String> rollingPolicy,
final LakeSoulWriterBucketState bucketState,
final OutputFileConfig outputFileConfig) throws IOException {
return new LakeSoulWriterBucket(subTaskId, tableId, conf, bucketWriter, rollingPolicy, bucketState, outputFileConfig);
return new LakeSoulWriterBucket(subTaskId, tableId, conf, bucketWriter, rollingPolicy, bucketState,
outputFileConfig);
}
}
(Diffs for the remaining changed files are not shown here.)