Skip to content

Commit

Permalink
add mongo reader support
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Sep 3, 2023
1 parent 4cb7203 commit 7107caf
Show file tree
Hide file tree
Showing 51 changed files with 1,479 additions and 769 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public class CassandraDatasourceFactory extends DataSourceFactory {
public static final String DATAX_NAME = "Cassandra";



/**
* 节点描述
*/
Expand Down Expand Up @@ -97,16 +96,15 @@ public List<ColumnMetaData> getTableMetadata(boolean inSink, EntityName table) {
AtomicInteger index = new AtomicInteger();
processSession((session) -> {
ColumnMetaData cmeta = null;
ResultSet resultSet = session.execute(
"SELECT column_name,type FROM system_schema.columns WHERE keyspace_name = '"
+ this.dbName + "' AND table_name = '" + table.getTabName() + "'");
ResultSet resultSet = session.execute("SELECT column_name,type FROM system_schema.columns WHERE " +
"keyspace_name = '" + this.dbName + "' AND table_name = '" + table.getTabName() + "'");
Iterator<Row> rows = resultSet.iterator();
Row row = null;
while (rows.hasNext()) {
row = rows.next();
//int index, String key, int type, boolean pk
cmeta = new ColumnMetaData(index.getAndIncrement(), row.getString(0)
, new DataType(convertType(row.getString(1))), false, true);
cmeta = new ColumnMetaData(index.getAndIncrement(), row.getString(0),
new DataType(JDBCTypes.parse(convertType(row.getString(1)))), false, true);
// tables.add(row.getString(0));
colsMeta.add(cmeta);
}
Expand Down Expand Up @@ -156,7 +154,8 @@ private int convertType(String type) {
public TableInDB getTablesInDB() {
TableInDB tables = TableInDB.create(this);
processSession((session) -> {
ResultSet resultSet = session.execute("SELECT table_name FROM system_schema.tables WHERE keyspace_name = '" + this.dbName + "' ");
ResultSet resultSet = session.execute("SELECT table_name FROM system_schema.tables WHERE keyspace_name = "
+ "'" + this.dbName + "' ");
Iterator<Row> rows = resultSet.iterator();
Row row = null;
while (rows.hasNext()) {
Expand All @@ -171,7 +170,8 @@ private void processSession(ISessionVisit sessionVisit) {
Cluster cluster = null;
Session session = null;
if (StringUtils.isNotEmpty(this.userName)) {
Cluster.Builder clusterBuilder = Cluster.builder().withCredentials(userName, password).withPort(this.port).addContactPoints(getHosts());
Cluster.Builder clusterBuilder =
Cluster.builder().withCredentials(userName, password).withPort(this.port).addContactPoints(getHosts());
if (useSSL != null && useSSL) {
clusterBuilder = clusterBuilder.withSSL();
}
Expand Down Expand Up @@ -200,10 +200,10 @@ public void visitFirstConnection(IConnProcessor connProcessor) {
throw new UnsupportedOperationException();
}

// @Override
// public void refectTableInDB(TableInDB tabs, Connection conn) throws SQLException {
// throw new UnsupportedOperationException();
// }
// @Override
// public void refectTableInDB(TableInDB tabs, Connection conn) throws SQLException {
// throw new UnsupportedOperationException();
// }

interface ISessionVisit {
void visit(Session session);
Expand Down Expand Up @@ -238,7 +238,8 @@ protected boolean supportFacade() {
return false;
}

public boolean validateNodeDesc(IFieldErrorHandler msgHandler, Context context, String fieldName, String value) {
public boolean validateNodeDesc(IFieldErrorHandler msgHandler, Context context, String fieldName,
String value) {

String[] hosts = StringUtils.split(value, ",");
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;

import java.sql.Types;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -102,30 +101,30 @@ protected void appendTabMeta(List<String> pk) {

private String convertType(CMeta col) {
DataType type = col.getType();
switch (type.type) {
case Types.INTEGER:
case Types.TINYINT:
case Types.SMALLINT:
switch (type.getJdbcType()) {
case INTEGER:
case TINYINT:
case SMALLINT:
return "Int32";
case Types.BIGINT:
case BIGINT:
return "Int64";
case Types.FLOAT:
case FLOAT:
return "Float32";
case Types.DOUBLE:
case Types.DECIMAL:
case DOUBLE:
case DECIMAL:
return "Float64";
case Types.DATE:
case DATE:
return "Date";
case Types.TIME:
case Types.TIMESTAMP:
case TIME:
case TIMESTAMP:
return "DateTime";
case Types.BIT:
case Types.BOOLEAN:
case BIT:
case BOOLEAN:
return "UInt8";
case Types.BLOB:
case Types.BINARY:
case Types.LONGVARBINARY:
case Types.VARBINARY:
case BLOB:
case BINARY:
case LONGVARBINARY:
case VARBINARY:
default:
return "String";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ protected DataType createColDataType(String colName, String typeName, int dbColT
colSize = Short.MAX_VALUE;
}
}
return new DataType(dbColType, typeName, colSize);
return DataType.create(dbColType, typeName, colSize);
}
});
}
Expand Down
Loading

0 comments on commit 7107caf

Please sign in to comment.