Skip to content

Commit

Permalink
[FLINK-33304] Introduce the DeduplicatedMutator to resolve the mutati…
Browse files Browse the repository at this point in the history
…on write conflict problem. This closes #30

Co-authored-by: tanjialiang <[email protected]>
  • Loading branch information
Tan-JiaLiang and Tan-JiaLiang authored Nov 3, 2023
1 parent 4e99328 commit e0971c3
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CollectionUtil;

import org.apache.hadoop.hbase.TableName;
Expand All @@ -41,11 +44,14 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.flink.table.api.Expressions.$;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
Expand Down Expand Up @@ -280,6 +286,59 @@ public void testTableSink() throws Exception {
TestBaseUtils.compareResultAsText(results, expected);
}

@Test
public void testTableSinkWithChangelog() throws Exception {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);

// register values table for source
String dataId =
TestValuesTableFactory.registerData(
Arrays.asList(
Row.ofKind(RowKind.INSERT, 1, Row.of("Hello1")),
Row.ofKind(RowKind.DELETE, 1, Row.of("Hello2")),
Row.ofKind(RowKind.INSERT, 2, Row.of("Hello1")),
Row.ofKind(RowKind.INSERT, 2, Row.of("Hello2")),
Row.ofKind(RowKind.INSERT, 2, Row.of("Hello3")),
Row.ofKind(RowKind.DELETE, 2, Row.of("Hello3")),
Row.ofKind(RowKind.INSERT, 1, Row.of("Hello3"))));
tEnv.executeSql(
"CREATE TABLE source_table ("
+ " rowkey INT,"
+ " family1 ROW<name STRING>,"
+ " PRIMARY KEY (rowkey) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'data-id' = '"
+ dataId
+ "',"
+ " 'changelog-mode'='I,UA,UB,D'"
+ ")");

// register HBase table for sink
tEnv.executeSql(
"CREATE TABLE sink_table ("
+ " rowkey INT,"
+ " family1 ROW<name STRING>,"
+ " PRIMARY KEY (rowkey) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'hbase-1.4',"
+ " 'table-name' = '"
+ TEST_TABLE_4
+ "',"
+ " 'zookeeper.quorum' = '"
+ getZookeeperQuorum()
+ "'"
+ ")");

tEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table").await();

TableResult result = tEnv.executeSql("SELECT * FROM sink_table");

List<Row> actual = CollectionUtil.iteratorToList(result.collect());
assertThat(actual).isEqualTo(Collections.singletonList(Row.of(1, Row.of("Hello3"))));
}

@Test
public void testTableSourceSinkWithDDL() throws Exception {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
protected static final String TEST_TABLE_1 = "testTable1";
protected static final String TEST_TABLE_2 = "testTable2";
protected static final String TEST_TABLE_3 = "testTable3";
protected static final String TEST_TABLE_4 = "testTable4";

protected static final String ROW_KEY = "rowkey";

Expand Down Expand Up @@ -92,6 +93,7 @@ private static void prepareTables() throws IOException {
createHBaseTable1();
createHBaseTable2();
createHBaseTable3();
createHBaseTable4();
}

private static void createHBaseTable1() throws IOException {
Expand Down Expand Up @@ -232,6 +234,13 @@ private static void createHBaseTable3() {
createTable(tableName, families, SPLIT_KEYS);
}

private static void createHBaseTable4() {
// create a table
byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
TableName tableName = TableName.valueOf(TEST_TABLE_4);
createTable(tableName, families, SPLIT_KEYS);
}

private static Put putRow(
int rowKey,
int f1c1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
Expand All @@ -47,6 +48,8 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
Expand All @@ -55,6 +58,7 @@
import java.util.stream.StreamSupport;

import static org.apache.flink.table.api.Expressions.$;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
Expand Down Expand Up @@ -312,6 +316,59 @@ public void testTableSink() throws Exception {
TestBaseUtils.compareResultAsText(results, String.join("", expected));
}

@Test
public void testTableSinkWithChangelog() throws Exception {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);

// register values table for source
String dataId =
TestValuesTableFactory.registerData(
Arrays.asList(
Row.ofKind(RowKind.INSERT, 1, Row.of("Hello1")),
Row.ofKind(RowKind.DELETE, 1, Row.of("Hello2")),
Row.ofKind(RowKind.INSERT, 2, Row.of("Hello1")),
Row.ofKind(RowKind.INSERT, 2, Row.of("Hello2")),
Row.ofKind(RowKind.INSERT, 2, Row.of("Hello3")),
Row.ofKind(RowKind.DELETE, 2, Row.of("Hello3")),
Row.ofKind(RowKind.INSERT, 1, Row.of("Hello3"))));
tEnv.executeSql(
"CREATE TABLE source_table ("
+ " rowkey INT,"
+ " family1 ROW<name STRING>,"
+ " PRIMARY KEY (rowkey) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'data-id' = '"
+ dataId
+ "',"
+ " 'changelog-mode'='I,UA,UB,D'"
+ ")");

// register HBase table for sink
tEnv.executeSql(
"CREATE TABLE sink_table ("
+ " rowkey INT,"
+ " family1 ROW<name STRING>,"
+ " PRIMARY KEY (rowkey) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'hbase-2.2',"
+ " 'table-name' = '"
+ TEST_TABLE_4
+ "',"
+ " 'zookeeper.quorum' = '"
+ getZookeeperQuorum()
+ "'"
+ ")");

tEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table").await();

TableResult result = tEnv.executeSql("SELECT * FROM sink_table");

List<Row> actual = CollectionUtil.iteratorToList(result.collect());
assertThat(actual).isEqualTo(Collections.singletonList(Row.of(1, Row.of("Hello3"))));
}

@Test
public void testTableSourceSinkWithDDL() throws Exception {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
protected static final String TEST_TABLE_1 = "testTable1";
protected static final String TEST_TABLE_2 = "testTable2";
protected static final String TEST_TABLE_3 = "testTable3";
protected static final String TEST_TABLE_4 = "testTable4";

protected static final String ROW_KEY = "rowkey";

Expand Down Expand Up @@ -92,6 +93,7 @@ private static void prepareTables() throws IOException {
createHBaseTable1();
createHBaseTable2();
createHBaseTable3();
createHBaseTable4();
}

private static void createHBaseTable1() throws IOException {
Expand Down Expand Up @@ -232,6 +234,13 @@ private static void createHBaseTable3() {
createTable(tableName, families, SPLIT_KEYS);
}

private static void createHBaseTable4() {
// create a table
byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
TableName tableName = TableName.valueOf(TEST_TABLE_4);
createTable(tableName, families, SPLIT_KEYS);
}

private static Put putRow(
int rowKey,
int f1c1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,16 @@
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -71,7 +76,7 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
private final HBaseMutationConverter<T> mutationConverter;

private transient Connection connection;
private transient BufferedMutator mutator;
private transient DeduplicatedMutator mutator;

private transient ScheduledExecutorService executor;
private transient ScheduledFuture scheduledFuture;
Expand Down Expand Up @@ -121,7 +126,9 @@ public void open(Configuration parameters) throws Exception {
if (bufferFlushMaxSizeInBytes > 0) {
params.writeBufferSize(bufferFlushMaxSizeInBytes);
}
this.mutator = connection.getBufferedMutator(params);
this.mutator =
new DeduplicatedMutator(
(int) bufferFlushMaxMutations, connection.getBufferedMutator(params));

if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) {
this.executor =
Expand Down Expand Up @@ -201,7 +208,7 @@ public void invoke(T value, Context context) throws Exception {
}

private void flush() throws IOException {
// BufferedMutator is thread-safe
// DeduplicatedMutator is thread-safe
mutator.flush();
numPendingRequests.set(0);
checkErrorAndRethrow();
Expand Down Expand Up @@ -256,4 +263,39 @@ public void onException(RetriesExhaustedWithDetailsException exception, Buffered
// if the failure handler decides to throw an exception
failureThrowable.compareAndSet(null, exception);
}

/**
* Thread-safe class, grouped mutations by rows and keep the latest mutation. For more info, see
* <a href="https://issues.apache.org/jira/browse/HBASE-8626">HBASE-8626</a>.
*/
private static class DeduplicatedMutator {

private final BufferedMutator mutator;
private final Map<ByteBuffer, Mutation> mutations;

DeduplicatedMutator(int size, BufferedMutator mutator) {
this.mutator = mutator;
this.mutations = new HashMap<>(size);
}

synchronized void mutate(Mutation current) {
ByteBuffer key = ByteBuffer.wrap(current.getRow());
Mutation old = mutations.get(key);
if (old == null || current.getTimeStamp() >= old.getTimeStamp()) {
mutations.put(key, current);
}
}

synchronized void flush() throws IOException {
mutator.mutate(new ArrayList<>(mutations.values()));
mutator.flush();
mutations.clear();
}

synchronized void close() throws IOException {
mutator.mutate(new ArrayList<>(mutations.values()));
mutator.close();
mutations.clear();
}
}
}

0 comments on commit e0971c3

Please sign in to comment.