Skip to content

Commit

Permalink
Merge pull request #55 from data-integrations/cherrypick-schema-colum…
Browse files Browse the repository at this point in the history
…n-fix

[🍒][PLUGIN-1785] Column name cleansing done as per other file plugins and made schema non transient.
  • Loading branch information
vikasrathee-cs authored May 30, 2024
2 parents 4959840 + a6c36fd commit 14d4263
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 15 deletions.
3 changes: 3 additions & 0 deletions docs/GoogleSheets-batchsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ Names Row contains less number of columns.

**Read Buffer Size:** Number of rows the source reads with a single API request. Default value is 100.

**Enable Cleansing Column Names:** Toggle that specifies whether to cleanse column names containing special characters.
Special characters will be replaced by underscores.

### Steps to Generate OAuth2 Credentials
1. Create credentials for the Client ID and Client Secret properties [here](https://console.cloud.google.com/apis/credentials).
2. On the Create OAuth client ID page, under Authorized redirect URIs, specify a URI of `http://localhost:8080`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.plugin.common.LineageRecorder;
Expand All @@ -51,11 +52,13 @@ public GoogleSheetsSource(GoogleSheetsSourceConfig config) {

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
config.validate(failureCollector);
failureCollector.getOrThrowException();

pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema(failureCollector));
Schema configuredSchema = config.getSchema(failureCollector);
stageConfigurer.setOutputSchema(configuredSchema);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ public class GoogleSheetsSourceConfig extends GoogleFilteringSourceConfig {
public static final String CONFIGURATION_PARSE_PROPERTY_NAME = "properties";
private static final Logger LOG = LoggerFactory.getLogger(GoogleSheetsSourceConfig.class);
private static final Pattern CELL_ADDRESS = Pattern.compile("^([A-Z]+)([0-9]+)$");
private static final Pattern NOT_VALID_PATTERN = Pattern.compile("[^A-Za-z0-9_]+");
private static final Pattern COLUMN_NAME = Pattern.compile("^[A-Za-z_][A-Za-z0-9_-]*$");
public static final String CLEANSE_COLUMN_NAMES = "columnNameCleansingEnabled";
private static LinkedHashMap<Integer, ColumnComplexSchemaInfo> dataSchemaInfo = new LinkedHashMap<>();

@Name(SHEETS_TO_PULL)
Expand All @@ -118,7 +120,7 @@ public class GoogleSheetsSourceConfig extends GoogleFilteringSourceConfig {
@Name(NAME_SCHEMA)
@Description("The schema of the table to read.")
@Macro
private transient Schema schema = null;
private String schema;

@Name(FORMATTING)
@Description("Output format for numeric sheet cells. " +
Expand Down Expand Up @@ -234,6 +236,13 @@ public class GoogleSheetsSourceConfig extends GoogleFilteringSourceConfig {
@Macro
private String sheetFieldName;

@Nullable
@Name(CLEANSE_COLUMN_NAMES)
@Description("Toggle that specifies whether to cleanse column names containing special characters. " +
"Special characters will be replaced by underscores.")
@Macro
private Boolean columnNameCleansingEnabled;

public GoogleSheetsSourceConfig(String referenceName, @Nullable String sheetsIdentifiers, String formatting,
Boolean skipEmptyData, String columnNamesSelection,
@Nullable Integer customColumnNamesRow, String metadataFieldName,
Expand All @@ -242,7 +251,7 @@ public GoogleSheetsSourceConfig(String referenceName, @Nullable String sheetsIde
@Nullable String lastDataColumn, @Nullable String lastDataRow,
@Nullable String metadataCells, @Nullable Integer readBufferSize,
@Nullable Boolean addNameFields, @Nullable String spreadsheetFieldName,
@Nullable String sheetFieldName) {
@Nullable String sheetFieldName, @Nullable String schema) {

super(referenceName);
this.sheetsIdentifiers = sheetsIdentifiers;
Expand All @@ -262,6 +271,7 @@ public GoogleSheetsSourceConfig(String referenceName, @Nullable String sheetsIde
this.addNameFields = addNameFields;
this.spreadsheetFieldName = spreadsheetFieldName;
this.sheetFieldName = sheetFieldName;
this.schema = schema;
}


Expand All @@ -281,9 +291,15 @@ public Schema getSchema(FailureCollector collector) {
"Perhaps no validation step was executed before schema generation.")
.withConfigProperty(SCHEMA);
}
schema = SchemaBuilder.buildSchema(this, new ArrayList<>(dataSchemaInfo.values()));
return SchemaBuilder.buildSchema(this, new ArrayList<>(dataSchemaInfo.values()));
}
return schema;
try {
return Strings.isNullOrEmpty(schema) ? null : Schema.parseJson(schema);
} catch (IOException e) {
collector.addFailure("Invalid schema: " + e.getMessage(),
null).withConfigProperty(NAME_SCHEMA);
}
throw collector.getOrThrowException();
}

private boolean shouldGetSchema() {
Expand Down Expand Up @@ -593,7 +609,7 @@ private LinkedHashMap<Integer, ColumnComplexSchemaInfo> processColumns(List<Cell
int lastDataColumn,
FailureCollector collector) {
LinkedHashMap<Integer, ColumnComplexSchemaInfo> columnHeaders = new LinkedHashMap<>();

final Map<String, Integer> seenFieldNames = new HashMap<>();
List<String> headerTitles = new ArrayList<>();
for (int i = 0; i < Math.min(columnsRow.size(), lastDataColumn); i++) {
CellData columnHeaderCell = columnsRow.get(i);
Expand All @@ -609,7 +625,7 @@ private LinkedHashMap<Integer, ColumnComplexSchemaInfo> processColumns(List<Cell
}
String title = columnHeaderCell.getFormattedValue();
if (StringUtils.isNotEmpty(title)) {
title = checkTitleFormat(title, i);
title = checkTitleFormat(title, seenFieldNames, i);

// for merge we should analyse sub headers for data schemas
if (isMergeHead) {
Expand All @@ -634,6 +650,7 @@ private LinkedHashMap<Integer, ColumnComplexSchemaInfo> processColumns(List<Cell
private List<ColumnComplexSchemaInfo> processSubHeaders(int startIndex, int length, List<CellData> subColumnsRow,
List<CellData> dataRow, FailureCollector collector) {
List<ColumnComplexSchemaInfo> subHeaders = new ArrayList<>();
final Map<String, Integer> seenFieldNames = new HashMap<>();
List<String> titles = new ArrayList<>();
for (int i = startIndex; i < startIndex + length; i++) {
String subHeaderTitle;
Expand All @@ -642,7 +659,7 @@ private List<ColumnComplexSchemaInfo> processSubHeaders(int startIndex, int leng
if (StringUtils.isEmpty(subHeaderTitle)) {
subHeaderTitle = ColumnAddressConverter.getColumnName(i + 1);
}
subHeaderTitle = checkTitleFormat(subHeaderTitle, i);
subHeaderTitle = checkTitleFormat(subHeaderTitle, seenFieldNames, i);
} else {
subHeaderTitle = ColumnAddressConverter.getColumnName(i + 1);
}
Expand All @@ -661,11 +678,49 @@ private List<ColumnComplexSchemaInfo> processSubHeaders(int startIndex, int leng
return subHeaders;
}

private String checkTitleFormat(String title, int columnIndex) {
private String checkTitleFormat(String title, Map<String, Integer> seenFieldNames, int columnIndex) {
if (getColumnNameCleansingEnabled()) {
return applyFileTitleConvention(title, seenFieldNames);
}
return applySheetTitleConvention(title, columnIndex);
}

private String applyFileTitleConvention(String title, Map<String, Integer> seenFieldNames) {
final String replacementChar = "_";

StringBuilder cleanFieldNameBuilder = new StringBuilder();

// Remove any spaces at the end of the strings
title = title.trim();

// If it's an empty string replace it with BLANK
if (title.isEmpty()) {
cleanFieldNameBuilder.append("BLANK");
} else if (Character.isDigit(title.charAt(0))) {
// Prepend a col_ if the first character is a number
cleanFieldNameBuilder.append("col_");
}

// Replace all invalid characters with the replacement char
cleanFieldNameBuilder.append(NOT_VALID_PATTERN.matcher(title).replaceAll(replacementChar));

String cleanFieldName = cleanFieldNameBuilder.toString();
String lowerCaseCleanFieldName = cleanFieldName.toLowerCase();
int count = seenFieldNames.getOrDefault(lowerCaseCleanFieldName, 0) + 1;
seenFieldNames.put(lowerCaseCleanFieldName, count);
// In case column already exists in seenFieldNames map, append the count with column name.
if (count > 1) {
cleanFieldNameBuilder.append(replacementChar).append(count);
}
return cleanFieldNameBuilder.toString();
}

private String applySheetTitleConvention(String title, int columnIndex) {
if (!COLUMN_NAME.matcher(title).matches()) {
String defaultColumnName = ColumnAddressConverter.getColumnName(columnIndex + 1);
LOG.warn(String.format("Original column name '%s' doesn't satisfy column name requirements '%s', " +
"the default column name '%s' will be used.", title, COLUMN_NAME.pattern(), defaultColumnName));
"the default column name '%s' will be used.", title, COLUMN_NAME.pattern(),
defaultColumnName));
return defaultColumnName;
}
return title;
Expand Down Expand Up @@ -929,6 +984,11 @@ public boolean getAutoDetectRowsAndColumns() {
return Boolean.TRUE.equals(autoDetectRowsAndColumns);
}

@Nullable
public boolean getColumnNameCleansingEnabled() {
return Boolean.TRUE.equals(columnNameCleansingEnabled);
}

public Integer getLastDataColumn() {
return lastDataColumn == null ? 0 : Integer.parseInt(lastDataColumn);
}
Expand Down Expand Up @@ -1030,8 +1090,8 @@ public void setSheetsIdentifiers(String sheetsIdentifiers) {
this.sheetsIdentifiers = sheetsIdentifiers;
}

public void setSchema(String schema) throws IOException {
this.schema = Schema.parseJson(schema);
public void setSchema(String schema) {
this.schema = schema;
}

public void setFormatting(String formatting) {
Expand Down Expand Up @@ -1122,6 +1182,10 @@ public void setEndDate(String endDate) {
this.endDate = endDate;
}

public void setColumnNameCleansingEnabled(@Nullable Boolean columnNameCleansingEnabled) {
this.columnNameCleansingEnabled = columnNameCleansingEnabled;
}

public static GoogleSheetsSourceConfig of(JsonObject properties) throws IOException {

GoogleSheetsSourceConfig googleSheetsSourceConfig = GoogleSheetsSourceConfig
Expand Down Expand Up @@ -1305,6 +1369,11 @@ public static GoogleSheetsSourceConfig of(JsonObject properties) throws IOExcept
properties.get(GoogleSheetsSourceConfig.AUTO_DETECT_ROWS_AND_COLUMNS).getAsBoolean());
}

if (properties.has(GoogleSheetsSourceConfig.CLEANSE_COLUMN_NAMES)) {
googleSheetsSourceConfig.setColumnNameCleansingEnabled(
properties.get(GoogleSheetsSourceConfig.CLEANSE_COLUMN_NAMES).getAsBoolean());
}

return googleSheetsSourceConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ public void testProcessColumnsWithLastDataColumnLessThanColumnsRowSize()
@Test
public void testProcessColumnsInvalidTitles()
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
config.setColumnNameCleansingEnabled(true);
Method processColumnsMethod = config.getClass().getDeclaredMethod("processColumns", List.class,
List.class, List.class, List.class, int.class, FailureCollector.class);
processColumnsMethod.setAccessible(true);
Expand Down Expand Up @@ -359,13 +360,13 @@ public void testProcessColumnsInvalidTitles()
Assert.assertTrue(columns.get(0).getSubColumns().isEmpty());

// check complex columns, top header should have column name as name
Assert.assertEquals("B", columns.get(1).getHeaderTitle());
Assert.assertEquals("title_with_space", columns.get(1).getHeaderTitle());
List<ColumnComplexSchemaInfo> subColumns = columns.get(1).getSubColumns();
Assert.assertFalse(subColumns.isEmpty());

// check sub-columns
Assert.assertEquals(2, subColumns.size());
Assert.assertEquals("B", subColumns.get(0).getHeaderTitle());
Assert.assertEquals("col_9titleWithFirstNumber", subColumns.get(0).getHeaderTitle());
Assert.assertTrue(subColumns.get(0).getSubColumns().isEmpty());
Assert.assertEquals("d", subColumns.get(1).getHeaderTitle());
Assert.assertTrue(subColumns.get(0).getSubColumns().isEmpty());
Expand All @@ -376,4 +377,98 @@ private void setFieldValue(String fieldName, Object fieldValue) throws NoSuchFie
metadataKeyCellsField.setAccessible(true);
metadataKeyCellsField.set(config, fieldValue);
}

@Test
public void testProcessColumnsSameCaseSensitiveTitles()
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
config.setColumnNameCleansingEnabled(true);
Method processColumnsMethod = config.getClass().getDeclaredMethod("processColumns", List.class,
List.class, List.class, List.class, int.class,
FailureCollector.class);
processColumnsMethod.setAccessible(true);

List<CellData> columnsRow = new ArrayList<>();
columnsRow.add(new CellData().setFormattedValue("title with space"));
columnsRow.add(new CellData().setFormattedValue("Title with space"));
columnsRow.add(new CellData().setFormattedValue("Title%with%space"));

List<CellData> dataRow = new ArrayList<>();
dataRow.add(new CellData().setUserEnteredValue(new ExtendedValue().setStringValue("aa")));
dataRow.add(new CellData().setUserEnteredValue(new ExtendedValue().setNumberValue(13d)));
dataRow.add(new CellData().setUserEnteredValue(new ExtendedValue().setBoolValue(true)));

List<GridRange> columnMerges = new ArrayList<>();

FailureCollector collector = new DefaultFailureCollector("", Collections.EMPTY_MAP);

int lastDataColumn = 3;

LinkedHashMap<Integer, ColumnComplexSchemaInfo> columns =
(LinkedHashMap<Integer, ColumnComplexSchemaInfo>) processColumnsMethod.invoke(config, columnsRow,
null, dataRow, columnMerges,
lastDataColumn, collector);

Assert.assertEquals(3, columns.size());
Assert.assertTrue(columns.keySet().containsAll(Arrays.asList(0, 1, 2)));

Assert.assertEquals("title_with_space", columns.get(0).getHeaderTitle());
Assert.assertEquals("Title_with_space_2", columns.get(1).getHeaderTitle());
Assert.assertEquals("Title_with_space_3", columns.get(2).getHeaderTitle());
}

@Test
public void testProcessColumnsInvalidTitlesOldSchema()
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
Method processColumnsMethod = config.getClass().getDeclaredMethod("processColumns", List.class,
List.class, List.class, List.class, int.class,
FailureCollector.class);
processColumnsMethod.setAccessible(true);

List<CellData> columnsRow = new ArrayList<>();
columnsRow.add(new CellData().setFormattedValue("a"));
columnsRow.add(new CellData().setFormattedValue("title with space"));
columnsRow.add(new CellData());

List<CellData> subColumnsRow = new ArrayList<>();
subColumnsRow.add(new CellData().setFormattedValue("no header value"));
subColumnsRow.add(new CellData().setFormattedValue("9titleWithFirstNumber"));
subColumnsRow.add(new CellData().setFormattedValue("d"));

List<CellData> dataRow = new ArrayList<>();
dataRow.add(new CellData().setUserEnteredValue(new ExtendedValue().setStringValue("aa")));
dataRow.add(new CellData().setUserEnteredValue(new ExtendedValue().setNumberValue(13d)));
dataRow.add(new CellData().setUserEnteredValue(new ExtendedValue().setBoolValue(true)));

List<GridRange> columnMerges = new ArrayList<>();
columnMerges.add(new GridRange().setStartRowIndex(0).setEndRowIndex(1).setStartColumnIndex(1).setEndColumnIndex(3));

FailureCollector collector = new DefaultFailureCollector("", Collections.EMPTY_MAP);

int lastDataColumn = 3;

LinkedHashMap<Integer, ColumnComplexSchemaInfo> columns =
(LinkedHashMap<Integer, ColumnComplexSchemaInfo>) processColumnsMethod.invoke(config, columnsRow,
subColumnsRow, dataRow,
columnMerges, lastDataColumn,
collector);

Assert.assertEquals(2, columns.size());
Assert.assertTrue(columns.keySet().containsAll(Arrays.asList(0, 1)));

// check simple column
Assert.assertEquals("a", columns.get(0).getHeaderTitle());
Assert.assertTrue(columns.get(0).getSubColumns().isEmpty());

// check complex columns, top header should have column name as name
Assert.assertEquals("B", columns.get(1).getHeaderTitle());
List<ColumnComplexSchemaInfo> subColumns = columns.get(1).getSubColumns();
Assert.assertFalse(subColumns.isEmpty());

// check sub-columns
Assert.assertEquals(2, subColumns.size());
Assert.assertEquals("B", subColumns.get(0).getHeaderTitle());
Assert.assertTrue(subColumns.get(0).getSubColumns().isEmpty());
Assert.assertEquals("d", subColumns.get(1).getHeaderTitle());
Assert.assertTrue(subColumns.get(0).getSubColumns().isEmpty());
}
}
Loading

0 comments on commit 14d4263

Please sign in to comment.