diff --git a/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXCassandraReader.java b/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXCassandraReader.java index ed68b4045..c979e1ccf 100644 --- a/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXCassandraReader.java +++ b/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXCassandraReader.java @@ -48,9 +48,9 @@ public class DataXCassandraReader extends BasicDataXRdbmsReader d.getFlinkStreamGenerateTplResource() - , () -> DefaultDataxProcessor.super.getFlinkStreamGenerateTplResource()); + return writerPluginOverwrite((d) -> d.getFlinkStreamGenerateTplResource(), + () -> DefaultDataxProcessor.super.getFlinkStreamGenerateTplResource()); -// TISSinkFactory sinKFactory = TISSinkFactory.getIncrSinKFactory(this.identityValue()); -// Objects.requireNonNull(sinKFactory, "writer plugin can not be null"); -// if (sinKFactory instanceof IStreamIncrGenerateStrategy) { -// return ((IStreamIncrGenerateStrategy) sinKFactory).getFlinkStreamGenerateTemplateFileName(); -// } -// -// return super.getFlinkStreamGenerateTemplateFileName(); + // TISSinkFactory sinKFactory = TISSinkFactory.getIncrSinKFactory(this.identityValue()); + // Objects.requireNonNull(sinKFactory, "writer plugin can not be null"); + // if (sinKFactory instanceof IStreamIncrGenerateStrategy) { + // return ((IStreamIncrGenerateStrategy) sinKFactory).getFlinkStreamGenerateTemplateFileName(); + // } + // + // return super.getFlinkStreamGenerateTemplateFileName(); } @Override @@ -122,18 +123,27 @@ private T writerPluginOverwrite(Function fun @TISExtension() public static class DescriptorImpl extends Descriptor { + private static final Pattern PATTERN_START_WITH_NUMBER = Pattern.compile("^\\d.+"); + public DescriptorImpl() { super(); this.registerSelectOptions(KEY_FIELD_NAME, () -> ParamsConfig.getItems(IDataxGlobalCfg.KEY_DISPLAY_NAME)); } public boolean validateName(IFieldErrorHandler msgHandler, Context context, String fieldName, String value) { + + if (PATTERN_START_WITH_NUMBER.matcher(value).matches()) { + msgHandler.addFieldError(context, fieldName, "不能以数字开头"); + return false; + } + UploadPluginMeta pluginMeta = (UploadPluginMeta) context.get(UploadPluginMeta.KEY_PLUGIN_META); Objects.requireNonNull(pluginMeta, "pluginMeta can not be null"); if (pluginMeta.isUpdate()) { return true; } - return msgHandler.validateBizLogic(IFieldErrorHandler.BizLogic.APP_NAME_DUPLICATE, context, fieldName, value); + return msgHandler.validateBizLogic(IFieldErrorHandler.BizLogic.APP_NAME_DUPLICATE, context, fieldName, + value); } @Override diff --git a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/BasicDataXRdbmsReader.java b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/BasicDataXRdbmsReader.java index 9f11f0926..0fcf5461e 100644 --- a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/BasicDataXRdbmsReader.java +++ b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/BasicDataXRdbmsReader.java @@ -140,9 +140,8 @@ public final IGroupChildTaskIterator getSubTasks(Predicate filter) List tabs = this.selectedTabs.stream().filter(filter).collect(Collectors.toList()); // DS dsFactory = this.getDataSourceFactory(); - TableColsMeta tabColsMap = getTabsMeta(); - return new DataXRdbmsGroupChildTaskIterator(this, this.isFilterUnexistCol(), tabs, tabColsMap); + return new DataXRdbmsGroupChildTaskIterator(this, this.getUnexistColFilter(), tabs); // AtomicInteger selectedTabIndex = new AtomicInteger(0); // AtomicInteger taskIndex = new AtomicInteger(0); @@ -231,12 +230,12 @@ public final IGroupChildTaskIterator getSubTasks(Predicate filter) // return new ColumnMetaData(index[0]++, colName, new DataType(-999), false, true); // } - protected boolean isFilterUnexistCol() { - return false; + protected DataXRdbmsGroupChildTaskIterator.FilterUnexistCol getUnexistColFilter() { + return DataXRdbmsGroupChildTaskIterator.FilterUnexistCol.noneFilter(); } - private TableColsMeta getTabsMeta() { + TableColsMeta getTabsMeta() { return new TableColsMeta(getDataSourceFactory(), this.dbName); diff --git a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/DataXRdbmsGroupChildTaskIterator.java b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/DataXRdbmsGroupChildTaskIterator.java index cc3504934..a95139e3c 100644 --- a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/DataXRdbmsGroupChildTaskIterator.java +++ b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/DataXRdbmsGroupChildTaskIterator.java @@ -50,8 +50,8 @@ public class DataXRdbmsGroupChildTaskIterator implements IGroupChildTaskIterator private final List tabs; final int selectedTabsSize; AtomicReference> dumperItRef = new AtomicReference<>(); - private TableColsMeta tabColsMap; - private final boolean isFilterUnexistCol; + // private TableColsMeta tabColsMap; + private final FilterUnexistCol filterUnexistCol; private final BasicDataXRdbmsReader rdbmsReader; @Override @@ -59,13 +59,49 @@ public Map> getGroupedInfo() { return groupedInfo; } - public DataXRdbmsGroupChildTaskIterator(BasicDataXRdbmsReader rdbmsReader, boolean isFilterUnexistCol, List tabs - , TableColsMeta tabColsMap) { + public DataXRdbmsGroupChildTaskIterator(BasicDataXRdbmsReader rdbmsReader, FilterUnexistCol filterUnexistCol, + List tabs) { this.rdbmsReader = rdbmsReader; this.tabs = tabs; this.selectedTabsSize = tabs.size(); - this.tabColsMap = tabColsMap; - this.isFilterUnexistCol = isFilterUnexistCol; + this.filterUnexistCol = filterUnexistCol; + // this.tabColsMap = tabColsMap; + // this.filterUnexistCol = this.filterUnexistCol; + } + + interface FilterUnexistCol extends AutoCloseable { + + static FilterUnexistCol noneFilter() { + return new FilterUnexistCol() { + @Override + public List getCols(SelectedTab tab) { + return tab.getColKeys(); + } + + @Override + public void close() throws Exception { + + } + }; + } + + static FilterUnexistCol filterByRealtimeSchema(BasicDataXRdbmsReader rdbmsReader) { + final TableColsMeta tabColsMap = rdbmsReader.getTabsMeta(); + return new FilterUnexistCol() { + @Override + public List getCols(SelectedTab tab) { + Map tableMetadata = tabColsMap.get(tab.getName()); + return tab.getColKeys().stream().filter((c) -> tableMetadata.containsKey(c)).collect(Collectors.toList()); + } + + @Override + public void close() throws Exception { + tabColsMap.close(); + } + }; + } + + public List getCols(SelectedTab tab); } @Override @@ -92,15 +128,15 @@ private Iterator initDataSourceDumperIterator() { if (StringUtils.isEmpty(tab.getName())) { throw new IllegalStateException("tableName can not be null"); } -// List tableMetadata = null; -// IDataSourceDumper dumper = null; + // List tableMetadata = null; + // IDataSourceDumper dumper = null; DataDumpers dataDumpers = null; TISTable tisTab = new TISTable(); tisTab.setTableName(tab.getName()); -// int[] index = {0}; -// tisTab.setReflectCols(tab.getCols().stream().map((c) -> { -// return createColumnMetaData(index, c.getName()); -// }).collect(Collectors.toList())); + // int[] index = {0}; + // tisTab.setReflectCols(tab.getCols().stream().map((c) -> { + // return createColumnMetaData(index, c.getName()); + // }).collect(Collectors.toList())); dataDumpers = this.rdbmsReader.getDataSourceFactory().getDataDumpers(tisTab); dumperIt = dataDumpers.dumpers; @@ -113,30 +149,31 @@ private Iterator initDataSourceDumperIterator() { @Override public IDataxReaderContext next() { Iterator dumperIterator = dumperItRef.get(); - Objects.requireNonNull(dumperIterator, "dumperIterator can not be null,selectedTabIndex:" + selectedTabIndex.get()); + Objects.requireNonNull(dumperIterator, + "dumperIterator can not be null,selectedTabIndex:" + selectedTabIndex.get()); IDataSourceDumper dumper = dumperIterator.next(); SelectedTab tab = tabs.get(selectedTabIndex.get() - 1); String childTask = tab.getName() + "_" + taskIndex.getAndIncrement(); - List childTasks - = groupedInfo.computeIfAbsent(tab.getName(), (tabname) -> Lists.newArrayList()); - childTasks.add(new DataXCfgGenerator.DBDataXChildTask(dumper.getDbHost() - , this.rdbmsReader.getDataSourceFactory().identityValue(), childTask)); + List childTasks = groupedInfo.computeIfAbsent(tab.getName(), + (tabname) -> Lists.newArrayList()); + childTasks.add(new DataXCfgGenerator.DBDataXChildTask(dumper.getDbHost(), + this.rdbmsReader.getDataSourceFactory().identityValue(), childTask)); RdbmsReaderContext dataxContext = rdbmsReader.createDataXReaderContext(childTask, tab, dumper); dataxContext.setWhere(tab.getWhere()); - if (isFilterUnexistCol) { - Map tableMetadata = tabColsMap.get(tab.getName()); - dataxContext.setCols(tab.getColKeys().stream() - .filter((c) -> tableMetadata.containsKey(c)).collect(Collectors.toList())); - } else { - dataxContext.setCols(tab.getColKeys()); - } + dataxContext.setCols(filterUnexistCol.getCols(tab)); + + return dataxContext; } @Override public void close() throws IOException { - this.tabColsMap.close(); + try { + this.filterUnexistCol.close(); + } catch (Exception e) { + throw new IOException(e); + } } } diff --git a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXMongodbReader.java b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXMongodbReader.java index eff149cba..503cecc99 100644 --- a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXMongodbReader.java +++ b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXMongodbReader.java @@ -35,6 +35,7 @@ import com.qlangtech.tis.extension.TISExtension; import com.qlangtech.tis.extension.impl.BaseSubFormProperties; import com.qlangtech.tis.extension.impl.IOUtils; +import com.qlangtech.tis.plugin.CompanionPluginFactory; import com.qlangtech.tis.plugin.ValidatorCommons; import com.qlangtech.tis.plugin.annotation.FormField; import com.qlangtech.tis.plugin.annotation.FormFieldType; @@ -43,14 +44,15 @@ import com.qlangtech.tis.plugin.datax.common.BasicDataXRdbmsReader; import com.qlangtech.tis.plugin.datax.common.RdbmsReaderContext; import com.qlangtech.tis.plugin.datax.mongo.MongoCMeta; +import com.qlangtech.tis.plugin.datax.mongo.MongoCMetaCreatorFactory; import com.qlangtech.tis.plugin.datax.mongo.MongoColumnMetaData; import com.qlangtech.tis.plugin.datax.mongo.MongoSelectedTabExtend; import com.qlangtech.tis.plugin.ds.*; import com.qlangtech.tis.plugin.ds.mangodb.MangoDBDataSourceFactory; -import com.qlangtech.tis.plugin.incr.ISelectedTabExtendFactory; import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler; import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler; import com.qlangtech.tis.sql.parser.tuple.creator.EntityName; +import com.qlangtech.tis.util.UploadPluginMeta; import com.qlangtech.tis.util.impl.AttrVals; import org.apache.commons.lang.StringUtils; import org.bson.BsonDocument; @@ -71,6 +73,7 @@ * @author: baisui 百岁 * @create: 2021-04-07 15:30 * @see com.alibaba.datax.plugin.reader.mongodbreader.MongoDBReader + * @see com.alibaba.datax.plugin.reader.mongodbreader.MongoDBReader.Task **/ @Public public class DataXMongodbReader extends BasicDataXRdbmsReader { @@ -166,64 +169,70 @@ public static String getDftTemplate() { // } @Override - protected RdbmsReaderContext createDataXReaderContext(String jobName, SelectedTab tab, IDataSourceDumper dumper) { - return null; + protected boolean shallFillSelectedTabMeta() { + // 不需要填充字段类型 + return false; } - private DataType convertType(String type) { - if (!acceptTypes.contains(type)) { - throw new IllegalArgumentException("illegal type:" + type); - } - switch (type) { - case "int": - case "long": - return DataXReaderColType.Long.dataType; - case "double": - return DataXReaderColType.Double.dataType; - case "string": - case "array": - return DataXReaderColType.STRING.dataType; - case "date": - return DataXReaderColType.Date.dataType; - case "boolean": - return DataXReaderColType.Boolean.dataType; - case "bytes": - return DataXReaderColType.Bytes.dataType; - default: - throw new IllegalStateException("illegal type:" + type); - } + @Override + protected RdbmsReaderContext createDataXReaderContext(String jobName, SelectedTab tab, IDataSourceDumper dumper) { + return new MongoDBReaderContext(jobName, tab, dumper, this); } + // private DataType convertType(String type) { + // if (!acceptTypes.contains(type)) { + // throw new IllegalArgumentException("illegal type:" + type); + // } + // switch (type) { + // case "int": + // case "long": + // return DataXReaderColType.Long.dataType; + // case "double": + // return DataXReaderColType.Double.dataType; + // case "string": + // case "array": + // return DataXReaderColType.STRING.dataType; + // case "date": + // return DataXReaderColType.Date.dataType; + // case "boolean": + // return DataXReaderColType.Boolean.dataType; + // case "bytes": + // return DataXReaderColType.Bytes.dataType; + // default: + // throw new IllegalStateException("illegal type:" + type); + // } + // } - public static class ColCfg { - private String name; - private String type; - private String splitter; - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - - public String getSplitter() { - return splitter; - } - public void setSplitter(String splitter) { - this.splitter = splitter; - } - } + // public static class ColCfg { + // private String name; + // private String type; + // private String splitter; + // + // public String getName() { + // return name; + // } + // + // public void setName(String name) { + // this.name = name; + // } + // + // public String getType() { + // return type; + // } + // + // public void setType(String type) { + // this.type = type; + // } + // + // public String getSplitter() { + // return splitter; + // } + // + // public void setSplitter(String splitter) { + // this.splitter = splitter; + // } + // } // @Override @@ -232,11 +241,27 @@ public void setSplitter(String splitter) { // } @TISExtension() - public static class DefaultDescriptor extends BasicDataXRdbmsReaderDescriptor implements ISelectedTabExtendFactory, SubForm.ISubFormItemValidate { + public static class DefaultDescriptor extends BasicDataXRdbmsReaderDescriptor // + implements SubForm.ISubFormItemValidate, CompanionPluginFactory { public DefaultDescriptor() { super(); } + @Override + public SelectedTabExtend getCompanionPlugin(UploadPluginMeta pluginMeta) { + String tabName = pluginMeta.getExtraParam(IPropertyType.SubFormFilter.PLUGIN_META_SUBFORM_DETAIL_ID_VALUE); + if (StringUtils.isEmpty(tabName)) { + throw new IllegalArgumentException("param:" + IPropertyType.SubFormFilter.PLUGIN_META_SUBFORM_DETAIL_ID_VALUE + " is not exsit in pluginMeta"); + } + return SelectedTabExtend.getBatchPluginStore(pluginMeta.getPluginContext(), + pluginMeta.getDataXName(true)).find(tabName, false); + } + + @Override + public Descriptor getCompanionDescriptor() { + return TIS.get().getDescriptor(MongoSelectedTabExtend.class); + } + public boolean validateInspectRowCount(IFieldErrorHandler msgHandler, Context context, String fieldName, String value) { @@ -264,6 +289,12 @@ public boolean validateSubFormItems(IControlMsgHandler msgHandler, Context conte return true; } + private String splitMetas(int colIndex, int docSplitFieldIndex, String fieldKey) { + return joinField(SelectedTab.KEY_FIELD_COLS, Lists.newArrayList(colIndex), + MongoCMetaCreatorFactory.KEY_DOC_FIELD_SPLIT_METAS, Lists.newArrayList(docSplitFieldIndex), + fieldKey); + } + @Override public boolean validateSubForm(IControlMsgHandler msgHandler, Context context, SelectedTab tab) { @@ -284,8 +315,7 @@ public boolean validateSubForm(IControlMsgHandler msgHandler, Context context, S int docSplitFieldIndex = 0; for (MongoCMeta.MongoDocSplitCMeta splitCMeta : mongoCMeta.getDocFieldSplitMetas()) { - String fieldJsonPathKey = joinField(SelectedTab.KEY_FIELD_COLS, Lists.newArrayList(colIndex, - docSplitFieldIndex), "jsonPath"); + final String fieldJsonPathKey = splitMetas(colIndex, docSplitFieldIndex, "jsonPath"); jsonPath = splitCMeta.getJsonPath(); @@ -298,8 +328,7 @@ public boolean validateSubForm(IControlMsgHandler msgHandler, Context context, S } name = splitCMeta.getName(); - final String fieldNameKey = joinField(SelectedTab.KEY_FIELD_COLS, Lists.newArrayList(colIndex, - docSplitFieldIndex), nameKey); + final String fieldNameKey = splitMetas(colIndex, docSplitFieldIndex, nameKey); if (Validator.require.validate(msgHandler, context, fieldNameKey, name) // && Validator.db_col_name.validate(msgHandler, context, fieldNameKey, name)) { @@ -335,9 +364,7 @@ public boolean validateSubForm(IControlMsgHandler msgHandler, Context context, S msgHandler.addFieldError(context, joinField(SelectedTab.KEY_FIELD_COLS, fieldIndex, nameKey), "字段重复"); - - msgHandler.addFieldError(context, joinField(SelectedTab.KEY_FIELD_COLS, - Lists.newArrayList(colIndex, docSplitFieldIndex), nameKey), "字段重复"); + msgHandler.addFieldError(context, splitMetas(colIndex, docSplitFieldIndex, nameKey), "字段重复"); return false; } else { @@ -375,14 +402,10 @@ protected boolean verify(IControlMsgHandler msgHandler, Context context, PostFor return super.verify(msgHandler, context, postFormVals); } - @Override - public final Descriptor getSelectedTableExtendDescriptor() { - return TIS.get().getDescriptor(MongoSelectedTabExtend.class); - } @Override public boolean isSupportIncr() { - return true; + return false; } @Override diff --git a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXMongodbWriter.java b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXMongodbWriter.java index c2e5336f8..766bf6afc 100644 --- a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXMongodbWriter.java +++ b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXMongodbWriter.java @@ -51,6 +51,9 @@ /** * @author: baisui 百岁 * @create: 2021-04-07 15:30 + * @see com.alibaba.datax.plugin.writer.mongodbwriter.MongoDBWriter + * @see com.alibaba.datax.plugin.writer.mongodbwriter.MongoDBWriter.Job + * @see com.alibaba.datax.plugin.writer.mongodbwriter.MongoDBWriter.Task **/ @Public public class DataXMongodbWriter extends DataxWriter implements //IDataxProcessor.INullTableMapCreator, @@ -62,13 +65,13 @@ public class DataXMongodbWriter extends DataxWriter implements //IDataxProcesso @FormField(ordinal = 0, type = FormFieldType.ENUM, validate = {Validator.require}) public String dbName; -// @FormField(ordinal = 3, type = FormFieldType.INPUTTEXT, validate = {Validator.require, Validator.db_col_name}) -// public String collectionName; -// @FormField(ordinal = 4, type = FormFieldType.TEXTAREA, validate = {Validator.require}) -// public String column; + // @FormField(ordinal = 3, type = FormFieldType.INPUTTEXT, validate = {Validator.require, Validator.db_col_name}) + // public String collectionName; + // @FormField(ordinal = 4, type = FormFieldType.TEXTAREA, validate = {Validator.require}) + // public String column; -// @FormField(ordinal = 8, type = FormFieldType.TEXTAREA, validate = {}) -// public String upsertInfo; + // @FormField(ordinal = 8, type = FormFieldType.TEXTAREA, validate = {}) + // public String upsertInfo; @FormField(ordinal = 11, type = FormFieldType.TEXTAREA, advance = false, validate = {Validator.require}) public String template; @@ -101,7 +104,6 @@ public Integer getRowFetchSize() { } - // public static String getDftCollectionName() { // DataxReader dataReader = DataxReader.getThreadBingDataXReader(); // if (dataReader == null) { @@ -140,6 +142,7 @@ public IDataxContext getSubTask(Optional tableMap) { @TISExtension() public static class DefaultDescriptor extends BaseDataxWriterDescriptor implements DataxWriter.IRewriteSuFormProperties { private transient SuFormProperties rewriteSubFormProperties; + public DefaultDescriptor() { super(); } @@ -151,7 +154,7 @@ public boolean validateColumn(IFieldErrorHandler msgHandler, Context context, St @Override protected boolean verify(IControlMsgHandler msgHandler, Context context, PostFormVals postFormVals) { - // JSONArray cols = JSON.parseArray(postFormVals.getField(KEY_FIELD_COLUMN)); + // JSONArray cols = JSON.parseArray(postFormVals.getField(KEY_FIELD_COLUMN)); JSONObject col = null; try { String upsertinfo = postFormVals.getField(KEY_FIELD_UPSERT_INFO); @@ -167,12 +170,12 @@ protected boolean verify(IControlMsgHandler msgHandler, Context context, PostFor return false; } boolean findField = false; -// for (int i = 0; i < cols.size(); i++) { -// col = cols.getJSONObject(i); -// if (StringUtils.equals(upsertKey, col.getString("name"))) { -// findField = true; -// } -// } + // for (int i = 0; i < cols.size(); i++) { + // col = cols.getJSONObject(i); + // if (StringUtils.equals(upsertKey, col.getString("name"))) { + // findField = true; + // } + // } if (!findField) { msgHandler.addFieldError(context, KEY_FIELD_UPSERT_INFO, "属性'upsertKey':" + upsertinfo + @@ -225,10 +228,10 @@ public String getDisplayName() { * implements DataxWriter.IRewriteSuFormProperties Start */ @Override - public Descriptor getRewriterSelectTabDescriptor() { + public Descriptor getRewriterSelectTabDescriptor() { Class targetClass = MongoSelectedTab.class; - return Objects.requireNonNull(TIS.get().getDescriptor(targetClass), "subForm clazz:" + targetClass + " " - + "can not find relevant Descriptor"); + return Objects.requireNonNull(TIS.get().getDescriptor(targetClass), + "subForm clazz:" + targetClass + " " + "can not find relevant Descriptor"); } @Override diff --git a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/MongoDBReaderContext.java b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/MongoDBReaderContext.java index ca3f83b61..ef55236fd 100644 --- a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/MongoDBReaderContext.java +++ b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/MongoDBReaderContext.java @@ -19,30 +19,27 @@ package com.qlangtech.tis.plugin.datax; import com.qlangtech.tis.datax.IDataxReaderContext; +import com.qlangtech.tis.plugin.datax.common.RdbmsReaderContext; +import com.qlangtech.tis.plugin.datax.mongo.MongoSelectedTab; +import com.qlangtech.tis.plugin.datax.mongo.MongoSelectedTabExtend; +import com.qlangtech.tis.plugin.ds.IDataSourceDumper; /** * @author: 百岁(baisui@qlangtech.com) * @create: 2021-06-06 14:53 **/ -public class MongoDBReaderContext extends BasicMongoDBContext implements IDataxReaderContext { - private final DataXMongodbReader mongodbReader; - private final String taskName; +public class MongoDBReaderContext extends RdbmsReaderContext implements IDataxReaderContext { - public MongoDBReaderContext(String taskName, DataXMongodbReader mongodbReader) { - super(mongodbReader.getDataSourceFactory()); - this.mongodbReader = mongodbReader; - this.taskName = taskName; - } + private final MongoSelectedTab mongoTable; + private final MongoSelectedTabExtend tabExtend; - @Override - public String getReaderContextId() { - return this.dsFactory.identityValue(); + public MongoDBReaderContext(String jobName, SelectedTab tab, IDataSourceDumper dumper, + DataXMongodbReader mongodbReader) { + super(jobName, tab.getName(), dumper, mongodbReader); + this.mongoTable = (MongoSelectedTab) tab; + this.tabExtend = (MongoSelectedTabExtend) this.mongoTable.getSourceProps(); } - public String getCollectionName() { - // return mongodbReader.collectionName; - return null; - } public String getColumn() { // return this.mongodbReader.column; @@ -59,10 +56,6 @@ public String getQuery() { return null; } - @Override - public String getTaskName() { - return this.taskName; - } @Override public String getSourceTableName() { diff --git a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/MongoCMetaCreatorFactory.java b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/MongoCMetaCreatorFactory.java index 0062aca66..24322c739 100644 --- a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/MongoCMetaCreatorFactory.java +++ b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/MongoCMetaCreatorFactory.java @@ -14,7 +14,7 @@ * @date 2023/9/3 */ public class MongoCMetaCreatorFactory implements CMeta.ElementCreatorFactory { - private static final String KEY_DOC_FIELD_SPLIT_METAS = "docFieldSplitMetas"; + public static final String KEY_DOC_FIELD_SPLIT_METAS = "docFieldSplitMetas"; /** * @param targetCol diff --git a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/MongoColumnMetaData.java b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/MongoColumnMetaData.java index ed2b098b3..289c5638e 100644 --- a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/MongoColumnMetaData.java +++ b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/MongoColumnMetaData.java @@ -37,11 +37,16 @@ public MongoColumnMetaData(int index, String key, BsonType mongoFieldType) { } public MongoColumnMetaData(int index, String key, BsonType mongoFieldType, int containValCount) { - this(index, key, mapType(mongoFieldType), mongoFieldType, containValCount); + this(index, key, mongoFieldType, containValCount, false); } - public MongoColumnMetaData(int index, String key, DataType dataType, BsonType mongoFieldType, int containValCount) { - super(index, key, dataType, false); + public MongoColumnMetaData(int index, String key, BsonType mongoFieldType, int containValCount, boolean pk) { + this(index, key, mapType(mongoFieldType), mongoFieldType, containValCount, pk); + } + + public MongoColumnMetaData(int index, String key, DataType dataType, BsonType mongoFieldType, int containValCount + , boolean pk) { + super(index, key, dataType, pk); this.mongoFieldType = mongoFieldType; this.containValCount = containValCount; } @@ -52,8 +57,8 @@ public MongoColumnMetaData(int index, String key, DataType dataType, BsonType mo * @param colsSchema * @param bdoc */ - public static void parseMongoDocTypes(boolean parseChildDoc, List parentKeys, Map colsSchema, BsonDocument bdoc) { + public static void parseMongoDocTypes(boolean parseChildDoc, List parentKeys // + , Map colsSchema, BsonDocument bdoc) { int index = 0; BsonValue val; String key; @@ -64,12 +69,13 @@ public static void parseMongoDocTypes(boolean parseChildDoc, List parent val = entry.getValue(); keys = ListUtils.union(parentKeys, Collections.singletonList(entry.getKey())); key = String.join(MongoCMeta.KEY_MONOG_NEST_PROP_SEPERATOR, keys); -// if (val.getBsonType() == BsonType.OBJECT_ID) { -// continue; -// } + // if (val.getBsonType() == BsonType.OBJECT_ID) { + // continue; + // } colMeta = colsSchema.get(key); if (colMeta == null) { - colMeta = new MongoColumnMetaData(index++, key, val.getBsonType()); + colMeta = new MongoColumnMetaData(index++, key, val.getBsonType(), 0, + (val.getBsonType() == BsonType.OBJECT_ID)); colsSchema.put(key, colMeta); } else { if (colMeta.getMongoFieldType() != BsonType.STRING // @@ -82,11 +88,7 @@ public static void parseMongoDocTypes(boolean parseChildDoc, List parent } if (colMeta.getMongoFieldType() == BsonType.DOCUMENT && val.isDocument()) { - // BsonValue val = null; - parseMongoDocTypes(true, keys, parseChildDoc ? colsSchema : colMeta.docTypeFieldEnum, val.asDocument()); - - //colMeta.flatMapMongoDocument(val.asDocument()); } @@ -119,11 +121,12 @@ public static List reorder(Map cols if (col.getMaxStrLength() > DataTypeMeta.getDataTypeMeta(JDBCTypes.VARCHAR).getColsSizeRange().getMax()) { // 超过了varchar colsSize的上限了直接设置为TEXT(LONGVARCHAR)类型 result.set(i, new MongoColumnMetaData(col.getIndex(), col.getKey(), - DataType.getType(JDBCTypes.LONGVARCHAR), BsonType.STRING, col.getContainValCount())); + DataType.getType(JDBCTypes.LONGVARCHAR), BsonType.STRING, col.getContainValCount(), + false)); } else { result.set(i, new MongoColumnMetaData(col.getIndex(), col.getKey(), DataType.create(JDBCTypes.VARCHAR.getType(), JDBCTypes.VARCHAR.getLiteria(), - col.getMaxStrLength()), BsonType.STRING, col.getContainValCount())); + col.getMaxStrLength()), BsonType.STRING, col.getContainValCount(), false)); } } } diff --git a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/MongoSelectedTabExtend.java b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/MongoSelectedTabExtend.java index ef120be05..d1d242b71 100644 --- a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/MongoSelectedTabExtend.java +++ b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/MongoSelectedTabExtend.java @@ -1,13 +1,20 @@ package com.qlangtech.tis.plugin.datax.mongo; import com.qlangtech.tis.extension.TISExtension; +import com.qlangtech.tis.plugin.annotation.FormField; +import com.qlangtech.tis.plugin.annotation.Validator; import com.qlangtech.tis.plugin.datax.SelectedTabExtend; +import com.qlangtech.tis.plugin.datax.mongo.reader.ReaderFilter; /** * @author 百岁 (baisui@qlangtech.com) * @date 2023/9/1 */ public class MongoSelectedTabExtend extends SelectedTabExtend { + + @FormField(ordinal = 1, validate = {Validator.require}) + public ReaderFilter filter; + @Override public ExtendType getExtendType() { return ExtendType.BATCH_SOURCE; diff --git a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/reader/ReaderFilteOff.java b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/reader/ReaderFilteOff.java new file mode 100644 index 000000000..41edb31ab --- /dev/null +++ b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/reader/ReaderFilteOff.java @@ -0,0 +1,19 @@ +package com.qlangtech.tis.plugin.datax.mongo.reader; + +import com.qlangtech.tis.extension.Descriptor; +import com.qlangtech.tis.extension.TISExtension; + +/** + * @author 百岁 (baisui@qlangtech.com) + * @date 2023/9/8 + */ +public class ReaderFilteOff extends ReaderFilter { + + @TISExtension + public static class DftDesc extends Descriptor { + @Override + public String getDisplayName() { + return SWITCH_OFF; + } + } +} diff --git a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/reader/ReaderFilter.java b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/reader/ReaderFilter.java new file mode 100644 index 000000000..f6edf891b --- /dev/null +++ b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/reader/ReaderFilter.java @@ -0,0 +1,18 @@ +package com.qlangtech.tis.plugin.datax.mongo.reader; + +import com.alibaba.datax.plugin.reader.mongodbreader.MongoDBReader; +import com.qlangtech.tis.extension.Describable; +import org.bson.Document; + +/** + * @author 百岁 (baisui@qlangtech.com) + * @date 2023/9/8 + * @see MongoDBReader.Task + */ +public class ReaderFilter implements Describable { + + public Document createFilter() { + return new Document(); + } + +} diff --git a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/reader/ReaderFilterNormalQuery.java b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/reader/ReaderFilterNormalQuery.java new file mode 100644 index 000000000..936bb81d5 --- /dev/null +++ b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/reader/ReaderFilterNormalQuery.java @@ -0,0 +1,54 @@ +package com.qlangtech.tis.plugin.datax.mongo.reader; + +import com.alibaba.citrus.turbine.Context; +import com.alibaba.datax.plugin.reader.mongodbreader.MongoDBReader; +import com.qlangtech.tis.extension.Descriptor; +import com.qlangtech.tis.extension.TISExtension; +import com.qlangtech.tis.plugin.annotation.FormField; +import com.qlangtech.tis.plugin.annotation.FormFieldType; +import com.qlangtech.tis.plugin.annotation.Validator; +import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler; +import org.bson.Document; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author 百岁 (baisui@qlangtech.com) + * @date 2023/9/8 + * @see MongoDBReader.Task + */ +public class ReaderFilterNormalQuery extends ReaderFilter { + + private static final Logger logger = LoggerFactory.getLogger(ReaderFilterNormalQuery.class); + + @FormField(ordinal = 1, type = FormFieldType.TEXTAREA, validate = {Validator.require}) + public String query; + + @Override + public Document createFilter() { + Document fitler = Document.parse(query); + return fitler; + } + + @TISExtension + public static class DftDesc extends Descriptor { + @Override + public String getDisplayName() { + return "normalQuery"; + } + + public boolean validateQuery(IFieldErrorHandler msgHandler, Context context, String fieldName, String query) { + try { + Document queryFilter = Document.parse(query); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + msgHandler.addFieldError(context, fieldName, e.getMessage()); + return false; + } + return true; + } + + + } + +} diff --git a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/reader/ReaderFilterOn.java b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/reader/ReaderFilterOn.java new file mode 100644 index 000000000..2d58a8ade --- /dev/null +++ b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/reader/ReaderFilterOn.java @@ -0,0 +1,80 @@ +package com.qlangtech.tis.plugin.datax.mongo.reader; + +import com.alibaba.citrus.turbine.Context; +import com.alibaba.datax.plugin.reader.mongodbreader.KeyConstant; +import com.alibaba.datax.plugin.reader.mongodbreader.MongoDBReader; +import com.qlangtech.tis.extension.Descriptor; +import com.qlangtech.tis.extension.TISExtension; +import com.qlangtech.tis.plugin.annotation.FormField; +import com.qlangtech.tis.plugin.annotation.FormFieldType; +import com.qlangtech.tis.plugin.annotation.Validator; +import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler; +import org.apache.commons.lang.StringUtils; +import org.bson.Document; +import org.bson.types.ObjectId; + +/** + * @author 百岁 (baisui@qlangtech.com) + * @date 2023/9/8 + * @see MongoDBReader.Task + */ +public class ReaderFilterOn extends ReaderFilter { + + @FormField(ordinal = 1, type = FormFieldType.ENUM, validate = {Validator.require}) + public Boolean usingObjectId; + + @FormField(ordinal = 2, type = FormFieldType.INPUTTEXT, validate = {}) + public String lowerBound; + @FormField(ordinal = 3, type = FormFieldType.INPUTTEXT, validate = {}) + public String upperBound; + + @Override + public Document createFilter() { + Document fitler = super.createFilter(); + + Document filter = new Document(); + if (StringUtils.isEmpty(lowerBound)) { + if (StringUtils.isNotEmpty(upperBound)) { + filter.append(KeyConstant.MONGO_PRIMARY_ID // + , new Document("$lt", usingObjectId ? new ObjectId(upperBound) : upperBound)); + } + } else if (StringUtils.isEmpty(upperBound) && StringUtils.isNotEmpty(lowerBound)) { + filter.append(KeyConstant.MONGO_PRIMARY_ID // + , new Document("$gte", usingObjectId ? new ObjectId(lowerBound) : lowerBound)); + } else { + filter.append(KeyConstant.MONGO_PRIMARY_ID // + , new Document("$gte", usingObjectId ? new ObjectId(lowerBound) : lowerBound) // + .append("$lt", usingObjectId ? new ObjectId(upperBound) : upperBound)); + } + + return fitler; + } + + @TISExtension + public static class DftDesc extends Descriptor { + + @Override + protected boolean verify(IControlMsgHandler msgHandler, Context context, PostFormVals postFormVals) { + return this.validateAll(msgHandler, context, postFormVals); + } + + @Override + protected boolean validateAll(IControlMsgHandler msgHandler, Context context, PostFormVals postFormVals) { + + ReaderFilterOn filter = postFormVals.newInstance(); + if (StringUtils.isEmpty(filter.lowerBound) && StringUtils.isEmpty(filter.upperBound)) { + + msgHandler.addErrorMessage(context, "查询区间不能都为空,上下区间至少填一项"); + return false; + } + + return true; + } + + @Override + public String getDisplayName() { + return "range"; + } + } + +} diff --git a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mangodb/MangoDBDataSourceFactory.java b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mangodb/MangoDBDataSourceFactory.java index b89584c84..dade67fcf 100644 --- a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mangodb/MangoDBDataSourceFactory.java +++ b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mangodb/MangoDBDataSourceFactory.java @@ -112,7 +112,7 @@ public String getPassword() { @Override public DataDumpers getDataDumpers(TISTable table) { - throw new UnsupportedOperationException(); + return DataDumpers.create(Collections.singletonList(this.address), table); } @Override @@ -137,7 +137,7 @@ public TableInDB getTablesInDB() { @Override public DBConfig getDbConfig() { - throw new IllegalStateException(); + throw new UnsupportedOperationException("getDbConfig"); } @Override diff --git a/tis-datax/tis-datax-mongodb-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/DataXMongodbReader.selectedTabs.json b/tis-datax/tis-datax-mongodb-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/DataXMongodbReader.selectedTabs.json index 1e689f3d5..36be33d82 100644 --- a/tis-datax/tis-datax-mongodb-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/DataXMongodbReader.selectedTabs.json +++ b/tis-datax/tis-datax-mongodb-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/DataXMongodbReader.selectedTabs.json @@ -1,4 +1,10 @@ { + "sourceProps": { + "impl": "com.qlangtech.tis.plugin.datax.mongo.MongoSelectedTabExtend" + }, + "where": { + "disable": true + }, "cols": { "elementCreator": "com.qlangtech.tis.plugin.datax.mongo.MongoCMetaCreatorFactory", "viewtype": "tuplelist"