diff --git a/format-common/src/main/java/io/cdap/plugin/format/input/AbstractEmptyInputFormat.java b/format-common/src/main/java/io/cdap/plugin/format/input/AbstractEmptyInputFormat.java new file mode 100644 index 000000000..4598256e3 --- /dev/null +++ b/format-common/src/main/java/io/cdap/plugin/format/input/AbstractEmptyInputFormat.java @@ -0,0 +1,74 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.format.input; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** + * An InputFormat that returns no data. + * @param the key class + * @param the value class + */ +public abstract class AbstractEmptyInputFormat extends InputFormat { + + @Override + public List getSplits(JobContext context) throws IOException, InterruptedException { + return Collections.emptyList(); + } + + @Override + public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) { + return new RecordReader() { + @Override + public void initialize(InputSplit split, TaskAttemptContext context) { + // do nothing + } + + @Override + public boolean nextKeyValue() { + return false; + } + + @Override + public K getCurrentKey() { + return null; + } + + @Override + public V getCurrentValue() { + return null; + } + + @Override + public float getProgress() { + return 1.0F; + } + + @Override + public void close() { + // nothing to do + } + }; + } +} diff --git a/format-common/src/main/java/io/cdap/plugin/format/input/EmptyInputFormat.java b/format-common/src/main/java/io/cdap/plugin/format/input/EmptyInputFormat.java index 7f60a28be..712fe2b0f 100644 --- a/format-common/src/main/java/io/cdap/plugin/format/input/EmptyInputFormat.java +++ b/format-common/src/main/java/io/cdap/plugin/format/input/EmptyInputFormat.java @@ -15,60 +15,12 @@ */ package io.cdap.plugin.format.input; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; /** * An InputFormat that returns no data. * @param the key class * @param the value class */ -public class EmptyInputFormat extends InputFormat { - - @Override - public List getSplits(JobContext context) throws IOException, InterruptedException { - return Collections.emptyList(); - } - - @Override - public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) { - return new RecordReader() { - @Override - public void initialize(InputSplit split, TaskAttemptContext context) { - // do nothing - } - - @Override - public boolean nextKeyValue() { - return false; - } - - @Override - public K getCurrentKey() { - return null; - } - - @Override - public V getCurrentValue() { - return null; - } - - @Override - public float getProgress() { - return 1.0F; - } - - @Override - public void close() { - // nothing to do - } - }; - } +public class EmptyInputFormat extends AbstractEmptyInputFormat { + // no-op } diff --git a/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSource.java b/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSource.java index dd11f6c09..bd198ba84 100644 --- a/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSource.java +++ b/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSource.java @@ -81,6 +81,15 @@ protected AbstractFileSource(T config) { this.config = config; } + /** + * Override this to provide the class name of the Empty InputFormat + * to use when the input path does not exist. + * If not overridden, ClassNotFound exception will be thrown when the input path does not exist. + */ + protected String getEmptyInputFormatClassName() { + return EmptyInputFormat.class.getName(); + } + @Override public void configurePipeline(PipelineConfigurer pipelineConfigurer) { FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector(); @@ -203,7 +212,7 @@ public void prepareRun(BatchSourceContext context) throws Exception { String inputFormatClass; if (fileStatus == null) { if (config.shouldAllowEmptyInput()) { - inputFormatClass = EmptyInputFormat.class.getName(); + inputFormatClass = getEmptyInputFormatClassName(); } else { throw new IOException(String.format("Input path %s does not exist", path)); }