Skip to content

Commit

Permalink
make postgresql flink-cdc UT test pass
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Dec 6, 2023
1 parent 94d6a87 commit a926b5f
Show file tree
Hide file tree
Showing 15 changed files with 1,029 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class TestSelectedTabs {
new DataType(JDBCTypes.VARCHAR), false, true));

public static List<SelectedTab> createSelectedTabs() {
return createSelectedTabs(Integer.MAX_VALUE);
return createSelectedTabs(Integer.MAX_VALUE);
}

public static List<SelectedTab> createSelectedTabs(int count) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.qlangtech.plugins.incr.flink.cdc;

import com.google.common.collect.Lists;
import com.qlangtech.tis.extension.impl.IOUtils;
import com.qlangtech.tis.extension.impl.XmlFile;
import com.qlangtech.tis.plugin.PluginStore;
Expand Down Expand Up @@ -69,7 +70,11 @@ public static SelectedTab createSelectedTab(EntityName tabName //
if (CollectionUtils.isEmpty(tableMetadata)) {
throw new IllegalStateException("tabName:" + tabName + " relevant can not be empty");
}
List<String> pks = Lists.newArrayList();
List<CMeta> colsMeta = tableMetadata.stream().map((col) -> {
if (col.isPk()) {
pks.add(col.getName());
}
CMeta c = new CMeta();
c.setPk(col.isPk());
c.setName(col.getName());
Expand All @@ -78,7 +83,11 @@ public static SelectedTab createSelectedTab(EntityName tabName //
c.setComment(col.getComment());
return c;
}).collect(Collectors.toList());
SelectedTab baseTab = new TestSelectedTab(tabName.getTableName(), colsMeta);
if (CollectionUtils.isEmpty(pks)) {
throw new IllegalStateException("pks can not be empty");
}
TestSelectedTab baseTab = new TestSelectedTab(tabName.getTableName(), colsMeta);
baseTab.primaryKeys = pks;
baseTab.setCols(tableMetadata.stream().map((m) -> m.getName()).collect(Collectors.toList()));
baseTabSetter.accept(baseTab);

Expand All @@ -90,6 +99,10 @@ public TestSelectedTab(String name, List<CMeta> colsMeta) {
this.cols.addAll(colsMeta);
}

@Override
public void setCols(List<String> cols) {
// super.setCols(cols);
}
// @Override
// public List<CMeta> getCols() {
// return colsMeta;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qlangtech.plugins.incr.flink.cdc;

import java.io.Serializable;
import java.util.function.Function;

/**
* @author: 百岁([email protected]
* @create: 2022-12-25 10:52
**/
public class DefaultTableNameConvert implements Function<String,String> {
public class DefaultTableNameConvert implements Function<String, String>, Serializable {
@Override
public String apply(String physicsTabName) {
return physicsTabName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,6 @@ public boolean isAllCols() {
public List<CMeta> getCols() {
return cols;
}


}
17 changes: 8 additions & 9 deletions tis-incr/tis-flink-cdc-postgresql-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@


<artifactId>tis-flink-cdc-postgresql-plugin</artifactId>
<!-- 先不开放使用
<packaging>tpi</packaging>-->

<packaging>tpi</packaging>

<dependencies>

<!-- <dependency>-->
<!-- <groupId>com.qlangtech.tis</groupId>-->
<!-- <artifactId>tis-dag</artifactId>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>com.qlangtech.tis</groupId>-->
<!-- <artifactId>tis-dag</artifactId>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->

<dependency>
<groupId>com.qlangtech.tis.plugins</groupId>
Expand All @@ -53,7 +53,6 @@
</dependency>



<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-postgres-cdc</artifactId>
Expand All @@ -67,7 +66,7 @@
</dependency>
<dependency>
<groupId>com.qlangtech.tis.plugins</groupId>
<artifactId>tis-datax-common-plugin</artifactId>
<artifactId>tis-datax-common-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public JobExecutionResult start(TargetResName dataxName, IDataxReader dataSource
}


List<ReaderSource> readerSources = SourceChannel.getSourceFunction(dsFactory, tabs, (dbHost, dbs, tbs, debeziumProperties) -> {
List<ReaderSource> readerSources = SourceChannel.getSourceFunction(
dsFactory, tabs, (dbHost, dbs, tbs, debeziumProperties) -> {
DateTimeConverter.setDatetimeConverters(PGDateTimeConverter.class.getName(), debeziumProperties);

return dbs.getDbStream().map((dbname) -> {
Expand All @@ -87,7 +88,9 @@ public JobExecutionResult start(TargetResName dataxName, IDataxReader dataSource
.database(dbname) // monitor postgres database
.schemaList(schemaSupported.getDBSchema()) // monitor inventory schema
.tableList(tbs.toArray(new String[tbs.size()])) // monitor products table
// .tableList("tis.base")
.username(dsFactory.userName)
.decodingPluginName(sourceFactory.decodingPluginName)
.password(dsFactory.password)
.debeziumProperties(debeziumProperties)
.deserializer(new TISDeserializationSchema()) // converts SourceRecord to JSON String
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qlangtech.plugins.incr.flink.cdc.postgresql;
Expand All @@ -22,25 +22,34 @@
import com.qlangtech.tis.async.message.client.consumer.IConsumerHandle;
import com.qlangtech.tis.async.message.client.consumer.IMQListener;
import com.qlangtech.tis.async.message.client.consumer.impl.MQListenerFactory;
import com.qlangtech.tis.datax.IDataXPluginMeta;
import com.qlangtech.tis.extension.TISExtension;
import com.qlangtech.tis.plugin.IEndTypeGetter;
import com.qlangtech.tis.plugin.annotation.FormField;
import com.qlangtech.tis.plugin.annotation.FormFieldType;
import com.qlangtech.tis.plugin.annotation.Validator;


import java.util.Objects;
import java.util.Optional;

/**
* https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html
* @author: 百岁([email protected]
* @create: 2021-09-27 15:15
**/
@Public
public class FlinkCDCPostreSQLSourceFactory extends MQListenerFactory {
private transient IConsumerHandle consumerHandle;


// The name of the Postgres logical decoding plug-in installed on the server. Supported
// * values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming,
// * wal2json_rds_streaming and pgoutput.

/**
* The name of the Postgres logical decoding plug-in installed on the server. Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput.
*/
@FormField(ordinal = 0, type = FormFieldType.ENUM, validate = {Validator.require})
public String decodingPluginName;

// @FormField(ordinal = 0, type = FormFieldType.ENUM, validate = {Validator.require})
// public String startupOptions;

Expand All @@ -57,6 +66,7 @@ public class FlinkCDCPostreSQLSourceFactory extends MQListenerFactory {
// }
// }


@Override
public IMQListener create() {
FlinkCDCPostgreSQLSourceFunction sourceFunctionCreator = new FlinkCDCPostgreSQLSourceFunction(this);
Expand Down Expand Up @@ -84,6 +94,7 @@ public String getDisplayName() {
public PluginVender getVender() {
return PluginVender.FLINK_CDC;
}

@Override
public IEndTypeGetter.EndType getEndType() {
return IEndTypeGetter.EndType.Postgres;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"decodingPluginName": {
"dftVal": "decoderbufs",
"label": "解码器",
"help": "The name of the Postgres logical decoding plug-in installed on the server. Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput.",
"enum": [
{
"val": "decoderbufs",
"label": "decoderbufs"
},
{
"val": "wal2json",
"label": "wal2json"
},
{
"val": "wal2json_rds",
"label": "wal2json_rds"
},
{
"val": "wal2json_streaming",
"label": "wal2json_streaming"
},
{
"val": "wal2json_rds_streaming",
"label": "wal2json_rds_streaming"
},
{
"val": "pgoutput",
"label": "pgoutput"
}
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.output.Slf4jLogConsumer;
Expand Down Expand Up @@ -51,13 +52,20 @@
public abstract class PostgresTestBase extends AbstractTestBase {
private static final Logger LOG = LoggerFactory.getLogger(PostgresTestBase.class);
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");

public static final String DEFAULT_DB = "postgres";
private static final DockerImageName PG_IMAGE =
DockerImageName.parse("debezium/postgres:9.6").asCompatibleSubstituteFor("postgres");

// DockerImageName.parse("debezium/postgres:9.6")
DockerImageName.parse("postgres:latest")
.asCompatibleSubstituteFor("postgres");
// config_file=/etc/postgresql/postgresql.conf
//document: https://hub.docker.com/_/postgres
protected static final PostgreSQLContainer<?> POSTGERS_CONTAINER =
new PostgreSQLContainer<>(PG_IMAGE)
.withDatabaseName("postgres")
.withClasspathResourceMapping( //
"postgresql/postgresql.conf" //
, "/etc/postgresql/postgresql.conf" //
, BindMode.READ_WRITE)
.withDatabaseName(DEFAULT_DB)
.withUsername("postgres")
.withPassword("postgres")
.withLogConsumer(new Slf4jLogConsumer(LOG) {
Expand All @@ -68,7 +76,16 @@ public void accept(OutputFrame outputFrame) {
System.out.println(utf8String);
super.accept(outputFrame);
}
});
}).withCommand(
"postgres",
"-c",
// default
"fsync=off",
"-c",
"max_replication_slots=20",
"-c",
"config_file=/etc/postgresql/postgresql.conf");


@BeforeClass
public static void startContainers() {
Expand Down Expand Up @@ -96,20 +113,21 @@ protected void initializePostgresTable(String sqlFile) {
Statement statement = connection.createStatement()) {
final List<String> statements =
Arrays.stream(
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
.map(String::trim)
.filter(x -> !x.startsWith("--") && !x.isEmpty())
.map(
x -> {
final Matcher m =
COMMENT_PATTERN.matcher(x);
return m.matches() ? m.group(1) : x;
})
.collect(Collectors.joining("\n"))
.split(";"))
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
.map(String::trim)
.filter(x -> !x.startsWith("--") && !x.isEmpty())
.map(
x -> {
final Matcher m =
COMMENT_PATTERN.matcher(x);
return m.matches() ? m.group(1) : x;
})
.collect(Collectors.joining("\n"))
.split(";"))
.collect(Collectors.toList());
boolean success;
for (String stmt : statements) {
statement.execute(stmt);
success = statement.execute(stmt);
}
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down
Loading

0 comments on commit a926b5f

Please sign in to comment.