diff --git a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSource.java b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSource.java index 91f12cd..3752f07 100644 --- a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSource.java +++ b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSource.java @@ -27,6 +27,7 @@ import io.cdap.cdap.etl.api.Emitter; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.StageConfigurer; import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.batch.BatchSourceContext; import io.cdap.plugin.common.LineageRecorder; @@ -51,11 +52,13 @@ public GoogleSheetsSource(GoogleSheetsSourceConfig config) { @Override public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer(); FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector(); config.validate(failureCollector); failureCollector.getOrThrowException(); - pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema(failureCollector)); + Schema configuredSchema = config.getSchema(failureCollector); + stageConfigurer.setOutputSchema(configuredSchema); } @Override diff --git a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java index 2e00632..03939c8 100644 --- a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java +++ b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java @@ -120,7 +120,7 @@ public class GoogleSheetsSourceConfig extends GoogleFilteringSourceConfig { @Name(NAME_SCHEMA) @Description("The schema of the table to read.") @Macro - private transient Schema schema = null; + private String schema; @Name(FORMATTING) @Description("Output format for numeric sheet cells. " + @@ -251,7 +251,7 @@ public GoogleSheetsSourceConfig(String referenceName, @Nullable String sheetsIde @Nullable String lastDataColumn, @Nullable String lastDataRow, @Nullable String metadataCells, @Nullable Integer readBufferSize, @Nullable Boolean addNameFields, @Nullable String spreadsheetFieldName, - @Nullable String sheetFieldName) { + @Nullable String sheetFieldName, @Nullable String schema) { super(referenceName); this.sheetsIdentifiers = sheetsIdentifiers; @@ -271,6 +271,7 @@ public GoogleSheetsSourceConfig(String referenceName, @Nullable String sheetsIde this.addNameFields = addNameFields; this.spreadsheetFieldName = spreadsheetFieldName; this.sheetFieldName = sheetFieldName; + this.schema = schema; } @@ -290,9 +291,15 @@ public Schema getSchema(FailureCollector collector) { "Perhaps no validation step was executed before schema generation.") .withConfigProperty(SCHEMA); } - schema = SchemaBuilder.buildSchema(this, new ArrayList<>(dataSchemaInfo.values())); + return SchemaBuilder.buildSchema(this, new ArrayList<>(dataSchemaInfo.values())); } - return schema; + try { + return Strings.isNullOrEmpty(schema) ? null : Schema.parseJson(schema); + } catch (IOException e) { + collector.addFailure("Invalid schema: " + e.getMessage(), + null).withConfigProperty(NAME_SCHEMA); + } + throw collector.getOrThrowException(); } private boolean shouldGetSchema() { @@ -1083,8 +1090,8 @@ public void setSheetsIdentifiers(String sheetsIdentifiers) { this.sheetsIdentifiers = sheetsIdentifiers; } - public void setSchema(String schema) throws IOException { - this.schema = Schema.parseJson(schema); + public void setSchema(String schema) { + this.schema = schema; } public void setFormatting(String formatting) { diff --git a/widgets/GoogleSheets-batchsource.json b/widgets/GoogleSheets-batchsource.json index 3893df1..f4137d2 100644 --- a/widgets/GoogleSheets-batchsource.json +++ b/widgets/GoogleSheets-batchsource.json @@ -449,7 +449,24 @@ ] } ], - "outputs": [], + "outputs": [ + { + "name": "schema", + "label": "schema", + "widget-type": "schema", + "widget-attributes": { + "schema-types": [ + "boolean", + "long", + "double", + "bytes", + "string", + "array" + ], + "schema-default-type": "string" + } + } + ], "filters": [ { "name": "Select modification date range",