diff --git a/docs/GoogleSheets-batchsource.md b/docs/GoogleSheets-batchsource.md index fe10937..617d25c 100644 --- a/docs/GoogleSheets-batchsource.md +++ b/docs/GoogleSheets-batchsource.md @@ -150,6 +150,9 @@ Names Row contains less number of columns. **Read Buffer Size:** Number of rows the source reads with a single API request. Default value is 100. +**Enable Cleansing Column Names:** Toggle that specifies whether to cleanse column names that contain special characters. +When enabled, each run of characters other than letters, digits and underscores is replaced by a single underscore, a name that starts with a digit is prefixed with `col_`, and repeated names (compared case-insensitively) get a numeric suffix such as `_2`. When disabled, a column name that is not a valid field name is replaced by its default column name (for example `B`). + ### Steps to Generate OAuth2 Credentials 1. Create credentials for the Client ID and Client Secret properties [here](https://console.cloud.google.com/apis/credentials). 2. On the Create OAuth client ID page, under Authorized redirect URIs, specify a URI of `http://localhost:8080`. diff --git a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSource.java b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSource.java index 91f12cd..3752f07 100644 --- a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSource.java +++ b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSource.java @@ -27,6 +27,7 @@ import io.cdap.cdap.etl.api.Emitter; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.StageConfigurer; import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.batch.BatchSourceContext; import io.cdap.plugin.common.LineageRecorder; @@ -51,11 +52,13 @@ public GoogleSheetsSource(GoogleSheetsSourceConfig config) { @Override public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer(); FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector(); config.validate(failureCollector); failureCollector.getOrThrowException(); - 
pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema(failureCollector)); + Schema configuredSchema = config.getSchema(failureCollector); + stageConfigurer.setOutputSchema(configuredSchema); } @Override diff --git a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java index 8fb53f1..03939c8 100644 --- a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java +++ b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java @@ -98,7 +98,9 @@ public class GoogleSheetsSourceConfig extends GoogleFilteringSourceConfig { public static final String CONFIGURATION_PARSE_PROPERTY_NAME = "properties"; private static final Logger LOG = LoggerFactory.getLogger(GoogleSheetsSourceConfig.class); private static final Pattern CELL_ADDRESS = Pattern.compile("^([A-Z]+)([0-9]+)$"); + private static final Pattern NOT_VALID_PATTERN = Pattern.compile("[^A-Za-z0-9_]+"); private static final Pattern COLUMN_NAME = Pattern.compile("^[A-Za-z_][A-Za-z0-9_-]*$"); + public static final String CLEANSE_COLUMN_NAMES = "columnNameCleansingEnabled"; private static LinkedHashMap dataSchemaInfo = new LinkedHashMap<>(); @Name(SHEETS_TO_PULL) @@ -118,7 +120,7 @@ public class GoogleSheetsSourceConfig extends GoogleFilteringSourceConfig { @Name(NAME_SCHEMA) @Description("The schema of the table to read.") @Macro - private transient Schema schema = null; + private String schema; @Name(FORMATTING) @Description("Output format for numeric sheet cells. " + @@ -234,6 +236,13 @@ public class GoogleSheetsSourceConfig extends GoogleFilteringSourceConfig { @Macro private String sheetFieldName; + @Nullable + @Name(CLEANSE_COLUMN_NAMES) + @Description("Toggle that specifies whether to cleanse column names containing special characters. 
" + + "Special characters will be replaced by underscores.") + @Macro + private Boolean columnNameCleansingEnabled; + public GoogleSheetsSourceConfig(String referenceName, @Nullable String sheetsIdentifiers, String formatting, Boolean skipEmptyData, String columnNamesSelection, @Nullable Integer customColumnNamesRow, String metadataFieldName, @@ -242,7 +251,7 @@ public GoogleSheetsSourceConfig(String referenceName, @Nullable String sheetsIde @Nullable String lastDataColumn, @Nullable String lastDataRow, @Nullable String metadataCells, @Nullable Integer readBufferSize, @Nullable Boolean addNameFields, @Nullable String spreadsheetFieldName, - @Nullable String sheetFieldName) { + @Nullable String sheetFieldName, @Nullable String schema) { super(referenceName); this.sheetsIdentifiers = sheetsIdentifiers; @@ -262,6 +271,7 @@ public GoogleSheetsSourceConfig(String referenceName, @Nullable String sheetsIde this.addNameFields = addNameFields; this.spreadsheetFieldName = spreadsheetFieldName; this.sheetFieldName = sheetFieldName; + this.schema = schema; } @@ -281,9 +291,15 @@ public Schema getSchema(FailureCollector collector) { "Perhaps no validation step was executed before schema generation.") .withConfigProperty(SCHEMA); } - schema = SchemaBuilder.buildSchema(this, new ArrayList<>(dataSchemaInfo.values())); + return SchemaBuilder.buildSchema(this, new ArrayList<>(dataSchemaInfo.values())); } - return schema; + try { + return Strings.isNullOrEmpty(schema) ? 
null : Schema.parseJson(schema); + } catch (IOException e) { + collector.addFailure("Invalid schema: " + e.getMessage(), + null).withConfigProperty(NAME_SCHEMA); + } + throw collector.getOrThrowException(); } private boolean shouldGetSchema() { @@ -593,7 +609,7 @@ private LinkedHashMap processColumns(List columnHeaders = new LinkedHashMap<>(); - + final Map seenFieldNames = new HashMap<>(); List headerTitles = new ArrayList<>(); for (int i = 0; i < Math.min(columnsRow.size(), lastDataColumn); i++) { CellData columnHeaderCell = columnsRow.get(i); @@ -609,7 +625,7 @@ private LinkedHashMap processColumns(List processColumns(List processSubHeaders(int startIndex, int length, List subColumnsRow, List dataRow, FailureCollector collector) { List subHeaders = new ArrayList<>(); + final Map seenFieldNames = new HashMap<>(); List titles = new ArrayList<>(); for (int i = startIndex; i < startIndex + length; i++) { String subHeaderTitle; @@ -642,7 +659,7 @@ private List processSubHeaders(int startIndex, int leng if (StringUtils.isEmpty(subHeaderTitle)) { subHeaderTitle = ColumnAddressConverter.getColumnName(i + 1); } - subHeaderTitle = checkTitleFormat(subHeaderTitle, i); + subHeaderTitle = checkTitleFormat(subHeaderTitle, seenFieldNames, i); } else { subHeaderTitle = ColumnAddressConverter.getColumnName(i + 1); } @@ -661,11 +678,49 @@ private List processSubHeaders(int startIndex, int leng return subHeaders; } - private String checkTitleFormat(String title, int columnIndex) { + private String checkTitleFormat(String title, Map seenFieldNames, int columnIndex) { + if (getColumnNameCleansingEnabled()) { + return applyFileTitleConvention(title, seenFieldNames); + } + return applySheetTitleConvention(title, columnIndex); + } + + private String applyFileTitleConvention(String title, Map seenFieldNames) { + final String replacementChar = "_"; + + StringBuilder cleanFieldNameBuilder = new StringBuilder(); + + // Remove any spaces at the end of the strings + title = title.trim(); + + 
// If it's an empty string replace it with BLANK + if (title.isEmpty()) { + cleanFieldNameBuilder.append("BLANK"); + } else if (Character.isDigit(title.charAt(0))) { + // Prepend a col_ if the first character is a number + cleanFieldNameBuilder.append("col_"); + } + + // Replace all invalid characters with the replacement char + cleanFieldNameBuilder.append(NOT_VALID_PATTERN.matcher(title).replaceAll(replacementChar)); + String cleanFieldName = cleanFieldNameBuilder.toString(); + String lowerCaseCleanFieldName = cleanFieldName.toLowerCase(java.util.Locale.ROOT); + String uniqueFieldName = cleanFieldName; + int count = seenFieldNames.getOrDefault(lowerCaseCleanFieldName, 1); + // Keep bumping the suffix until the name is unused, so "a", "a_2", "a" gives "a_3", not a second "a_2". + while (seenFieldNames.putIfAbsent(uniqueFieldName.toLowerCase(java.util.Locale.ROOT), 1) != null) { + uniqueFieldName = cleanFieldName + replacementChar + (++count); + } + seenFieldNames.put(lowerCaseCleanFieldName, count); + return uniqueFieldName; + } + + private String applySheetTitleConvention(String title, int columnIndex) { if (!COLUMN_NAME.matcher(title).matches()) { String defaultColumnName = ColumnAddressConverter.getColumnName(columnIndex + 1); LOG.warn(String.format("Original column name '%s' doesn't satisfy column name requirements '%s', " + - "the default column name '%s' will be used.", title, COLUMN_NAME.pattern(), defaultColumnName)); + "the default column name '%s' will be used.", title, COLUMN_NAME.pattern(), + defaultColumnName)); return defaultColumnName; } return title; @@ -929,6 +984,11 @@ public boolean getAutoDetectRowsAndColumns() { return Boolean.TRUE.equals(autoDetectRowsAndColumns); } + /** Returns true only when column name cleansing is explicitly enabled; an unset property counts as disabled. */ + public boolean getColumnNameCleansingEnabled() { + return Boolean.TRUE.equals(columnNameCleansingEnabled); + } + public Integer getLastDataColumn() { return lastDataColumn == null ? 
0 : Integer.parseInt(lastDataColumn); } @@ -1030,8 +1090,8 @@ public void setSheetsIdentifiers(String sheetsIdentifiers) { this.sheetsIdentifiers = sheetsIdentifiers; } - public void setSchema(String schema) throws IOException { - this.schema = Schema.parseJson(schema); + public void setSchema(String schema) { + this.schema = schema; } public void setFormatting(String formatting) { @@ -1122,6 +1182,10 @@ public void setEndDate(String endDate) { this.endDate = endDate; } + public void setColumnNameCleansingEnabled(@Nullable Boolean columnNameCleansingEnabled) { + this.columnNameCleansingEnabled = columnNameCleansingEnabled; + } + public static GoogleSheetsSourceConfig of(JsonObject properties) throws IOException { GoogleSheetsSourceConfig googleSheetsSourceConfig = GoogleSheetsSourceConfig @@ -1305,6 +1369,11 @@ public static GoogleSheetsSourceConfig of(JsonObject properties) throws IOExcept properties.get(GoogleSheetsSourceConfig.AUTO_DETECT_ROWS_AND_COLUMNS).getAsBoolean()); } + if (properties.has(GoogleSheetsSourceConfig.CLEANSE_COLUMN_NAMES)) { + googleSheetsSourceConfig.setColumnNameCleansingEnabled( + properties.get(GoogleSheetsSourceConfig.CLEANSE_COLUMN_NAMES).getAsBoolean()); + } + return googleSheetsSourceConfig; } } diff --git a/src/test/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfigTest.java b/src/test/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfigTest.java index 55969a9..65d8b63 100644 --- a/src/test/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfigTest.java +++ b/src/test/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfigTest.java @@ -321,6 +321,7 @@ public void testProcessColumnsWithLastDataColumnLessThanColumnsRowSize() @Test public void testProcessColumnsInvalidTitles() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + config.setColumnNameCleansingEnabled(true); Method processColumnsMethod = config.getClass().getDeclaredMethod("processColumns", 
List.class, List.class, List.class, List.class, int.class, FailureCollector.class); processColumnsMethod.setAccessible(true); @@ -359,13 +360,13 @@ public void testProcessColumnsInvalidTitles() Assert.assertTrue(columns.get(0).getSubColumns().isEmpty()); // check complex columns, top header should have column name as name - Assert.assertEquals("B", columns.get(1).getHeaderTitle()); + Assert.assertEquals("title_with_space", columns.get(1).getHeaderTitle()); List subColumns = columns.get(1).getSubColumns(); Assert.assertFalse(subColumns.isEmpty()); // check sub-columns Assert.assertEquals(2, subColumns.size()); - Assert.assertEquals("B", subColumns.get(0).getHeaderTitle()); + Assert.assertEquals("col_9titleWithFirstNumber", subColumns.get(0).getHeaderTitle()); Assert.assertTrue(subColumns.get(0).getSubColumns().isEmpty()); Assert.assertEquals("d", subColumns.get(1).getHeaderTitle()); Assert.assertTrue(subColumns.get(0).getSubColumns().isEmpty()); @@ -376,4 +377,98 @@ private void setFieldValue(String fieldName, Object fieldValue) throws NoSuchFie metadataKeyCellsField.setAccessible(true); metadataKeyCellsField.set(config, fieldValue); } + + @Test + public void testProcessColumnsSameCaseSensitiveTitles() + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + config.setColumnNameCleansingEnabled(true); + Method processColumnsMethod = config.getClass().getDeclaredMethod("processColumns", List.class, + List.class, List.class, List.class, int.class, + FailureCollector.class); + processColumnsMethod.setAccessible(true); + + List columnsRow = new ArrayList<>(); + columnsRow.add(new CellData().setFormattedValue("title with space")); + columnsRow.add(new CellData().setFormattedValue("Title with space")); + columnsRow.add(new CellData().setFormattedValue("Title%with%space")); + + List dataRow = new ArrayList<>(); + dataRow.add(new CellData().setUserEnteredValue(new ExtendedValue().setStringValue("aa"))); + dataRow.add(new 
CellData().setUserEnteredValue(new ExtendedValue().setNumberValue(13d))); + dataRow.add(new CellData().setUserEnteredValue(new ExtendedValue().setBoolValue(true))); + + List columnMerges = new ArrayList<>(); + + FailureCollector collector = new DefaultFailureCollector("", Collections.EMPTY_MAP); + + int lastDataColumn = 3; + + LinkedHashMap columns = + (LinkedHashMap) processColumnsMethod.invoke(config, columnsRow, + null, dataRow, columnMerges, + lastDataColumn, collector); + + Assert.assertEquals(3, columns.size()); + Assert.assertTrue(columns.keySet().containsAll(Arrays.asList(0, 1, 2))); + + Assert.assertEquals("title_with_space", columns.get(0).getHeaderTitle()); + Assert.assertEquals("Title_with_space_2", columns.get(1).getHeaderTitle()); + Assert.assertEquals("Title_with_space_3", columns.get(2).getHeaderTitle()); + } + + @Test + public void testProcessColumnsInvalidTitlesOldSchema() + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + Method processColumnsMethod = config.getClass().getDeclaredMethod("processColumns", List.class, + List.class, List.class, List.class, int.class, + FailureCollector.class); + processColumnsMethod.setAccessible(true); + + List columnsRow = new ArrayList<>(); + columnsRow.add(new CellData().setFormattedValue("a")); + columnsRow.add(new CellData().setFormattedValue("title with space")); + columnsRow.add(new CellData()); + + List subColumnsRow = new ArrayList<>(); + subColumnsRow.add(new CellData().setFormattedValue("no header value")); + subColumnsRow.add(new CellData().setFormattedValue("9titleWithFirstNumber")); + subColumnsRow.add(new CellData().setFormattedValue("d")); + + List dataRow = new ArrayList<>(); + dataRow.add(new CellData().setUserEnteredValue(new ExtendedValue().setStringValue("aa"))); + dataRow.add(new CellData().setUserEnteredValue(new ExtendedValue().setNumberValue(13d))); + dataRow.add(new CellData().setUserEnteredValue(new ExtendedValue().setBoolValue(true))); + + List 
columnMerges = new ArrayList<>(); + columnMerges.add(new GridRange().setStartRowIndex(0).setEndRowIndex(1).setStartColumnIndex(1).setEndColumnIndex(3)); + + FailureCollector collector = new DefaultFailureCollector("", Collections.EMPTY_MAP); + + int lastDataColumn = 3; + + LinkedHashMap columns = + (LinkedHashMap) processColumnsMethod.invoke(config, columnsRow, + subColumnsRow, dataRow, + columnMerges, lastDataColumn, + collector); + + Assert.assertEquals(2, columns.size()); + Assert.assertTrue(columns.keySet().containsAll(Arrays.asList(0, 1))); + + // check simple column + Assert.assertEquals("a", columns.get(0).getHeaderTitle()); + Assert.assertTrue(columns.get(0).getSubColumns().isEmpty()); + + // check complex columns, top header should have column name as name + Assert.assertEquals("B", columns.get(1).getHeaderTitle()); + List subColumns = columns.get(1).getSubColumns(); + Assert.assertFalse(subColumns.isEmpty()); + + // check sub-columns + Assert.assertEquals(2, subColumns.size()); + Assert.assertEquals("B", subColumns.get(0).getHeaderTitle()); + Assert.assertTrue(subColumns.get(0).getSubColumns().isEmpty()); + Assert.assertEquals("d", subColumns.get(1).getHeaderTitle()); + Assert.assertTrue(subColumns.get(1).getSubColumns().isEmpty()); + } } diff --git a/widgets/GoogleSheets-batchsource.json b/widgets/GoogleSheets-batchsource.json index 5d87a1e..f4137d2 100644 --- a/widgets/GoogleSheets-batchsource.json +++ b/widgets/GoogleSheets-batchsource.json @@ -429,11 +429,44 @@ "default": "100", "min": "1" } + }, + { + "widget-type": "toggle", + "label": "Enable Cleansing Column Names", + "name": "columnNameCleansingEnabled", + "widget-attributes": { + "on": { + "value": "true", + "label": "Yes" + }, + "off": { + "value": "false", + "label": "No" + }, + "default": "true" + } } ] } ], - "outputs": [], + "outputs": [ + { + "name": "schema", + "label": "schema", + "widget-type": "schema", + "widget-attributes": { + "schema-types": [ + "boolean", + "long", + "double", + 
"bytes", + "string", + "array" + ], + "schema-default-type": "string" + } + } + ], "filters": [ { "name": "Select modification date range",