Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

made schema non transient #51

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/GoogleSheets-batchsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ Names Row contains less number of columns.

**Read Buffer Size:** Number of rows the source reads with a single API request. Default value is 100.

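**Enable Cleansing Column Names:** Toggle that specifies whether to cleanse column names containing special characters.
When enabled, special characters in column names are replaced by underscores.
When disabled, a column name that is not a valid field name is replaced by the default column name (the column letter, for example `B`).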

### Steps to Generate OAuth2 Credentials
1. Create credentials for the Client ID and Client Secret properties [here](https://console.cloud.google.com/apis/credentials).
2. On the Create OAuth client ID page, under Authorized redirect URIs, specify a URI of `http://localhost:8080`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.plugin.common.LineageRecorder;
Expand All @@ -51,11 +52,14 @@ public GoogleSheetsSource(GoogleSheetsSourceConfig config) {

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
config.validate(failureCollector);
failureCollector.getOrThrowException();

pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema(failureCollector));
Schema configuredSchema = config.getSchema(failureCollector);
failureCollector.getOrThrowException();
stageConfigurer.setOutputSchema(configuredSchema);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ public class GoogleSheetsSourceConfig extends GoogleFilteringSourceConfig {
private static final Logger LOG = LoggerFactory.getLogger(GoogleSheetsSourceConfig.class);
private static final Pattern CELL_ADDRESS = Pattern.compile("^([A-Z]+)([0-9]+)$");
private static final Pattern NOT_VALID_PATTERN = Pattern.compile("[^A-Za-z0-9_]+");
private static final Pattern COLUMN_NAME = Pattern.compile("^[A-Za-z_][A-Za-z0-9_-]*$");
public static final String CLEANSE_COLUMN_NAMES = "columnNameCleansingEnabled";
private static LinkedHashMap<Integer, ColumnComplexSchemaInfo> dataSchemaInfo = new LinkedHashMap<>();

@Name(SHEETS_TO_PULL)
Expand All @@ -118,7 +120,7 @@ public class GoogleSheetsSourceConfig extends GoogleFilteringSourceConfig {
@Name(NAME_SCHEMA)
@Description("The schema of the table to read.")
@Macro
private transient Schema schema = null;
private String schema;

@Name(FORMATTING)
@Description("Output format for numeric sheet cells. " +
Expand Down Expand Up @@ -234,6 +236,13 @@ public class GoogleSheetsSourceConfig extends GoogleFilteringSourceConfig {
@Macro
private String sheetFieldName;

@Nullable
@Name(CLEANSE_COLUMN_NAMES)
@Description("Toggle that specifies whether to cleanse column names containing special characters. " +
"Special characters will be replaced by underscores.")
@Macro
private Boolean columnNameCleansingEnabled;

public GoogleSheetsSourceConfig(String referenceName, @Nullable String sheetsIdentifiers, String formatting,
Boolean skipEmptyData, String columnNamesSelection,
@Nullable Integer customColumnNamesRow, String metadataFieldName,
Expand All @@ -242,7 +251,7 @@ public GoogleSheetsSourceConfig(String referenceName, @Nullable String sheetsIde
@Nullable String lastDataColumn, @Nullable String lastDataRow,
@Nullable String metadataCells, @Nullable Integer readBufferSize,
@Nullable Boolean addNameFields, @Nullable String spreadsheetFieldName,
@Nullable String sheetFieldName) {
@Nullable String sheetFieldName, @Nullable String schema) {

super(referenceName);
this.sheetsIdentifiers = sheetsIdentifiers;
Expand All @@ -262,6 +271,7 @@ public GoogleSheetsSourceConfig(String referenceName, @Nullable String sheetsIde
this.addNameFields = addNameFields;
this.spreadsheetFieldName = spreadsheetFieldName;
this.sheetFieldName = sheetFieldName;
this.schema = schema;
}


Expand All @@ -281,9 +291,15 @@ public Schema getSchema(FailureCollector collector) {
"Perhaps no validation step was executed before schema generation.")
.withConfigProperty(SCHEMA);
}
schema = SchemaBuilder.buildSchema(this, new ArrayList<>(dataSchemaInfo.values()));
return SchemaBuilder.buildSchema(this, new ArrayList<>(dataSchemaInfo.values()));
}
return schema;
try {
return Strings.isNullOrEmpty(schema) ? null : Schema.parseJson(schema);
} catch (IOException e) {
collector.addFailure("Invalid schema: " + e.getMessage(),
null).withConfigProperty(NAME_SCHEMA);
}
throw collector.getOrThrowException();
}

private boolean shouldGetSchema() {
Expand Down Expand Up @@ -609,7 +625,7 @@ private LinkedHashMap<Integer, ColumnComplexSchemaInfo> processColumns(List<Cell
}
String title = columnHeaderCell.getFormattedValue();
if (StringUtils.isNotEmpty(title)) {
title = checkTitleFormat(title, seenFieldNames);
title = checkTitleFormat(title, seenFieldNames, i);

// for merge we should analyse sub headers for data schemas
if (isMergeHead) {
Expand Down Expand Up @@ -643,7 +659,7 @@ private List<ColumnComplexSchemaInfo> processSubHeaders(int startIndex, int leng
if (StringUtils.isEmpty(subHeaderTitle)) {
subHeaderTitle = ColumnAddressConverter.getColumnName(i + 1);
}
subHeaderTitle = checkTitleFormat(subHeaderTitle, seenFieldNames);
subHeaderTitle = checkTitleFormat(subHeaderTitle, seenFieldNames, i);
} else {
subHeaderTitle = ColumnAddressConverter.getColumnName(i + 1);
}
Expand All @@ -662,7 +678,14 @@ private List<ColumnComplexSchemaInfo> processSubHeaders(int startIndex, int leng
return subHeaders;
}

private String checkTitleFormat(String title, Map<String, Integer> seenFieldNames) {
private String checkTitleFormat(String title, Map<String, Integer> seenFieldNames, int columnIndex) {
if (getColumnNameCleansingEnabled()) {
return applyFileTitleConvention(title, seenFieldNames);
}
return applySheetTitleConvention(title, columnIndex);
}

private String applyFileTitleConvention(String title, Map<String, Integer> seenFieldNames) {
final String replacementChar = "_";

StringBuilder cleanFieldNameBuilder = new StringBuilder();
Expand Down Expand Up @@ -692,6 +715,17 @@ private String checkTitleFormat(String title, Map<String, Integer> seenFieldName
return cleanFieldNameBuilder.toString();
}

/**
 * Applies the sheet naming convention to a header title: the title is kept as-is when it matches
 * {@code COLUMN_NAME}; otherwise it is replaced by the default column name derived from the
 * column position.
 *
 * @param title       header title read from the sheet
 * @param columnIndex zero-based index of the column the title belongs to
 * @return the original title if it is a valid column name, otherwise the default column name
 */
private String applySheetTitleConvention(String title, int columnIndex) {
  if (COLUMN_NAME.matcher(title).matches()) {
    return title;
  }
  // ColumnAddressConverter is called with a one-based index, hence the +1.
  String defaultColumnName = ColumnAddressConverter.getColumnName(columnIndex + 1);
  // Parameterized logging: the message is only assembled when WARN is enabled.
  LOG.warn("Original column name '{}' doesn't satisfy column name requirements '{}', " +
             "the default column name '{}' will be used.", title, COLUMN_NAME.pattern(),
           defaultColumnName);
  return defaultColumnName;
}

private Schema getDataCellSchema(List<CellData> dataRow, int index, String headerName) {
Schema dataSchema = Schema.of(Schema.Type.STRING);
if (dataRow != null && dataRow.size() > index) {
Expand Down Expand Up @@ -950,6 +984,11 @@ public boolean getAutoDetectRowsAndColumns() {
return Boolean.TRUE.equals(autoDetectRowsAndColumns);
}

/**
 * Tells whether column name cleansing is enabled.
 *
 * <p>The underlying property is optional; an unset ({@code null}) value is treated as disabled.
 * The return type is a primitive, so this method never returns {@code null}.
 *
 * @return {@code true} only when the property is explicitly set to {@code true}
 */
public boolean getColumnNameCleansingEnabled() {
  return Boolean.TRUE.equals(columnNameCleansingEnabled);
}

public Integer getLastDataColumn() {
return lastDataColumn == null ? 0 : Integer.parseInt(lastDataColumn);
}
Expand Down Expand Up @@ -1051,8 +1090,8 @@ public void setSheetsIdentifiers(String sheetsIdentifiers) {
this.sheetsIdentifiers = sheetsIdentifiers;
}

public void setSchema(String schema) throws IOException {
this.schema = Schema.parseJson(schema);
public void setSchema(String schema) {
this.schema = schema;
}

public void setFormatting(String formatting) {
Expand Down Expand Up @@ -1143,6 +1182,10 @@ public void setEndDate(String endDate) {
this.endDate = endDate;
}

/**
 * Sets the optional column name cleansing flag.
 *
 * @param columnNameCleansingEnabled the new flag value; may be {@code null}, which
 *                                   {@code getColumnNameCleansingEnabled()} reports as disabled
 */
public void setColumnNameCleansingEnabled(@Nullable Boolean columnNameCleansingEnabled) {
this.columnNameCleansingEnabled = columnNameCleansingEnabled;
}

public static GoogleSheetsSourceConfig of(JsonObject properties) throws IOException {

GoogleSheetsSourceConfig googleSheetsSourceConfig = GoogleSheetsSourceConfig
Expand Down Expand Up @@ -1326,6 +1369,11 @@ public static GoogleSheetsSourceConfig of(JsonObject properties) throws IOExcept
properties.get(GoogleSheetsSourceConfig.AUTO_DETECT_ROWS_AND_COLUMNS).getAsBoolean());
}

if (properties.has(GoogleSheetsSourceConfig.CLEANSE_COLUMN_NAMES)) {
googleSheetsSourceConfig.setColumnNameCleansingEnabled(
properties.get(GoogleSheetsSourceConfig.CLEANSE_COLUMN_NAMES).getAsBoolean());
}

return googleSheetsSourceConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ public void testProcessColumnsWithLastDataColumnLessThanColumnsRowSize()
@Test
public void testProcessColumnsInvalidTitles()
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
config.setColumnNameCleansingEnabled(true);
Method processColumnsMethod = config.getClass().getDeclaredMethod("processColumns", List.class,
List.class, List.class, List.class, int.class, FailureCollector.class);
processColumnsMethod.setAccessible(true);
Expand Down Expand Up @@ -380,6 +381,7 @@ private void setFieldValue(String fieldName, Object fieldValue) throws NoSuchFie
@Test
public void testProcessColumnsSameCaseSensitiveTitles()
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
config.setColumnNameCleansingEnabled(true);
Method processColumnsMethod = config.getClass().getDeclaredMethod("processColumns", List.class,
List.class, List.class, List.class, int.class,
FailureCollector.class);
Expand Down Expand Up @@ -413,4 +415,60 @@ public void testProcessColumnsSameCaseSensitiveTitles()
Assert.assertEquals("Title_with_space_2", columns.get(1).getHeaderTitle());
Assert.assertEquals("Title_with_space_3", columns.get(2).getHeaderTitle());
}

/**
 * Checks header processing when this test does not enable column name cleansing on the config:
 * titles that are not valid column names are expected to be replaced by the default column name
 * (for example {@code "B"}), while valid titles are kept unchanged.
 */
@Test
public void testProcessColumnsInvalidTitlesOldSchema()
  throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
  // processColumns is private, so it is invoked through reflection.
  Method processColumnsMethod = config.getClass().getDeclaredMethod("processColumns", List.class,
                                                                    List.class, List.class, List.class, int.class,
                                                                    FailureCollector.class);
  processColumnsMethod.setAccessible(true);

  List<CellData> columnsRow = new ArrayList<>();
  columnsRow.add(new CellData().setFormattedValue("a"));
  columnsRow.add(new CellData().setFormattedValue("title with space"));
  columnsRow.add(new CellData());

  List<CellData> subColumnsRow = new ArrayList<>();
  subColumnsRow.add(new CellData().setFormattedValue("no header value"));
  subColumnsRow.add(new CellData().setFormattedValue("9titleWithFirstNumber"));
  subColumnsRow.add(new CellData().setFormattedValue("d"));

  List<CellData> dataRow = new ArrayList<>();
  dataRow.add(new CellData().setUserEnteredValue(new ExtendedValue().setStringValue("aa")));
  dataRow.add(new CellData().setUserEnteredValue(new ExtendedValue().setNumberValue(13d)));
  dataRow.add(new CellData().setUserEnteredValue(new ExtendedValue().setBoolValue(true)));

  // The second and third columns are merged under a single top-level header.
  List<GridRange> columnMerges = new ArrayList<>();
  columnMerges.add(new GridRange().setStartRowIndex(0).setEndRowIndex(1)
                     .setStartColumnIndex(1).setEndColumnIndex(3));

  FailureCollector collector = new DefaultFailureCollector("", Collections.EMPTY_MAP);

  int lastDataColumn = 3;

  LinkedHashMap<Integer, ColumnComplexSchemaInfo> columns =
    (LinkedHashMap<Integer, ColumnComplexSchemaInfo>) processColumnsMethod.invoke(config, columnsRow,
                                                                                  subColumnsRow, dataRow,
                                                                                  columnMerges, lastDataColumn,
                                                                                  collector);

  Assert.assertEquals(2, columns.size());
  Assert.assertTrue(columns.keySet().containsAll(Arrays.asList(0, 1)));

  // check simple column
  Assert.assertEquals("a", columns.get(0).getHeaderTitle());
  Assert.assertTrue(columns.get(0).getSubColumns().isEmpty());

  // check complex columns, top header should have column name as name
  Assert.assertEquals("B", columns.get(1).getHeaderTitle());
  List<ColumnComplexSchemaInfo> subColumns = columns.get(1).getSubColumns();
  Assert.assertFalse(subColumns.isEmpty());

  // check sub-columns
  Assert.assertEquals(2, subColumns.size());
  Assert.assertEquals("B", subColumns.get(0).getHeaderTitle());
  Assert.assertTrue(subColumns.get(0).getSubColumns().isEmpty());
  Assert.assertEquals("d", subColumns.get(1).getHeaderTitle());
  // Verify the second sub-column here; the first one is already covered above.
  Assert.assertTrue(subColumns.get(1).getSubColumns().isEmpty());
}
}
35 changes: 34 additions & 1 deletion widgets/GoogleSheets-batchsource.json
Original file line number Diff line number Diff line change
Expand Up @@ -429,11 +429,44 @@
"default": "100",
"min": "1"
}
},
{
"widget-type": "toggle",
"label": "Enable Cleansing Column Names",
"name": "columnNameCleansingEnabled",
"widget-attributes": {
"on": {
"value": "true",
"label": "Yes"
},
"off": {
"value": "false",
"label": "No"
},
"default": "true"
}
}
]
}
],
"outputs": [],
"outputs": [
{
"name": "schema",
"label": "schema",
"widget-type": "schema",
"widget-attributes": {
"schema-types": [
"boolean",
"long",
"double",
"bytes",
"string",
"array"
],
"schema-default-type": "string"
}
}
],
"filters": [
{
"name": "Select modification date range",
Expand Down