Skip to content

Commit

Permalink
[Feature-#1889][ftp] read support column index and sheetNO
Browse files Browse the repository at this point in the history
  • Loading branch information
libailin authored and zoudaokoulife committed Mar 18, 2024
1 parent 994c2d1 commit bf9458e
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,39 @@

package com.dtstack.chunjun.connector.ftp.client.excel;

import com.dtstack.chunjun.connector.ftp.extend.ftp.IFormatConfig;

import com.alibaba.excel.ExcelReader;
import com.alibaba.excel.read.metadata.ReadSheet;

import java.util.ArrayList;
import java.util.List;

public class ExcelReaderExecutor implements Runnable {

private final ExcelReader reader;
private ExcelSubExceptionCarrier ec;
private IFormatConfig config;

public ExcelReaderExecutor(ExcelReader reader, ExcelSubExceptionCarrier ec) {
public ExcelReaderExecutor(
ExcelReader reader, ExcelSubExceptionCarrier ec, IFormatConfig config) {
this.reader = reader;
this.ec = ec;
this.config = config;
}

@Override
public void run() {
try {
reader.readAll();
if (config.getSheetNo() != null) {
List<ReadSheet> readSheetList = new ArrayList<>();
for (int i = 0; i < config.getSheetNo().size(); i++) {
readSheetList.add(new ReadSheet(config.getSheetNo().get(i)));
}
reader.read(readSheetList);
} else {
reader.readAll();
}
} catch (Exception e) {
ec.setThrowable(e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import lombok.Data;
import lombok.EqualsAndHashCode;

import java.util.List;
import java.util.Map;

import static com.dtstack.chunjun.connector.ftp.config.ConfigConstants.DEFAULT_FTP_PORT;
Expand Down Expand Up @@ -91,4 +92,10 @@ public void setDefaultPort() {
port = DEFAULT_FTP_PORT;
}
}

/** 工作表 */
public List<Integer> sheetNo;

/** 字段对应的列索引 */
public List<Integer> columnIndex;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import lombok.Data;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

@Data
Expand All @@ -45,4 +46,6 @@ public class IFormatConfig implements Serializable {

/* 行分隔符 */
private String columnDelimiter;

public List<Integer> sheetNo;
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void open(File file, InputStream inputStream, IFormatConfig config) {
.namingPattern("excel-schedule-pool-%d")
.daemon(false)
.build());
ExcelReaderExecutor executor = new ExcelReaderExecutor(reader, ec);
ExcelReaderExecutor executor = new ExcelReaderExecutor(reader, ec, config);
executorService.execute(executor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,14 @@ public class FtpOptions extends BaseFileOptions {
.stringType()
.noDefaultValue()
.withDescription("compress type");
public static final ConfigOption<String> SHEET_NO =
ConfigOptions.key("sheet-no")
.stringType()
.noDefaultValue()
.withDescription("sheet no, Multiple numbers separated by commas(,)");
public static final ConfigOption<String> COLUMN_INDEX =
ConfigOptions.key("column-index")
.stringType()
.noDefaultValue()
.withDescription("column index, Multiple numbers separated by commas(,)");
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,24 @@ protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException
}

if (rowConverter instanceof FtpSqlConverter) {
// 处理字段配置了对应的列索引
if (ftpConfig.getColumnIndex() != null) {
List<FieldConfig> columns = ftpConfig.getColumn();
String[] fieldsData = new String[columns.size()];
for (int i = 0; i < CollectionUtils.size(columns); i++) {
FieldConfig fieldConfig = columns.get(i);
if (fieldConfig.getIndex() >= fields.length) {
String errorMessage =
String.format(
"The column index is greater than the data size."
+ " The current column index is [%s], but the data size is [%s]. Data loss may occur.",
fieldConfig.getIndex(), fields.length);
throw new IllegalArgumentException(errorMessage);
}
fieldsData[i] = fields[fieldConfig.getIndex()];
}
fields = fieldsData;
}
// 解决数据里包含特殊符号(逗号、换行符)
rowData = rowConverter.toInternal(fields);
} else if (rowConverter instanceof FtpSyncConverter) {
Expand Down Expand Up @@ -278,6 +296,7 @@ private IFormatConfig buildIFormatConfig(FtpConfig ftpConfig) {
iFormatConfig.setFetchMaxSize(ftpConfig.getMaxFetchSize());
iFormatConfig.setParallelism(ftpConfig.getParallelism());
iFormatConfig.setColumnDelimiter(ftpConfig.getColumnDelimiter());
iFormatConfig.setSheetNo(ftpConfig.getSheetNo());

return iFormatConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class FtpDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

Expand Down Expand Up @@ -99,6 +101,20 @@ private static FtpConfig getFtpConfByOptions(ReadableConfig config) {
if (config.get(FtpOptions.FIRST_LINE_HEADER) != null) {
ftpConfig.setFirstLineHeader(config.get(FtpOptions.FIRST_LINE_HEADER));
}
if (StringUtils.isNotBlank(config.get(FtpOptions.SHEET_NO))) {
List<Integer> sheetNo =
Arrays.stream(config.get(FtpOptions.SHEET_NO).split(","))
.map(Integer::parseInt)
.collect(Collectors.toList());
ftpConfig.setSheetNo(sheetNo);
}
if (StringUtils.isNotBlank(config.get(FtpOptions.COLUMN_INDEX))) {
List<Integer> columnIndex =
Arrays.stream(config.get(FtpOptions.COLUMN_INDEX).split(","))
.map(Integer::parseInt)
.collect(Collectors.toList());
ftpConfig.setColumnIndex(columnIndex);
}
return ftpConfig;
}

Expand All @@ -118,13 +134,24 @@ public DynamicTableSource createDynamicTableSource(Context context) {

List<Column> columns = resolvedSchema.getColumns();
FtpConfig ftpConfig = getFtpConfByOptions(config);
if (ftpConfig.getColumnIndex() != null
&& columns.size() != ftpConfig.getColumnIndex().size()) {
throw new IllegalArgumentException(
String.format(
"The number of fields (%s) is inconsistent with the number of indexes (%s).",
columns.size(), ftpConfig.getColumnIndex().size()));
}
List<FieldConfig> columnList = new ArrayList<>(columns.size());
for (Column column : columns) {
FieldConfig field = new FieldConfig();
field.setName(column.getName());
field.setType(
TypeConfig.fromString(column.getDataType().getLogicalType().asSummaryString()));
field.setIndex(columns.indexOf(column));
int index =
ftpConfig.getColumnIndex() != null
? ftpConfig.getColumnIndex().get(columns.indexOf(column))
: columns.indexOf(column);
field.setIndex(index);
columnList.add(field);
}
ftpConfig.setColumn(columnList);
Expand Down Expand Up @@ -199,6 +226,8 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(FtpOptions.COMPRESS_TYPE);
options.add(BaseFileOptions.NEXT_CHECK_ROWS);
options.add(BaseFileOptions.WRITE_MODE);
options.add(FtpOptions.SHEET_NO);
options.add(FtpOptions.COLUMN_INDEX);
return options;
}
}

0 comments on commit bf9458e

Please sign in to comment.