Skip to content

Commit

Permalink
Added getEmptyInputFormatClassName
Browse files Browse the repository at this point in the history
  • Loading branch information
psainics committed Feb 12, 2024
1 parent 923e603 commit dbd5473
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 51 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.format.input;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
 * Base class for an {@link InputFormat} that never yields any records: it reports no input
 * splits, and any record reader it hands out is already exhausted.
 *
 * @param <K> the key class
 * @param <V> the value class
 */
public abstract class AbstractEmptyInputFormat<K, V> extends InputFormat<K, V> {

  /**
   * Reports that there is nothing to read.
   *
   * @param context the job context; not consulted
   * @return an empty list of splits
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
    final List<InputSplit> noSplits = Collections.emptyList();
    return noSplits;
  }

  /**
   * Creates a reader that contains no key/value pairs.
   *
   * @param split the split to read; not consulted
   * @param context the task attempt context; not consulted
   * @return a record reader whose {@code nextKeyValue()} always returns {@code false}
   */
  @Override
  public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) {
    return new EmptyRecordReader<>();
  }

  /**
   * A record reader with no content: it never advances, exposes {@code null} as the current
   * key and value, and always reports itself as fully complete.
   *
   * @param <K> the key class
   * @param <V> the value class
   */
  private static final class EmptyRecordReader<K, V> extends RecordReader<K, V> {

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) {
      // no state to set up
    }

    @Override
    public boolean nextKeyValue() {
      // there is never a next record
      return false;
    }

    @Override
    public K getCurrentKey() {
      return null;
    }

    @Override
    public V getCurrentValue() {
      return null;
    }

    @Override
    public float getProgress() {
      // nothing to read, so the reader is always finished
      return 1.0F;
    }

    @Override
    public void close() {
      // no resources to release
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,60 +15,12 @@
*/

package io.cdap.plugin.format.input;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
* An InputFormat that returns no data.
* @param <K> the key class
* @param <V> the value class
*/
public class EmptyInputFormat<K, V> extends InputFormat<K, V> {

@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
return Collections.emptyList();
}

@Override
public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) {
return new RecordReader<K, V>() {
@Override
public void initialize(InputSplit split, TaskAttemptContext context) {
// do nothing
}

@Override
public boolean nextKeyValue() {
return false;
}

@Override
public K getCurrentKey() {
return null;
}

@Override
public V getCurrentValue() {
return null;
}

@Override
public float getProgress() {
return 1.0F;
}

@Override
public void close() {
// nothing to do
}
};
}
public class EmptyInputFormat<K, V> extends AbstractEmptyInputFormat<K, V> {
// Intentionally empty: this class declares no members of its own and relies entirely on
// what it inherits from AbstractEmptyInputFormat.
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ protected AbstractFileSource(T config) {
this.config = config;
}

/**
 * Supplies the class name of the empty InputFormat that is used when the input path does not
 * exist and empty input is allowed. Subclasses may override this to name a different class.
 *
 * <p>The original note on this method states that a ClassNotFound exception is thrown when the
 * input path does not exist and this method is not overridden.
 * NOTE(review): presumably this happens when the default class is not visible to the class
 * loader that resolves the returned name - confirm.
 *
 * @return by default, the fully-qualified name of {@link EmptyInputFormat}
 */
protected String getEmptyInputFormatClassName() {
  final Class<?> defaultEmptyFormat = EmptyInputFormat.class;
  return defaultEmptyFormat.getName();
}

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
Expand Down Expand Up @@ -203,7 +212,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
String inputFormatClass;
if (fileStatus == null) {
if (config.shouldAllowEmptyInput()) {
inputFormatClass = EmptyInputFormat.class.getName();
inputFormatClass = getEmptyInputFormatClassName();
} else {
throw new IOException(String.format("Input path %s does not exist", path));
}
Expand Down

0 comments on commit dbd5473

Please sign in to comment.