[vpj] Revert using native Spark readers for reading from Avro and VSON files (linkedin#1127)

Revert "[vpj] Use native Spark readers for reading from Avro and VSON files (linkedin#1036)"

This reverts commit 139d733.
huangminchn authored Aug 20, 2024
1 parent 0ae9e6a commit ed851aa
Showing 10 changed files with 44 additions and 1,803 deletions.
1 change: 0 additions & 1 deletion build.gradle
@@ -85,7 +85,6 @@ ext.libraries = [
grpcServices: "io.grpc:grpc-services:${grpcVersion}",
grpcStub: "io.grpc:grpc-stub:${grpcVersion}",
hadoopCommon: "org.apache.hadoop:hadoop-common:${hadoopVersion}",
hadoopHdfs: "org.apache.hadoop:hadoop-hdfs:${hadoopVersion}",
helix: 'org.apache.helix:helix-core:1.1.0',
httpAsyncClient: 'org.apache.httpcomponents:httpasyncclient:4.1.5',
httpClient5: 'org.apache.httpcomponents.client5:httpclient5:5.3',
7 changes: 0 additions & 7 deletions clients/venice-push-job/build.gradle
@@ -27,13 +27,6 @@ dependencies {
exclude group: 'javax.servlet'
}

implementation (libraries.hadoopHdfs) {
// Exclude transitive dependency
exclude group: 'org.apache.avro'
exclude group: 'javax.servlet'
exclude group: 'com.fasterxml.jackson.core'
}

implementation (libraries.apacheSparkAvro) {
// Spark 3.1 depends on Avro 1.8.2 - which uses avro-mapred with the hadoop2 classifier. Starting from Avro 1.9
// onwards, avro-mapred is no longer published with a hadoop2 classifier, but Gradle still looks for one.
@@ -31,9 +31,6 @@ private VenicePushJobConstants() {
public static final boolean DEFAULT_EXTENDED_SCHEMA_VALIDITY_CHECK_ENABLED = true;
public static final String UPDATE_SCHEMA_STRING_PROP = "update.schema";

// This is a temporary config used to rollout the native input format for Spark. This will be removed soon
public static final String SPARK_NATIVE_INPUT_FORMAT_ENABLED = "spark.native.input.format.enabled";

// Vson input configs
// Vson files store key/value schema on file header. key / value fields are optional
// and should be specified only when key / value schema is the partial of the files.
@@ -225,7 +222,6 @@ private VenicePushJobConstants() {
* ignore hdfs files with prefix "_" and "."
*/
public static final PathFilter PATH_FILTER = p -> !p.getName().startsWith("_") && !p.getName().startsWith(".");
public static final String GLOB_FILTER_PATTERN = "[^_.]*";

// Configs to control temp paths and their permissions
public static final String HADOOP_TMP_DIR = "hadoop.tmp.dir";
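For context on the constant removed just above: GLOB_FILTER_PATTERN was the Spark-side twin of the PATH_FILTER lambda that stays behind in VenicePushJobConstants; both skip HDFS files whose names start with "_" or ".". Below is a minimal, illustrative sketch of that equivalence (the class name and sample file names are invented for the example, not part of the commit):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class PathFilterSketch {
  // Same lambda as the PATH_FILTER constant that remains in VenicePushJobConstants.
  static final PathFilter PATH_FILTER = p -> !p.getName().startsWith("_") && !p.getName().startsWith(".");
  // The glob the removed native-reader path passed to Spark's "pathGlobFilter" option.
  static final String GLOB_FILTER_PATTERN = "[^_.]*";

  public static void main(String[] args) {
    for (String name : new String[] { "_SUCCESS", ".tmp-part", "part-00000.avro" }) {
      // PATH_FILTER is evaluated per file name by the Hadoop-based input formats.
      System.out.println(name + " accepted by PATH_FILTER: " + PATH_FILTER.accept(new Path(name)));
      // GLOB_FILTER_PATTERN expresses the same rule as a glob: it only matches names
      // whose first character is neither '_' nor '.'.
    }
  }
}
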
@@ -4,37 +4,20 @@
import static com.linkedin.venice.hadoop.VenicePushJobConstants.FILE_KEY_SCHEMA;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.FILE_VALUE_SCHEMA;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.GENERATE_PARTIAL_UPDATE_RECORD_FROM_INPUT;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.GLOB_FILTER_PATTERN;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.INPUT_PATH_PROP;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.KEY_FIELD_PROP;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.SCHEMA_STRING_PROP;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.SPARK_NATIVE_INPUT_FORMAT_ENABLED;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.UPDATE_SCHEMA_STRING_PROP;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.VALUE_FIELD_PROP;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.VSON_PUSH;
import static com.linkedin.venice.hadoop.spark.SparkConstants.DEFAULT_SCHEMA;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.hadoop.PushJobSetting;
import com.linkedin.venice.hadoop.input.recordreader.avro.VeniceAvroRecordReader;
import com.linkedin.venice.hadoop.input.recordreader.vson.VeniceVsonRecordReader;
import com.linkedin.venice.hadoop.spark.input.hdfs.VeniceHdfsSource;
import com.linkedin.venice.hadoop.spark.utils.RowToAvroConverter;
import com.linkedin.venice.utils.VeniceProperties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;


/**
@@ -46,19 +29,6 @@ protected Dataset<Row> getUserInputDataFrame() {
SparkSession sparkSession = getSparkSession();
PushJobSetting pushJobSetting = getPushJobSetting();

VeniceProperties jobProps = getJobProperties();
boolean useNativeInputFormat = jobProps.getBoolean(SPARK_NATIVE_INPUT_FORMAT_ENABLED, false);

if (!useNativeInputFormat) {
return getDataFrameFromCustomInputFormat(sparkSession, pushJobSetting);
} else if (pushJobSetting.isAvro) {
return getAvroDataFrame(sparkSession, pushJobSetting);
} else {
return getVsonDataFrame(sparkSession, pushJobSetting);
}
}

private Dataset<Row> getDataFrameFromCustomInputFormat(SparkSession sparkSession, PushJobSetting pushJobSetting) {
DataFrameReader dataFrameReader = sparkSession.read();
dataFrameReader.format(VeniceHdfsSource.class.getCanonicalName());
setInputConf(sparkSession, dataFrameReader, INPUT_PATH_PROP, new Path(pushJobSetting.inputURI).toString());
@@ -85,53 +55,4 @@ private Dataset<Row> getDataFrameFromCustomInputFormat(SparkSession sparkSession
}
return dataFrameReader.load();
}

private Dataset<Row> getAvroDataFrame(SparkSession sparkSession, PushJobSetting pushJobSetting) {
Dataset<Row> df =
sparkSession.read().format("avro").option("pathGlobFilter", GLOB_FILTER_PATTERN).load(pushJobSetting.inputURI);

// Transforming the input data format
df = df.map((MapFunction<Row, Row>) (record) -> {
Schema updateSchema = null;
if (pushJobSetting.generatePartialUpdateRecordFromInput) {
updateSchema = AvroCompatibilityHelper.parse(pushJobSetting.valueSchemaString);
}

GenericRecord rowRecord = RowToAvroConverter.convert(record, pushJobSetting.inputDataSchema);
VeniceAvroRecordReader recordReader = new VeniceAvroRecordReader(
pushJobSetting.inputDataSchema,
pushJobSetting.keyField,
pushJobSetting.valueField,
pushJobSetting.etlValueSchemaTransformation,
updateSchema);

AvroWrapper<IndexedRecord> recordAvroWrapper = new AvroWrapper<>(rowRecord);
final byte[] inputKeyBytes = recordReader.getKeyBytes(recordAvroWrapper, null);
final byte[] inputValueBytes = recordReader.getValueBytes(recordAvroWrapper, null);

return new GenericRowWithSchema(new Object[] { inputKeyBytes, inputValueBytes }, DEFAULT_SCHEMA);
}, RowEncoder.apply(DEFAULT_SCHEMA));

return df;
}

@Deprecated
private Dataset<Row> getVsonDataFrame(SparkSession sparkSession, PushJobSetting pushJobSetting) {
JavaRDD<Row> rdd = sparkSession.sparkContext()
.sequenceFile(pushJobSetting.inputURI, BytesWritable.class, BytesWritable.class)
.toJavaRDD()
.map(record -> {
VeniceVsonRecordReader recordReader = new VeniceVsonRecordReader(
pushJobSetting.vsonInputKeySchemaString,
pushJobSetting.vsonInputValueSchemaString,
pushJobSetting.keyField,
pushJobSetting.valueField);

final byte[] inputKeyBytes = recordReader.getKeyBytes(record._1, record._2);
final byte[] inputValueBytes = recordReader.getValueBytes(record._1, record._2);

return new GenericRowWithSchema(new Object[] { inputKeyBytes, inputValueBytes }, DEFAULT_SCHEMA);
});
return sparkSession.createDataFrame(rdd, DEFAULT_SCHEMA);
}
}
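To make the reverted gating concrete: the removed code read the temporary property SPARK_NATIVE_INPUT_FORMAT_ENABLED ("spark.native.input.format.enabled", defaulting to false) from the job properties and only routed Avro/VSON input through Spark's built-in readers when it was set; otherwise, and unconditionally after this revert, input is loaded through the custom VeniceHdfsSource. A small, self-contained sketch of that toggle (it uses java.util.Properties for brevity, whereas the real code reads a VeniceProperties instance):

import java.util.Properties;

public class NativeInputFormatToggleSketch {
  // Mirrors the removed constant SPARK_NATIVE_INPUT_FORMAT_ENABLED.
  static final String SPARK_NATIVE_INPUT_FORMAT_ENABLED = "spark.native.input.format.enabled";

  public static void main(String[] args) {
    Properties jobProps = new Properties();
    // Opting in had to be explicit; the default of false kept existing jobs on the custom source.
    jobProps.setProperty(SPARK_NATIVE_INPUT_FORMAT_ENABLED, "true");

    boolean useNativeInputFormat =
        Boolean.parseBoolean(jobProps.getProperty(SPARK_NATIVE_INPUT_FORMAT_ENABLED, "false"));

    if (!useNativeInputFormat) {
      System.out.println("Load input via the custom VeniceHdfsSource (the only path after this revert).");
    } else {
      System.out.println("Load input via Spark's native avro / sequenceFile readers (removed by this revert).");
    }
  }
}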