Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
DongLiang-0 committed Nov 13, 2024
1 parent 1c9494f commit 0b8d998
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,7 @@ public DorisOptions(Map<String, String> config) {
Integer.parseInt(config.get(DorisSinkConnectorConfig.REQUEST_READ_TIMEOUT_MS));
}
this.streamLoadProp = getStreamLoadPropFromConfig(config);
this.enableGroupCommit =
ConfigCheckUtils.validateGroupCommitMode(getStreamLoadProp(), enable2PC());
this.enableGroupCommit = ConfigCheckUtils.validateGroupCommitMode(this);
}

private Properties getStreamLoadPropFromConfig(Map<String, String> config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@

import static org.apache.doris.kafka.connector.writer.LoadConstants.PARTIAL_COLUMNS;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.doris.kafka.connector.cfg.DorisOptions;
import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
import org.apache.doris.kafka.connector.converter.ConverterMode;
import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode;
Expand Down Expand Up @@ -301,7 +303,11 @@ private static boolean validateEnumInstances(String value, String[] instances) {
return false;
}

public static boolean validateGroupCommitMode(Properties streamLoadProp, boolean enable2PC) {
@VisibleForTesting
public static boolean validateGroupCommitMode(DorisOptions dorisOptions) {
Properties streamLoadProp = dorisOptions.getStreamLoadProp();
boolean enable2PC = dorisOptions.enable2PC();
boolean force2PC = dorisOptions.force2PC();
if (!streamLoadProp.containsKey(LoadConstants.GROUP_COMMIT)) {
return false;
}
Expand All @@ -312,13 +318,18 @@ public static boolean validateGroupCommitMode(Properties streamLoadProp, boolean
throw new DorisException(
"The value of group commit mode is an illegal parameter, illegal value="
+ value);
} else if (enable2PC) {
} else if (enable2PC && force2PC) {
throw new DorisException(
"When group commit is enabled, you should disable two phase commit! Please set 'enable.2pc':'false'");
} else if (streamLoadProp.containsKey(PARTIAL_COLUMNS)
&& streamLoadProp.get(PARTIAL_COLUMNS).equals("true")) {
throw new DorisException(
"When group commit is enabled,you can not load data with partial column update.");
} else if (enable2PC) {
// The default enable2PC is true, in the scenario of group commit, it needs to be closed
LOG.info(
"The Group Commit mode is on, the two phase commit default value should be disabled.");
dorisOptions.setEnable2PC(false);
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.doris.kafka.connector.converter.RecordService;
import org.apache.doris.kafka.connector.exception.ArgumentsException;
import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
import org.apache.doris.kafka.connector.service.RestService;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -96,16 +95,6 @@ public DorisWriter(
this.connectionProvider = connectionProvider;
this.recordService = new RecordService(dorisOptions);
this.connectMonitor = connectMonitor;
checkDorisTableKey(tableName);
}

/** The uniq model has 2pc close by default unless 2pc is forced open. */
private void checkDorisTableKey(String tableName) {
if (dorisOptions.enable2PC()
&& !dorisOptions.force2PC()
&& RestService.isUniqueKeyType(dorisOptions, tableName, LOG)) {
dorisOptions.setEnable2PC(false);
}
}

/** read offset from doris */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.doris.kafka.connector.exception.StreamLoadException;
import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
import org.apache.doris.kafka.connector.model.KafkaRespContent;
import org.apache.doris.kafka.connector.service.RestService;
import org.apache.doris.kafka.connector.utils.BackendUtils;
import org.apache.doris.kafka.connector.utils.FileNameUtils;
import org.apache.doris.kafka.connector.writer.commit.DorisCommittable;
Expand Down Expand Up @@ -66,6 +67,20 @@ public StreamLoadWriter(
BackendUtils backendUtils = BackendUtils.getInstance(dorisOptions, LOG);
this.dorisCommitter = new DorisCommitter(dorisOptions, backendUtils);
this.dorisStreamLoad = new DorisStreamLoad(backendUtils, dorisOptions, topic);
checkDorisTableKey(tableName);
}

/** The uniq model has 2pc close by default unless 2pc is forced open. */
@VisibleForTesting
public void checkDorisTableKey(String tableName) {
if (dorisOptions.enable2PC()
&& !dorisOptions.force2PC()
&& RestService.isUniqueKeyType(dorisOptions, tableName, LOG)) {
LOG.info(
"The {} table type is unique model, the two phase commit default value should be disabled.",
tableName);
dorisOptions.setEnable2PC(false);
}
}

public void fetchOffset() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.doris.kafka.connector.cfg;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -292,54 +294,67 @@ public void testConvertToLowercase() {
}
}

private DorisOptions initDorisOptions(Map<String, String> customConfig) {
Properties loadProps = new Properties();
InputStream stream =
this.getClass()
.getClassLoader()
.getResourceAsStream("doris-connector-sink.properties");
try {
loadProps.load(stream);
DorisSinkConnectorConfig.setDefaultValues((Map) loadProps);
loadProps.put("task_id", "1");
} catch (IOException e) {
throw new DorisException(e);
}
Map<String, String> config = (Map) loadProps;
config.putAll(customConfig);
return new DorisOptions(config);
}

@Test(expected = DorisException.class)
public void testGroupCommitWithIllegalParams() {
Map<String, String> config = getConfig();
config.put("sink.properties.group_commit", "sync_modes");
Properties streamLoadProp = getStreamLoadPropFromConfig(config);
config.put(DorisSinkConnectorConfig.ENABLE_2PC, "false");
ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, false);
DorisOptions dorisOptions = initDorisOptions(config);
ConfigCheckUtils.validateGroupCommitMode(dorisOptions);
}

@Test(expected = DorisException.class)
public void testGroupCommitModeWithEnable2pc() {
Map<String, String> config = getConfig();
config.put("sink.properties.group_commit", "sync_mode");
Properties streamLoadProp = getStreamLoadPropFromConfig(config);
boolean enable2pc = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC));
ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, enable2pc);
config.put(DorisSinkConnectorConfig.ENABLE_2PC, "true");
DorisOptions dorisOptions = initDorisOptions(config);
ConfigCheckUtils.validateGroupCommitMode(dorisOptions);
}

@Test
public void testGroupCommitModeWithDisable2pc() {
Map<String, String> config = getConfig();
config.put("sink.properties.group_commit", "sync_mode");
config.put(DorisSinkConnectorConfig.ENABLE_2PC, "false");
DorisOptions dorisOptions = initDorisOptions(config);
ConfigCheckUtils.validateGroupCommitMode(dorisOptions);
Assert.assertFalse(dorisOptions.enable2PC());
}

@Test(expected = DorisException.class)
public void testGroupCommitWithPartialUpdate() {
Map<String, String> config = getConfig();
config.put("sink.properties.group_commit", "sync_mode");
config.put("sink.properties.partial_columns", "true");
config.put(DorisSinkConnectorConfig.ENABLE_2PC, "false");
Properties streamLoadProp = getStreamLoadPropFromConfig(config);
ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, false);
DorisOptions dorisOptions = initDorisOptions(config);
ConfigCheckUtils.validateGroupCommitMode(dorisOptions);
}

@Test
public void testGroupCommitWithAsyncMode() {
Map<String, String> config = getConfig();
config.put("sink.properties.group_commit", "async_mode");
Properties streamLoadProp = getStreamLoadPropFromConfig(config);
config.put(DorisSinkConnectorConfig.ENABLE_2PC, "false");
ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, false);
}

private Properties getStreamLoadPropFromConfig(Map<String, String> config) {
Properties streamLoadProp = new Properties();
for (Map.Entry<String, String> entry : config.entrySet()) {
if (entry.getKey().startsWith(DorisSinkConnectorConfig.STREAM_LOAD_PROP_PREFIX)) {
String subKey =
entry.getKey()
.substring(
DorisSinkConnectorConfig.STREAM_LOAD_PROP_PREFIX.length());
streamLoadProp.put(subKey, entry.getValue());
}
}
return streamLoadProp;
DorisOptions dorisOptions = initDorisOptions(config);
ConfigCheckUtils.validateGroupCommitMode(dorisOptions);
Assert.assertFalse(dorisOptions.enable2PC());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;

import java.io.IOException;
Expand All @@ -37,19 +38,23 @@
import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider;
import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
import org.apache.doris.kafka.connector.service.RestService;
import org.apache.doris.kafka.connector.writer.commit.DorisCommittable;
import org.apache.doris.kafka.connector.writer.load.DorisStreamLoad;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockedStatic;

public class TestStreamLoadWriter {

private DorisWriter dorisWriter;
private DorisOptions dorisOptions;

private final Map<String, String> label2Status = new HashMap<>();
private MockedStatic<RestService> mockRestService;

@Before
public void init() throws IOException {
Expand All @@ -63,6 +68,7 @@ public void init() throws IOException {
props.put("task_id", "1");
props.put("name", "sink-connector-test");
dorisOptions = new DorisOptions((Map) props);
mockRestService = mockStatic(RestService.class);
fillLabel2Status();
}

Expand Down Expand Up @@ -99,6 +105,9 @@ private StreamLoadWriter mockStreamLoadWriter(Map<String, String> label2Status)
new JdbcConnectionProvider(dorisOptions),
dorisConnectMonitor));

mockRestService
.when(() -> RestService.isUniqueKeyType(any(), any(), any()))
.thenReturn(true);
doReturn(label2Status).when(streamLoadWriter).fetchLabel2Status();
return streamLoadWriter;
}
Expand Down Expand Up @@ -128,6 +137,9 @@ public void flush() throws Exception {
dorisConnectMonitor);
streamLoadWriter.setDorisStreamLoad(streamLoad);

mockRestService
.when(() -> RestService.isUniqueKeyType(any(), any(), any()))
.thenReturn(true);
dorisWriter = streamLoadWriter;
dorisWriter.insert(TestRecordBuffer.newSinkRecord("doris-1", 1));
dorisWriter.insert(TestRecordBuffer.newSinkRecord("doris-2", 2));
Expand All @@ -148,8 +160,40 @@ public void putBuffer() {
dorisOptions,
new JdbcConnectionProvider(dorisOptions),
dorisConnectMonitor);

mockRestService
.when(() -> RestService.isUniqueKeyType(any(), any(), any()))
.thenReturn(false);
SinkRecord record = TestRecordBuffer.newSinkRecord("doris-1", 2);
dorisWriter.putBuffer(record);
Assert.assertEquals(2, dorisWriter.getBuffer().getLastOffset());
}

@Test
public void test2PCInUnique() {

StreamLoadWriter dorisWriter = (StreamLoadWriter) mockStreamLoadWriter(new HashMap<>());
// test 2PC in unique key model scenario
mockRestService
.when(() -> RestService.isUniqueKeyType(any(), any(), any()))
.thenReturn(true);
dorisWriter.checkDorisTableKey("unique_table");
Assert.assertFalse(dorisOptions.enable2PC());
}

@Test
public void test2PCNotUnique() {
StreamLoadWriter dorisWriter = (StreamLoadWriter) mockStreamLoadWriter(new HashMap<>());
// test 2PC in not unique key model scenario
mockRestService
.when(() -> RestService.isUniqueKeyType(any(), any(), any()))
.thenReturn(false);
dorisWriter.checkDorisTableKey("not_unique_table");
Assert.assertTrue(dorisOptions.enable2PC());
}

@After
public void close() {
mockRestService.close();
}
}

0 comments on commit 0b8d998

Please sign in to comment.