[flink] Replace legacy SourceFunction with v2 Source
yunfengzhou-hub committed Nov 21, 2024
1 parent a6f3aae commit c20e1a6
Showing 26 changed files with 948 additions and 497 deletions.
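The shape of the migration is easiest to see at a call site. Below is a before/after sketch assembled from the QueryFileMonitor changes in this commit (the legacy SourceFunction API was deprecated in Flink 1.18; the v2 Source API is the FLIP-27 interface):

// Before: legacy SourceFunction registered via addSource.
DataStream<InternalRow> rows =
        env.addSource(
                new QueryFileMonitor(table),
                "FileMonitor-" + table.name(),
                InternalTypeInfo.fromRowType(FileMonitorTable.getRowType()));

// After: v2 Source registered via fromSource, with an explicit
// watermark strategy and parallelism.
DataStream<InternalRow> rows =
        env.fromSource(
                        new QueryFileMonitor(table),
                        WatermarkStrategy.noWatermarks(),
                        "FileMonitor-" + table.name(),
                        InternalTypeInfo.fromRowType(FileMonitorTable.getRowType()))
                .setParallelism(1);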
==== MultiAwareBucketTableScan.java ====
@@ -32,7 +32,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -52,15 +51,8 @@ public MultiAwareBucketTableScan(
             Pattern includingPattern,
             Pattern excludingPattern,
             Pattern databasePattern,
-            boolean isStreaming,
-            AtomicBoolean isRunning) {
-        super(
-                catalogLoader,
-                includingPattern,
-                excludingPattern,
-                databasePattern,
-                isStreaming,
-                isRunning);
+            boolean isStreaming) {
+        super(catalogLoader, includingPattern, excludingPattern, databasePattern, isStreaming);
         tablesMap = new HashMap<>();
         scansMap = new HashMap<>();
     }
==== MultiTableScanBase.java ====
@@ -26,12 +26,11 @@
 import org.apache.paimon.table.source.EndOfScanException;
 import org.apache.paimon.table.source.Split;
 
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.api.connector.source.ReaderOutput;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
 import static org.apache.paimon.flink.utils.MultiTablesCompactorUtil.shouldCompactTable;
@@ -57,22 +56,19 @@ public abstract class MultiTableScanBase<T> implements AutoCloseable {
 
     protected transient Catalog catalog;
 
-    protected AtomicBoolean isRunning;
     protected boolean isStreaming;
 
     public MultiTableScanBase(
             Catalog.Loader catalogLoader,
             Pattern includingPattern,
             Pattern excludingPattern,
             Pattern databasePattern,
-            boolean isStreaming,
-            AtomicBoolean isRunning) {
+            boolean isStreaming) {
         catalog = catalogLoader.load();
 
         this.includingPattern = includingPattern;
         this.excludingPattern = excludingPattern;
         this.databasePattern = databasePattern;
-        this.isRunning = isRunning;
         this.isStreaming = isStreaming;
     }
 
@@ -104,13 +100,9 @@ protected void updateTableMap()
         }
     }
 
-    public ScanResult scanTable(SourceFunction.SourceContext<T> ctx)
+    public ScanResult scanTable(ReaderOutput<T> ctx)
             throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException {
         try {
-            if (!isRunning.get()) {
-                return ScanResult.FINISHED;
-            }
-
             updateTableMap();
             List<T> tasks = doScan();
 
==== MultiUnawareBucketTableScan.java ====
@@ -29,7 +29,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
 /**
@@ -46,15 +45,8 @@ public MultiUnawareBucketTableScan(
             Pattern includingPattern,
             Pattern excludingPattern,
             Pattern databasePattern,
-            boolean isStreaming,
-            AtomicBoolean isRunning) {
-        super(
-                catalogLoader,
-                includingPattern,
-                excludingPattern,
-                databasePattern,
-                isStreaming,
-                isRunning);
+            boolean isStreaming) {
+        super(catalogLoader, includingPattern, excludingPattern, databasePattern, isStreaming);
         tablesMap = new HashMap<>();
     }
 
========================================
@@ -126,7 +126,7 @@ private DataStreamSource<UnawareAppendCompactionTask> buildSource() {
                 new BucketUnawareCompactSource(
                         table, isContinuous, scanInterval, partitionPredicate);
 
-        return BucketUnawareCompactSource.buildSource(env, source, isContinuous, tableIdentifier);
+        return BucketUnawareCompactSource.buildSource(env, source, tableIdentifier);
     }
 
     private void sinkFromSource(DataStreamSource<UnawareAppendCompactionTask> input) {
==== QueryFileMonitor.java ====
@@ -21,6 +21,9 @@
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
+import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
+import org.apache.paimon.flink.source.SimpleSourceSplit;
 import org.apache.paimon.flink.utils.InternalTypeInfo;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
@@ -31,10 +34,14 @@
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.system.FileMonitorTable;
 
-import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -49,19 +56,13 @@
  *   <li>Assigning them to downstream tasks for further processing.
  * </ol>
  */
-public class QueryFileMonitor extends RichSourceFunction<InternalRow> {
+public class QueryFileMonitor extends AbstractNonCoordinatedSource<InternalRow> {
 
     private static final long serialVersionUID = 1L;
 
     private final Table table;
     private final long monitorInterval;
 
-    private transient SourceContext<InternalRow> ctx;
-    private transient StreamTableScan scan;
-    private transient TableRead read;
-
-    private volatile boolean isRunning = true;
-
     public QueryFileMonitor(Table table) {
         this.table = table;
         this.monitorInterval =
@@ -71,55 +72,53 @@ public QueryFileMonitor(Table table) {
     }
 
     @Override
-    public void open(OpenContext openContext) throws Exception {
-        FileMonitorTable monitorTable = new FileMonitorTable((FileStoreTable) table);
-        ReadBuilder readBuilder = monitorTable.newReadBuilder();
-        this.scan = readBuilder.newStreamScan();
-        this.read = readBuilder.newRead();
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
     }
 
     @Override
-    public void run(SourceContext<InternalRow> ctx) throws Exception {
-        this.ctx = ctx;
-        while (isRunning) {
-            boolean isEmpty;
-            synchronized (ctx.getCheckpointLock()) {
-                if (!isRunning) {
-                    return;
-                }
-                isEmpty = doScan();
-            }
+    public SourceReader<InternalRow, SimpleSourceSplit> createReader(
+            SourceReaderContext sourceReaderContext) throws Exception {
+        return new Reader();
+    }
+
+    private class Reader extends AbstractNonCoordinatedSourceReader<InternalRow> {
+        private transient StreamTableScan scan;
+        private transient TableRead read;
+
+        @Override
+        public void start() {
+            FileMonitorTable monitorTable = new FileMonitorTable((FileStoreTable) table);
+            ReadBuilder readBuilder = monitorTable.newReadBuilder();
+            this.scan = readBuilder.newStreamScan();
+            this.read = readBuilder.newRead();
+        }
+
+        @Override
+        public InputStatus pollNext(ReaderOutput<InternalRow> readerOutput) throws Exception {
+            boolean isEmpty = doScan(readerOutput);
 
             if (isEmpty) {
                 Thread.sleep(monitorInterval);
             }
+            return InputStatus.MORE_AVAILABLE;
         }
-    }
 
-    private boolean doScan() throws Exception {
-        List<InternalRow> records = new ArrayList<>();
-        read.createReader(scan.plan()).forEachRemaining(records::add);
-        records.forEach(ctx::collect);
-        return records.isEmpty();
-    }
-
-    @Override
-    public void cancel() {
-        // this is to cover the case where cancel() is called before the run()
-        if (ctx != null) {
-            synchronized (ctx.getCheckpointLock()) {
-                isRunning = false;
-            }
-        } else {
-            isRunning = false;
+        private boolean doScan(ReaderOutput<InternalRow> readerOutput) throws Exception {
+            List<InternalRow> records = new ArrayList<>();
+            read.createReader(scan.plan()).forEachRemaining(records::add);
+            records.forEach(readerOutput::collect);
+            return records.isEmpty();
         }
     }
 
     public static DataStream<InternalRow> build(StreamExecutionEnvironment env, Table table) {
-        return env.addSource(
-                new QueryFileMonitor(table),
-                "FileMonitor-" + table.name(),
-                InternalTypeInfo.fromRowType(FileMonitorTable.getRowType()));
+        return env.fromSource(
+                        new QueryFileMonitor(table),
+                        WatermarkStrategy.noWatermarks(),
+                        "FileMonitor-" + table.name(),
+                        InternalTypeInfo.fromRowType(FileMonitorTable.getRowType()))
+                .setParallelism(1);
     }
 
     public static ChannelComputer<InternalRow> createChannelComputer() {
==== AbstractNonCoordinatedSource.java ====
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/** {@link Source} that does not require coordination between JobManager and TaskManagers. */
+public abstract class AbstractNonCoordinatedSource<T>
+        implements Source<T, SimpleSourceSplit, NoOpEnumState> {
+    @Override
+    public SplitEnumerator<SimpleSourceSplit, NoOpEnumState> createEnumerator(
+            SplitEnumeratorContext<SimpleSourceSplit> enumContext) {
+        return new NoOpEnumerator<>();
+    }
+
+    @Override
+    public SplitEnumerator<SimpleSourceSplit, NoOpEnumState> restoreEnumerator(
+            SplitEnumeratorContext<SimpleSourceSplit> enumContext, NoOpEnumState checkpoint) {
+        return new NoOpEnumerator<>();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<SimpleSourceSplit> getSplitSerializer() {
+        return new SimpleSourceSplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<NoOpEnumState> getEnumeratorCheckpointSerializer() {
+        return new NoOpEnumStateSerializer();
+    }
+}
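To illustrate the contract these helpers establish, here is a minimal, hypothetical subclass (not part of this commit; NumbersSource and its limit are invented for the sketch): the subclass only chooses its boundedness and creates a reader, while split handling and enumerator checkpointing are the no-ops inherited from AbstractNonCoordinatedSource.

package org.apache.paimon.flink.source;

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;

/** Hypothetical example source: emits the numbers 0..9 once, then finishes. */
public class NumbersSource extends AbstractNonCoordinatedSource<Long> {

    private static final long serialVersionUID = 1L;

    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    @Override
    public SourceReader<Long, SimpleSourceSplit> createReader(SourceReaderContext context) {
        return new AbstractNonCoordinatedSourceReader<Long>() {
            private long next = 0;

            @Override
            public InputStatus pollNext(ReaderOutput<Long> output) {
                if (next < 10) {
                    output.collect(next++);
                    return InputStatus.MORE_AVAILABLE;
                }
                // Signal completion; a CONTINUOUS_UNBOUNDED source such as
                // QueryFileMonitor never returns this value.
                return InputStatus.END_OF_INPUT;
            }
        };
    }
}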
==== AbstractNonCoordinatedSourceReader.java ====
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.flink.api.connector.source.SourceReader;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/** Abstract {@link SourceReader} for {@link AbstractNonCoordinatedSource}. */
+public abstract class AbstractNonCoordinatedSourceReader<T>
+        implements SourceReader<T, SimpleSourceSplit> {
+    @Override
+    public void start() {}
+
+    @Override
+    public List<SimpleSourceSplit> snapshotState(long l) {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public CompletableFuture<Void> isAvailable() {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public void addSplits(List<SimpleSourceSplit> list) {}
+
+    @Override
+    public void notifyNoMoreSplits() {}
+
+    @Override
+    public void close() throws Exception {}
+}
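Because snapshotState returns an empty list, readers built on this base carry no checkpointed state and simply redo their setup on recovery (QueryFileMonitor's Reader, for example, re-creates its scan in start()). Wiring a source like the hypothetical NumbersSource sketched above into a job would follow the same pattern this commit uses for QueryFileMonitor; NumbersJob and the job name are likewise invented for the sketch.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NumbersJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Register the v2 source with an explicit type info, as the commit
        // does for QueryFileMonitor, and print the emitted numbers.
        env.fromSource(
                        new NumbersSource(),
                        WatermarkStrategy.noWatermarks(),
                        "Numbers",
                        Types.LONG)
                .setParallelism(1)
                .print();
        env.execute("numbers-demo");
    }
}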