Skip to content

Commit

Permalink
add mongodb datax reader support for TIS
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Sep 9, 2023
1 parent 15d37f2 commit dbaf2ae
Show file tree
Hide file tree
Showing 16 changed files with 415 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ public class DataXCassandraReader extends BasicDataXRdbmsReader<CassandraDatasou
@FormField(ordinal = 10, type = FormFieldType.ENUM, validate = {})
public String consistancyLevel;

/**
 * Always answers {@code false}.
 * NOTE(review): presumably overrides the same-named boolean hook on the RDBMS reader base class
 * (no {@code @Override} is present) - confirm.
 */
protected boolean isFilterUnexistCol() {
return false;
}
// protected boolean getUnexistColFilter() {
// return false;
// }

public static String getDftTemplate() {
return IOUtils.loadResourceFromClasspath(DataXCassandraReader.class, "DataXCassandraReader-tpl.json");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.function.Function;
import java.util.regex.Pattern;

/**
* @author: baisui 百岁
Expand Down Expand Up @@ -88,16 +89,16 @@ public IDataxGlobalCfg getDataXGlobalCfg() {
/**
 * Resolves the Flink stream-generation template resource through {@code writerPluginOverwrite}.
 * Two callbacks are handed over: a function that reads the resource from an
 * {@code IStreamIncrGenerateStrategy}, and a supplier that falls back to the
 * {@code DefaultDataxProcessor.super} implementation.
 * NOTE(review): how {@code writerPluginOverwrite} chooses between the two callbacks is not visible
 * here - confirm against its body.
 *
 * @return whatever {@code writerPluginOverwrite} yields for the two callbacks
 */
@Override
public IStreamTemplateResource getFlinkStreamGenerateTplResource() {

// NOTE(review): the statement appears twice because this span is a rendered diff
// (comma-first layout, then comma-last layout); both forms are the same call.
return writerPluginOverwrite((d) -> d.getFlinkStreamGenerateTplResource()
, () -> DefaultDataxProcessor.super.getFlinkStreamGenerateTplResource());
return writerPluginOverwrite((d) -> d.getFlinkStreamGenerateTplResource(),
() -> DefaultDataxProcessor.super.getFlinkStreamGenerateTplResource());

// TISSinkFactory sinKFactory = TISSinkFactory.getIncrSinKFactory(this.identityValue());
// Objects.requireNonNull(sinKFactory, "writer plugin can not be null");
// if (sinKFactory instanceof IStreamIncrGenerateStrategy) {
// return ((IStreamIncrGenerateStrategy) sinKFactory).getFlinkStreamGenerateTemplateFileName();
// }
//
// return super.getFlinkStreamGenerateTemplateFileName();
// TISSinkFactory sinKFactory = TISSinkFactory.getIncrSinKFactory(this.identityValue());
// Objects.requireNonNull(sinKFactory, "writer plugin can not be null");
// if (sinKFactory instanceof IStreamIncrGenerateStrategy) {
// return ((IStreamIncrGenerateStrategy) sinKFactory).getFlinkStreamGenerateTemplateFileName();
// }
//
// return super.getFlinkStreamGenerateTemplateFileName();
}

@Override
Expand All @@ -122,18 +123,27 @@ private <T> T writerPluginOverwrite(Function<IStreamIncrGenerateStrategy, T> fun
@TISExtension()
public static class DescriptorImpl extends Descriptor<IAppSource> {

/**
 * Matches any value whose first character is an ASCII digit.
 * The tail is {@code .*} (not {@code .+}) so that a one-character value such as "1" is matched by
 * {@code matches()}, and DOTALL lets the tail span line terminators.
 */
private static final Pattern PATTERN_START_WITH_NUMBER = Pattern.compile("^\\d.*", Pattern.DOTALL);

/**
 * Registers the select options of {@code KEY_FIELD_NAME}; the options are supplied by
 * {@code ParamsConfig.getItems(IDataxGlobalCfg.KEY_DISPLAY_NAME)}.
 */
public DescriptorImpl() {
    super();
    this.registerSelectOptions(KEY_FIELD_NAME, () -> ParamsConfig.getItems(IDataxGlobalCfg.KEY_DISPLAY_NAME));
}

/**
 * Validates the name field.
 * <ol>
 * <li>A value starting with a digit is rejected with a field error.</li>
 * <li>When the plugin meta found in {@code context} reports an update, the value is accepted
 * without the duplicate-name check.</li>
 * <li>Otherwise the result of the {@code APP_NAME_DUPLICATE} business-logic validation is returned.</li>
 * </ol>
 *
 * @param msgHandler receives the field error and performs the duplicate-name validation
 * @param context    request context; must contain an entry under {@code UploadPluginMeta.KEY_PLUGIN_META}
 * @param fieldName  name of the field being validated
 * @param value      submitted name
 * @return {@code true} when the value is acceptable
 * @throws NullPointerException if the context holds no plugin meta
 */
public boolean validateName(IFieldErrorHandler msgHandler, Context context, String fieldName, String value) {

    if (PATTERN_START_WITH_NUMBER.matcher(value).matches()) {
        msgHandler.addFieldError(context, fieldName, "不能以数字开头");
        return false;
    }

    UploadPluginMeta pluginMeta = (UploadPluginMeta) context.get(UploadPluginMeta.KEY_PLUGIN_META);
    Objects.requireNonNull(pluginMeta, "pluginMeta can not be null");
    if (pluginMeta.isUpdate()) {
        // An update skips the duplicate-name check below.
        return true;
    }
    return msgHandler.validateBizLogic(IFieldErrorHandler.BizLogic.APP_NAME_DUPLICATE, context, fieldName,
            value);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,8 @@ public final IGroupChildTaskIterator getSubTasks(Predicate<ISelectedTab> filter)
List<SelectedTab> tabs = this.selectedTabs.stream().filter(filter).collect(Collectors.toList());
// DS dsFactory = this.getDataSourceFactory();

TableColsMeta tabColsMap = getTabsMeta();

return new DataXRdbmsGroupChildTaskIterator(this, this.isFilterUnexistCol(), tabs, tabColsMap);
return new DataXRdbmsGroupChildTaskIterator(this, this.getUnexistColFilter(), tabs);

// AtomicInteger selectedTabIndex = new AtomicInteger(0);
// AtomicInteger taskIndex = new AtomicInteger(0);
Expand Down Expand Up @@ -231,12 +230,12 @@ public final IGroupChildTaskIterator getSubTasks(Predicate<ISelectedTab> filter)
// return new ColumnMetaData(index[0]++, colName, new DataType(-999), false, true);
// }

/*
 * Hook that supplies the column filter passed to DataXRdbmsGroupChildTaskIterator.
 * The FilterUnexistCol form returns the result of FilterUnexistCol.noneFilter().
 * NOTE(review): this span is a rendered diff - the first two lines below look like the pre-change
 * boolean variant and the next two like its replacement; confirm which side is current.
 */
protected boolean isFilterUnexistCol() {
return false;
protected DataXRdbmsGroupChildTaskIterator.FilterUnexistCol getUnexistColFilter() {
return DataXRdbmsGroupChildTaskIterator.FilterUnexistCol.noneFilter();
}


private TableColsMeta getTabsMeta() {
TableColsMeta getTabsMeta() {


return new TableColsMeta(getDataSourceFactory(), this.dbName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,58 @@ public class DataXRdbmsGroupChildTaskIterator implements IGroupChildTaskIterator
private final List<SelectedTab> tabs;
final int selectedTabsSize;
AtomicReference<Iterator<IDataSourceDumper>> dumperItRef = new AtomicReference<>();
private TableColsMeta tabColsMap;
private final boolean isFilterUnexistCol;
// private TableColsMeta tabColsMap;
private final FilterUnexistCol filterUnexistCol;
private final BasicDataXRdbmsReader rdbmsReader;

/**
 * @return the {@code groupedInfo} map held by this iterator (the same instance, not a copy)
 */
@Override
public Map<String, List<DataXCfgGenerator.DBDataXChildTask>> getGroupedInfo() {
return groupedInfo;
}

/*
 * Constructor: stores the reader, the selected tables and the column filter, and caches
 * tabs.size() in selectedTabsSize.
 * NOTE(review): this span is a rendered diff - the signature appears in both a
 * (boolean isFilterUnexistCol, tabs, TableColsMeta tabColsMap) form and a
 * (FilterUnexistCol filterUnexistCol, tabs) form, together with the matching field assignments;
 * confirm which side is current.
 */
public DataXRdbmsGroupChildTaskIterator(BasicDataXRdbmsReader rdbmsReader, boolean isFilterUnexistCol, List<SelectedTab> tabs
, TableColsMeta tabColsMap) {
public DataXRdbmsGroupChildTaskIterator(BasicDataXRdbmsReader rdbmsReader, FilterUnexistCol filterUnexistCol,
List<SelectedTab> tabs) {
this.rdbmsReader = rdbmsReader;
this.tabs = tabs;
this.selectedTabsSize = tabs.size();
this.tabColsMap = tabColsMap;
this.isFilterUnexistCol = isFilterUnexistCol;
this.filterUnexistCol = filterUnexistCol;
// this.tabColsMap = tabColsMap;
// this.filterUnexistCol = this.filterUnexistCol;
}

/**
 * Strategy that picks the column keys to use for a selected table. It is {@link AutoCloseable}
 * because an implementation may hold a resource that has to be released.
 */
interface FilterUnexistCol extends AutoCloseable {

    /**
     * @param tab the selected table
     * @return the column keys to use for {@code tab}
     */
    List<String> getCols(SelectedTab tab);

    /**
     * Creates a pass-through filter: {@code getCols} returns {@code tab.getColKeys()} as is and
     * {@code close} does nothing.
     *
     * @return a new pass-through filter
     */
    static FilterUnexistCol noneFilter() {
        return new FilterUnexistCol() {
            @Override
            public void close() throws Exception {
                // nothing is held, nothing to release
            }

            @Override
            public List<String> getCols(SelectedTab tab) {
                return tab.getColKeys();
            }
        };
    }

    /**
     * Creates a filter backed by the {@code TableColsMeta} obtained from
     * {@code rdbmsReader.getTabsMeta()} at the moment this factory is called. {@code getCols}
     * keeps only the column keys that are keys of the map the meta returns for the table name;
     * {@code close} closes the meta.
     *
     * @param rdbmsReader reader that provides the table column meta
     * @return a new filter bound to that meta
     */
    static FilterUnexistCol filterByRealtimeSchema(BasicDataXRdbmsReader rdbmsReader) {
        final TableColsMeta schema = rdbmsReader.getTabsMeta();
        return new FilterUnexistCol() {
            @Override
            public void close() throws Exception {
                schema.close();
            }

            @Override
            public List<String> getCols(SelectedTab tab) {
                final Map<String, ColumnMetaData> existing = schema.get(tab.getName());
                return tab.getColKeys().stream()
                        .filter(colKey -> existing.containsKey(colKey))
                        .collect(Collectors.toList());
            }
        };
    }
}

@Override
Expand All @@ -92,15 +128,15 @@ private Iterator<IDataSourceDumper> initDataSourceDumperIterator() {
if (StringUtils.isEmpty(tab.getName())) {
throw new IllegalStateException("tableName can not be null");
}
// List<ColumnMetaData> tableMetadata = null;
// IDataSourceDumper dumper = null;
// List<ColumnMetaData> tableMetadata = null;
// IDataSourceDumper dumper = null;
DataDumpers dataDumpers = null;
TISTable tisTab = new TISTable();
tisTab.setTableName(tab.getName());
// int[] index = {0};
// tisTab.setReflectCols(tab.getCols().stream().map((c) -> {
// return createColumnMetaData(index, c.getName());
// }).collect(Collectors.toList()));
// int[] index = {0};
// tisTab.setReflectCols(tab.getCols().stream().map((c) -> {
// return createColumnMetaData(index, c.getName());
// }).collect(Collectors.toList()));

dataDumpers = this.rdbmsReader.getDataSourceFactory().getDataDumpers(tisTab);
dumperIt = dataDumpers.dumpers;
Expand All @@ -113,30 +149,31 @@ private Iterator<IDataSourceDumper> initDataSourceDumperIterator() {
/**
 * Produces the reader context for the next dumper of the current table.
 * Takes the next {@code IDataSourceDumper} from the iterator held in {@code dumperItRef}, records a
 * child task named {@code <tableName>_<taskIndex>} under the table's entry in {@code groupedInfo},
 * and returns a context created by the reader with the table's where clause and column list set.
 * NOTE(review): this span is a rendered diff, so several statements appear in both their
 * pre-change and post-change layout.
 *
 * @return the reader context for the new child task
 * @throws NullPointerException if no dumper iterator has been stored in {@code dumperItRef}
 */
@Override
public IDataxReaderContext next() {
Iterator<IDataSourceDumper> dumperIterator = dumperItRef.get();
Objects.requireNonNull(dumperIterator, "dumperIterator can not be null,selectedTabIndex:" + selectedTabIndex.get());
Objects.requireNonNull(dumperIterator,
"dumperIterator can not be null,selectedTabIndex:" + selectedTabIndex.get());
IDataSourceDumper dumper = dumperIterator.next();
// NOTE(review): the "- 1" presumably compensates for selectedTabIndex having already been
// advanced when the dumper iterator was initialised - confirm against hasNext().
SelectedTab tab = tabs.get(selectedTabIndex.get() - 1);
// Child task name: table name + "_" + running task counter.
String childTask = tab.getName() + "_" + taskIndex.getAndIncrement();
List<DataXCfgGenerator.DBDataXChildTask> childTasks
= groupedInfo.computeIfAbsent(tab.getName(), (tabname) -> Lists.newArrayList());
childTasks.add(new DataXCfgGenerator.DBDataXChildTask(dumper.getDbHost()
, this.rdbmsReader.getDataSourceFactory().identityValue(), childTask));
List<DataXCfgGenerator.DBDataXChildTask> childTasks = groupedInfo.computeIfAbsent(tab.getName(),
(tabname) -> Lists.newArrayList());
childTasks.add(new DataXCfgGenerator.DBDataXChildTask(dumper.getDbHost(),
this.rdbmsReader.getDataSourceFactory().identityValue(), childTask));
RdbmsReaderContext dataxContext = rdbmsReader.createDataXReaderContext(childTask, tab, dumper);

dataxContext.setWhere(tab.getWhere());

// Boolean-flag variant: keep only the column keys present in the table metadata map,
// otherwise pass all column keys through.
if (isFilterUnexistCol) {
Map<String, ColumnMetaData> tableMetadata = tabColsMap.get(tab.getName());
dataxContext.setCols(tab.getColKeys().stream()
.filter((c) -> tableMetadata.containsKey(c)).collect(Collectors.toList()));
} else {
dataxContext.setCols(tab.getColKeys());
}
// Strategy variant: the column selection is delegated to the FilterUnexistCol instance.
dataxContext.setCols(filterUnexistCol.getCols(tab));


return dataxContext;
}

/**
 * Closes the column filter; any {@code Exception} it throws is rethrown wrapped in an
 * {@code IOException}.
 *
 * @throws IOException wrapping whatever the filter's {@code close()} threw
 */
@Override
public void close() throws IOException {
// NOTE(review): the next line looks like the pre-change statement kept by the diff rendering;
// the tabColsMap field is shown commented out elsewhere in this commit - confirm.
this.tabColsMap.close();
try {
this.filterUnexistCol.close();
} catch (Exception e) {
throw new IOException(e);
}
}
}
Loading

0 comments on commit dbaf2ae

Please sign in to comment.