
Commit

add powerjob for TIS, make powerjob testcase testCreateWorkflowJob pass
baisui1981 committed Dec 2, 2023
1 parent 0904f30 commit d40dd2f
Showing 10 changed files with 468 additions and 277 deletions.
6 changes: 6 additions & 0 deletions tis-datax/tis-datax-common-plugin/pom.xml
@@ -64,6 +64,11 @@
<artifactId>tis-plugin</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.qlangtech.tis</groupId>
<artifactId>tis-manage-pojo</artifactId>
<scope>provided</scope>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.qlangtech.tis</groupId>-->
<!-- <artifactId>tis-solrj-client</artifactId>-->
@@ -74,6 +79,7 @@
<artifactId>tis-hadoop-rpc</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

</project>
@@ -21,9 +21,14 @@
import com.alibaba.citrus.turbine.Context;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.qlangtech.tis.config.ParamsConfig;
import com.qlangtech.tis.datax.*;
import com.qlangtech.tis.datax.AdapterDataxReader;
import com.qlangtech.tis.datax.IDataxGlobalCfg;
import com.qlangtech.tis.datax.IDataxProcessor;
import com.qlangtech.tis.datax.IDataxReader;
import com.qlangtech.tis.datax.IDataxWriter;
import com.qlangtech.tis.datax.IGroupChildTaskIterator;
import com.qlangtech.tis.datax.TableAliasMapper;
import com.qlangtech.tis.datax.impl.DataXCfgGenerator;
import com.qlangtech.tis.datax.impl.DataxProcessor;
import com.qlangtech.tis.datax.impl.DataxReader;
@@ -37,8 +42,10 @@
import com.qlangtech.tis.plugin.annotation.FormField;
import com.qlangtech.tis.plugin.annotation.FormFieldType;
import com.qlangtech.tis.plugin.annotation.Validator;
import com.qlangtech.tis.plugin.ds.AdapterSelectedTab;
import com.qlangtech.tis.plugin.ds.DataSourceFactory;
import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter;
import com.qlangtech.tis.plugin.ds.ISelectedTab;
import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler;
import com.qlangtech.tis.sql.parser.SqlTaskNodeMeta;
import com.qlangtech.tis.sql.parser.TopologyDir;
@@ -52,7 +59,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
* @author: 百岁([email protected]
@@ -103,23 +110,31 @@ public List<IDataxReader> getReaders(IPluginContext pluginCtx) {
List<DependencyNode> dumpNodes = topology.getDumpNodes();


Map<String, Set<String>> dbIds = Maps.newHashMap();
Set<String> tabs = null;
Map<String/*dbName*/, SelectedTabs> dbIds = Maps.newHashMap();
SelectedTabs tabs = null;
for (DependencyNode dump : dumpNodes) {
tabs = dbIds.get(dump.getDbName());
if (tabs == null) {
tabs = Sets.newHashSet();
tabs = new SelectedTabs();
dbIds.put(dump.getDbName(), tabs);
}
tabs.add(dump.getName());

tabs.addDumpNode(dump);
// dbIds.add(dump.getDbName());
}

dbIds.entrySet().forEach((entry) -> {
readers.add(new AdapterDataxReader(DataxReader.load(null, true, entry.getKey())) {
@Override
public IGroupChildTaskIterator getSubTasks() {
return super.getSubTasks((tab) -> entry.getValue().contains(tab.getName()));
return super.getSubTasks((tab) -> entry.getValue().contains(tab));
}

@Override
public List<TopologySelectedTab> getSelectedTabs() {
return super.getSelectedTabs().stream()//
.map((tab) -> new TopologySelectedTab(tab, entry.getValue().getTopologyId(tab))) //
.collect(Collectors.toList());
}
});
});
@@ -130,6 +145,36 @@ public IGroupChildTaskIterator getSubTasks() {
}
}

public static class TopologySelectedTab extends AdapterSelectedTab {
private final String topologyId;

public TopologySelectedTab(ISelectedTab target, String topologyId) {
super(target);
this.topologyId = topologyId;
}

public String getTopologyId() {
return this.topologyId;
}
}

private static class SelectedTabs {
private final Map<String /*tabName*/, String/*toplogId*/> tab2ToplogId = Maps.newHashMap();

public void addDumpNode(DependencyNode dumpNode) {
tab2ToplogId.put(dumpNode.getName(), dumpNode.getId());
}

public boolean contains(ISelectedTab tab) {
return tab2ToplogId.containsKey(tab.getName());
}

public String getTopologyId(ISelectedTab tab) {
return Objects.requireNonNull(tab2ToplogId.get(tab.getName()) //
, "tabName:" + tab.getName() + " relevant topologyName can not be null");
}
}

public SqlTaskNodeMeta.SqlDataFlowTopology getTopology() {
try {
return SqlTaskNodeMeta.getSqlDataFlowTopology(this.name);
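For context, here is a minimal, self-contained sketch of the bookkeeping the new SelectedTabs helper introduces: the previous Map<String, Set<String>> of table names per database is replaced by a map from table name to topology node id, so each reader can later resolve the topology node for a selected tab. The DumpNode stand-in, the database/table names, and the main driver below are hypothetical illustrations, not part of the commit.

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class SelectedTabsSketch {

    // Hypothetical stand-in for com.qlangtech.tis.sql.parser.meta.DependencyNode;
    // only the two accessors the diff relies on (getName()/getId()) are modelled.
    static final class DumpNode {
        private final String name; // table name of the dump node
        private final String id;   // topology node id

        DumpNode(String name, String id) {
            this.name = name;
            this.id = id;
        }

        String getName() { return name; }

        String getId() { return id; }
    }

    // Mirrors the private SelectedTabs class added in the diff: a per-database
    // mapping from table name to the topology node id it came from.
    static final class SelectedTabs {
        private final Map<String, String> tab2TopologyId = new HashMap<>();

        void addDumpNode(DumpNode dumpNode) {
            tab2TopologyId.put(dumpNode.getName(), dumpNode.getId());
        }

        boolean contains(String tabName) {
            return tab2TopologyId.containsKey(tabName);
        }

        String getTopologyId(String tabName) {
            return Objects.requireNonNull(tab2TopologyId.get(tabName),
                    "tabName:" + tabName + " relevant topologyId can not be null");
        }
    }

    public static void main(String[] args) {
        // Group dump nodes by database, as getReaders() does with its
        // Map<dbName, SelectedTabs>.
        Map<String, SelectedTabs> dbIds = new HashMap<>();

        DumpNode[] dumpNodes = {
                new DumpNode("orders", "node_1"),
                new DumpNode("customers", "node_2")
        };
        for (DumpNode dump : dumpNodes) {
            dbIds.computeIfAbsent("order_db", (db) -> new SelectedTabs()).addDumpNode(dump);
        }

        SelectedTabs tabs = dbIds.get("order_db");
        System.out.println(tabs.contains("orders"));          // true
        System.out.println(tabs.getTopologyId("customers"));  // node_2
    }
}

Keyed by table name within each database, this is the same lookup the getReaders() hunk above uses to filter each AdapterDataxReader's sub-tasks and to decorate its selected tabs with the matching topology id via TopologySelectedTab.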

