[HUDI-1234] DO NOT MERGE use fg reader in cdc test #11401

Closed · wants to merge 100 commits
Changes from all commits (100 commits)
283d7c3
add spark 3.3 reader
Apr 2, 2024
ef65428
add spark3.4
Apr 2, 2024
8168147
add spark 3.5
Apr 2, 2024
1a53f1e
add spark 3.2
Apr 2, 2024
97d9920
add spark 3.1
Apr 2, 2024
b9d7ce4
add spark 3.0
Apr 2, 2024
a20e9d4
add spark 2.4
Apr 2, 2024
abe7839
spark 3.3 use properties class
Apr 3, 2024
865526e
spark 3.2 add props class
Apr 3, 2024
bab974a
spark 3.4 add properties
Apr 4, 2024
0eb2185
add spark 3.5 properties
Apr 4, 2024
10a577f
add properties spark 3.1
Apr 4, 2024
3c7ecf1
add props spark 3.0
Apr 4, 2024
700013b
add properties spark 2.4
Apr 4, 2024
37f52eb
fix 3.0
Apr 4, 2024
b9c1592
refactor to get rid of properties, spark 3.1
Apr 4, 2024
e3957c5
remove props spark 3.0
Apr 4, 2024
7345f6b
use class model for spark 3.3
Apr 4, 2024
2942a6c
remove props spark 3.3
Apr 4, 2024
2012131
remove props spark 3.4
Apr 4, 2024
e40072e
remove props spark 3.5
Apr 4, 2024
5813cbf
remove props spark 2.4
Apr 4, 2024
0f00822
remove change
Apr 4, 2024
867593d
remove bad import
Apr 4, 2024
64965e6
add spark 3.3
Apr 2, 2024
6b0ca88
add spark 3.4
Apr 2, 2024
5a557e1
add spark 3.5
Apr 2, 2024
2f48bdd
add spark 3.2
Apr 2, 2024
9845f37
add spark 3.1
Apr 2, 2024
5e536fe
add spark 3.0
Apr 2, 2024
9dc9ade
add spark 2.4
Apr 2, 2024
b12c018
fix 2.4
Apr 4, 2024
749c651
integrate schema evolution into the fg reader
Apr 2, 2024
d59fd1b
finish rebase
Apr 4, 2024
b7a7c27
use augment hadoop conf
Apr 5, 2024
8319069
clone hadoop conf for log file read
Apr 5, 2024
28200fd
try copying conf in reader
Apr 5, 2024
8ca12f2
create a copy of the conf when reading
Apr 5, 2024
088f69e
make conf copy during read
Apr 5, 2024
c5d77d5
recordkey filters is reverse
Apr 9, 2024
3b8b4f1
fix position based merging
Apr 10, 2024
70cef40
fix mor
Apr 10, 2024
56125af
fix some tests
Apr 10, 2024
31eb84b
add validations
Apr 10, 2024
9c723d0
fixed broken test
Apr 10, 2024
f475aa9
fix set and use wrong var
Apr 10, 2024
1e4657a
only update and use reader state in the constructor
Apr 10, 2024
15acc2e
use data block schema to read untransformed record
Apr 10, 2024
8205971
add test
Apr 11, 2024
815b6fd
allow vectorized read and comment better
Apr 11, 2024
120226a
address review comments 3.5
Apr 11, 2024
dbdefad
rename spark 3.4
Apr 11, 2024
f950835
rename for spark3.3
Apr 11, 2024
75da5dd
rename for spark 3.2
Apr 11, 2024
e7e4b51
rename spark 3.1
Apr 11, 2024
81da1a7
rename spark 30
Apr 11, 2024
1c68439
rename for spark 2
Apr 11, 2024
f6c5beb
remove empty line
Apr 11, 2024
8f1ba6d
address hidden review comments
Apr 12, 2024
37bc97b
Merge branch 'create_spark_file_readers' into add_schema_evolution_to…
Apr 12, 2024
26c1df8
Merge branch 'add_schema_evolution_to_spark_file_readers' into add_sc…
Apr 12, 2024
ee7a0e3
finish merge
Apr 12, 2024
a73f955
Merge branch 'master' into add_schema_evolution_to_spark_file_readers
Apr 12, 2024
966e8c8
Merge branch 'add_schema_evolution_to_spark_file_readers' into add_sc…
Apr 12, 2024
c8f507b
add missing import
Apr 12, 2024
be77950
address comments and add changes to legacy 3.5
Apr 15, 2024
8943bb4
spark 3.4 update legacy
Apr 15, 2024
4c32421
make changes to spark 3.3 and restore legacy for 3.4 and 3.5
Apr 15, 2024
a08eacb
update spark 3.2
Apr 15, 2024
1edf6bf
update spark 3.1
Apr 15, 2024
eb58a1a
update spark 3.0
Apr 15, 2024
aed8113
Merge branch 'add_schema_evolution_to_spark_file_readers' into add_sc…
Apr 15, 2024
94171e2
Merge branch 'master' into add_schema_evolution_to_fg_reader
Apr 15, 2024
72e09f6
remove some unnecessary changes to make it easier to review
Apr 16, 2024
24be896
fix most review comments
Apr 18, 2024
d1fbbf6
address review comments
Apr 18, 2024
96a371f
Merge branch 'master' into add_schema_evolution_to_fg_reader
Apr 18, 2024
89078f3
use more complex projection
Apr 18, 2024
6df21d8
Merge branch 'master' into add_schema_evolution_to_fg_reader
jonvex Apr 24, 2024
95b6d62
Merge branch 'master' into add_schema_evolution_to_fg_reader
jonvex Apr 29, 2024
879e07c
add batch support
Apr 29, 2024
c98242b
Merge branch 'master' into add_schema_evolution_to_fg_reader
Apr 30, 2024
9d1ac2a
add comment and allow vector read when positional merge is used
Apr 30, 2024
9d0a471
Merge branch 'master' into add_schema_evolution_to_fg_reader
May 7, 2024
540d122
Merge branch 'master' into add_schema_evolution_to_fg_reader
May 14, 2024
f126475
Merge branch 'master' into add_schema_evolution_to_fg_reader
May 30, 2024
63737ca
don't unwrap copy, we need to original so that the hadoopconf gets mo…
May 30, 2024
475a1bc
fix another case where hadoopconf copy isolates config changes
May 30, 2024
d504e37
disable problem test
May 31, 2024
ec6fa62
disable vectorized read for mor and bootstrap
May 31, 2024
2284613
revert test change
Jun 3, 2024
72eb6de
Address self comments
codope Jun 4, 2024
5f257f0
allow batch for cdc
Jun 4, 2024
29c604c
add comments
Jun 4, 2024
4045388
add testing back/ add new testing
Jun 4, 2024
575b206
add spark test
Jun 4, 2024
36d0b15
fix build errors
Jun 4, 2024
11862a3
Merge branch 'master' into add_schema_evolution_to_fg_reader
Jun 5, 2024
e710020
make default value -1 for position column
Jun 5, 2024
a4f3d9a
replace use of parquet reader with fg reader in the cdc iterator
Jun 5, 2024
@@ -37,12 +37,14 @@
import org.apache.spark.sql.HoodieInternalRowUtils;
import org.apache.spark.sql.HoodieUnsafeRowUtils;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.types.StructType;

import java.util.Map;
import java.util.function.UnaryOperator;

import scala.Function1;

import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
import static org.apache.spark.sql.HoodieInternalRowUtils.getCachedSchema;
@@ -137,8 +139,14 @@ private Object getFieldValueFromInternalRow(InternalRow row, Schema recordSchema
}

@Override
public UnaryOperator<InternalRow> projectRecord(Schema from, Schema to) {
UnsafeProjection projection = HoodieInternalRowUtils.generateUnsafeProjectionAlias(getCachedSchema(from), getCachedSchema(to));
return projection::apply;
public UnaryOperator<InternalRow> projectRecord(Schema from, Schema to, Map<String, String> renamedColumns) {
Function1<InternalRow, UnsafeRow> unsafeRowWriter =
HoodieInternalRowUtils.getCachedUnsafeRowWriter(getCachedSchema(from), getCachedSchema(to), renamedColumns);
return row -> (InternalRow) unsafeRowWriter.apply(row);

}

protected UnaryOperator<InternalRow> getIdentityProjection() {
return row -> row;
}
}
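
The new projectRecord above returns a UnaryOperator<InternalRow> built from HoodieInternalRowUtils.getCachedUnsafeRowWriter, which can also rename columns during schema evolution. For intuition only, here is a minimal sketch of the same kind of row-level projection using plain Spark catalyst APIs (BoundReference/UnsafeProjection); the schemas, ordinals, and the RowProjectionSketch name are illustrative and not part of this PR.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, UnsafeProjection}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object RowProjectionSketch {
  def main(args: Array[String]): Unit = {
    // Source schema: (key: string, pos: long, value: string); the projection keeps only (key, value).
    val from = StructType(Seq(
      StructField("key", StringType),
      StructField("pos", LongType),
      StructField("value", StringType)))
    val keepOrdinals = Seq(0, 2)
    val projection = UnsafeProjection.create(
      keepOrdinals.map(i => BoundReference(i, from(i).dataType, from(i).nullable)))

    val row = InternalRow(UTF8String.fromString("k1"), 42L, UTF8String.fromString("v1"))
    val projected = projection(row) // UnsafeRow with two fields: "k1", "v1"
    println(s"${projected.getUTF8String(0)}, ${projected.getUTF8String(1)}")
  }
}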
@@ -19,48 +19,59 @@

package org.apache.hudi

import org.apache.avro.Schema
import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.SparkFileFormatInternalRowReaderContext.getAppliedRequiredSchema
import org.apache.hudi.avro.AvroSchemaUtils
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.collection.{ClosableIterator, CloseableMappingIterator}
import org.apache.hudi.common.util.collection.{CachingIterator, ClosableIterator, CloseableMappingIterator}
import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, HoodieSparkParquetReader}
import org.apache.hudi.storage.{HoodieStorage, StoragePath}
import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath}
import org.apache.hudi.util.CloseableInternalRowIterator

import org.apache.avro.Schema
import org.apache.avro.generic.IndexedRecord
import org.apache.spark.sql.HoodieInternalRowUtils
import org.apache.spark.sql.avro.HoodieAvroDeserializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, SparkParquetReader}
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{LongType, MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

import scala.collection.mutable

/**
* Implementation of {@link HoodieReaderContext} to read {@link InternalRow}s with
* {@link ParquetFileFormat} on Spark.
* Implementation of [[HoodieReaderContext]] to read [[InternalRow]]s with
* [[ParquetFileFormat]] on Spark.
*
* This uses the Spark parquet reader to read parquet data files or parquet log blocks.
*
* @param readermaps our intention is to build the reader inside of getFileRecordIterator, but since it is called from
* the executor, we will need to port a bunch of the code from ParquetFileFormat for each spark version
* for now, we pass in a map of the different readers we expect to create
* @param parquetFileReader A reader that transforms a [[PartitionedFile]] to an iterator of
* [[InternalRow]]. This is required for reading the base file and
* not required for reading a file group with only log files.
* @param recordKeyColumn column name of the record key
* @param filters Spark filters that might be pushed down into the reader
*/
class SparkFileFormatInternalRowReaderContext(readerMaps: mutable.Map[Long, PartitionedFile => Iterator[InternalRow]]) extends BaseSparkInternalRowReaderContext {
lazy val sparkAdapter = SparkAdapterSupport.sparkAdapter
val deserializerMap: mutable.Map[Schema, HoodieAvroDeserializer] = mutable.Map()

override def getFileRecordIterator(filePath: StoragePath, start: Long, length: Long, dataSchema: Schema, requiredSchema: Schema, storage: HoodieStorage): ClosableIterator[InternalRow] = {
// partition value is empty because the spark parquet reader will append the partition columns to
// each row if they are given. That is the only usage of the partition values in the reader.
val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
.createPartitionedFile(InternalRow.empty, filePath, start, length)
class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetReader,
recordKeyColumn: String,
filters: Seq[Filter]) extends BaseSparkInternalRowReaderContext {
lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
lazy val recordKeyFilters: Seq[Filter] = filters.filter(f => f.references.exists(c => c.equalsIgnoreCase(recordKeyColumn)))
private val deserializerMap: mutable.Map[Schema, HoodieAvroDeserializer] = mutable.Map()

override def getFileRecordIterator(filePath: StoragePath,
start: Long,
length: Long,
dataSchema: Schema,
requiredSchema: Schema,
storage: HoodieStorage): ClosableIterator[InternalRow] = {
val structType: StructType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
if (FSUtils.isLogFile(filePath)) {
val structType: StructType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
val projection: UnsafeProjection = HoodieInternalRowUtils.getCachedUnsafeProjection(structType, structType)
new CloseableMappingIterator[InternalRow, UnsafeRow](
new HoodieSparkFileReaderFactory(storage).newParquetFileReader(filePath)
@@ -73,16 +84,27 @@ class SparkFileFormatInternalRowReaderContext(readerMaps: mutable.Map[Long, Part
}
}).asInstanceOf[ClosableIterator[InternalRow]]
} else {
val schemaPairHashKey = generateSchemaPairHashKey(dataSchema, requiredSchema)
if (!readerMaps.contains(schemaPairHashKey)) {
throw new IllegalStateException("schemas don't hash to a known reader")
}
new CloseableInternalRowIterator(readerMaps(schemaPairHashKey).apply(fileInfo))
// partition value is empty because the spark parquet reader will append the partition columns to
// each row if they are given. That is the only usage of the partition values in the reader.
val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
.createPartitionedFile(InternalRow.empty, filePath, start, length)
val (readSchema, readFilters) = getSchemaAndFiltersForRead(structType)
new CloseableInternalRowIterator(parquetFileReader.read(fileInfo,
readSchema, StructType(Seq.empty), readFilters, storage.getConf.asInstanceOf[StorageConfiguration[Configuration]]))
}
}

private def generateSchemaPairHashKey(dataSchema: Schema, requestedSchema: Schema): Long = {
dataSchema.hashCode() + requestedSchema.hashCode()
private def getSchemaAndFiltersForRead(structType: StructType): (StructType, Seq[Filter]) = {
(getHasLogFiles, getNeedsBootstrapMerge, getUseRecordPosition) match {
case (false, false, _) =>
(structType, filters)
case (false, true, true) =>
(getAppliedRequiredSchema(structType), filters)
case (true, _, true) =>
(getAppliedRequiredSchema(structType), recordKeyFilters)
case (_, _, _) =>
(structType, Seq.empty)
}
}

/**
Expand All @@ -101,46 +123,147 @@ class SparkFileFormatInternalRowReaderContext(readerMaps: mutable.Map[Long, Part
}

override def mergeBootstrapReaders(skeletonFileIterator: ClosableIterator[InternalRow],
dataFileIterator: ClosableIterator[InternalRow]): ClosableIterator[InternalRow] = {
doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]],
dataFileIterator.asInstanceOf[ClosableIterator[Any]])
skeletonRequiredSchema: Schema,
dataFileIterator: ClosableIterator[InternalRow],
dataRequiredSchema: Schema): ClosableIterator[InternalRow] = {
doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]], skeletonRequiredSchema,
dataFileIterator.asInstanceOf[ClosableIterator[Any]], dataRequiredSchema)
}

protected def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any], dataFileIterator: ClosableIterator[Any]): ClosableIterator[InternalRow] = {
new ClosableIterator[Any] {
val combinedRow = new JoinedRow()

override def hasNext: Boolean = {
//If the iterators are out of sync it is probably due to filter pushdown
checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
"Bootstrap data-file iterator and skeleton-file iterator have to be in-sync!")
dataFileIterator.hasNext && skeletonFileIterator.hasNext
private def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any],
skeletonRequiredSchema: Schema,
dataFileIterator: ClosableIterator[Any],
dataRequiredSchema: Schema): ClosableIterator[InternalRow] = {
if (getUseRecordPosition) {
assert(AvroSchemaUtils.containsFieldInSchema(skeletonRequiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME))
assert(AvroSchemaUtils.containsFieldInSchema(dataRequiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME))
val javaSet = new java.util.HashSet[String]()
javaSet.add(ROW_INDEX_TEMPORARY_COLUMN_NAME)
val skeletonProjection = projectRecord(skeletonRequiredSchema,
AvroSchemaUtils.removeFieldsFromSchema(skeletonRequiredSchema, javaSet))
//If we have log files, we will want to do position based merging with those as well,
//so leave the row index column at the end
val dataProjection = if (getHasLogFiles) {
getIdentityProjection
} else {
projectRecord(dataRequiredSchema,
AvroSchemaUtils.removeFieldsFromSchema(dataRequiredSchema, javaSet))
}

override def next(): Any = {
(skeletonFileIterator.next(), dataFileIterator.next()) match {
case (s: ColumnarBatch, d: ColumnarBatch) =>
val numCols = s.numCols() + d.numCols()
val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols)
for (i <- 0 until numCols) {
if (i < s.numCols()) {
vecs(i) = s.column(i)
//Always use internal row for positional merge because
//we need to iterate row by row when merging
new CachingIterator[InternalRow] {
val combinedRow = new JoinedRow()

//position column will always be at the end of the row
private def getPos(row: InternalRow): Long = {
row.getLong(row.numFields-1)
}

private def getNextSkeleton: (InternalRow, Long) = {
val nextSkeletonRow = skeletonFileIterator.next().asInstanceOf[InternalRow]
(nextSkeletonRow, getPos(nextSkeletonRow))
}

private def getNextData: (InternalRow, Long) = {
val nextDataRow = dataFileIterator.next().asInstanceOf[InternalRow]
(nextDataRow, getPos(nextDataRow))
}

override def close(): Unit = {
skeletonFileIterator.close()
dataFileIterator.close()
}

override protected def doHasNext(): Boolean = {
if (!dataFileIterator.hasNext || !skeletonFileIterator.hasNext) {
false
} else {
var nextSkeleton = getNextSkeleton
var nextData = getNextData
while (nextSkeleton._2 != nextData._2) {
if (nextSkeleton._2 > nextData._2) {
if (!dataFileIterator.hasNext) {
return false
} else {
nextData = getNextData
}
} else {
vecs(i) = d.column(i - s.numCols())
if (!skeletonFileIterator.hasNext) {
return false
} else {
nextSkeleton = getNextSkeleton
}
}
}
assert(s.numRows() == d.numRows())
sparkAdapter.makeColumnarBatch(vecs, s.numRows())
case (_: ColumnarBatch, _: InternalRow) => throw new IllegalStateException("InternalRow ColumnVector mismatch")
case (_: InternalRow, _: ColumnarBatch) => throw new IllegalStateException("InternalRow ColumnVector mismatch")
case (s: InternalRow, d: InternalRow) => combinedRow(s, d)
nextRecord = combinedRow(skeletonProjection.apply(nextSkeleton._1), dataProjection.apply(nextData._1))
true
}
}
}
} else {
new ClosableIterator[Any] {
val combinedRow = new JoinedRow()

override def close(): Unit = {
skeletonFileIterator.close()
dataFileIterator.close()
}
}.asInstanceOf[ClosableIterator[InternalRow]]
override def hasNext: Boolean = {
//If the iterators are out of sync it is probably due to filter pushdown
checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
"Bootstrap data-file iterator and skeleton-file iterator have to be in-sync!")
dataFileIterator.hasNext && skeletonFileIterator.hasNext
}

override def next(): Any = {
(skeletonFileIterator.next(), dataFileIterator.next()) match {
case (s: ColumnarBatch, d: ColumnarBatch) =>
//This will not be used until [HUDI-7693] is implemented
val numCols = s.numCols() + d.numCols()
val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols)
for (i <- 0 until numCols) {
if (i < s.numCols()) {
vecs(i) = s.column(i)
} else {
vecs(i) = d.column(i - s.numCols())
}
}
assert(s.numRows() == d.numRows())
sparkAdapter.makeColumnarBatch(vecs, s.numRows())
case (_: ColumnarBatch, _: InternalRow) => throw new IllegalStateException("InternalRow ColumnVector mismatch")
case (_: InternalRow, _: ColumnarBatch) => throw new IllegalStateException("InternalRow ColumnVector mismatch")
case (s: InternalRow, d: InternalRow) => combinedRow(s, d)
}
}

override def close(): Unit = {
skeletonFileIterator.close()
dataFileIterator.close()
}
}.asInstanceOf[ClosableIterator[InternalRow]]
}
}
}
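
// The positional branch of doBootstrapMerge above pairs skeleton and data rows by the row-index
// value carried in the last field of each row. Below is a self-contained sketch of that pairing
// logic using plain Scala iterators; Row and PositionalMergeSketch are hypothetical stand-ins
// (not Hudi types) and this only illustrates the technique, it is not the PR's code.
object PositionalMergeSketch {
  final case class Row(pos: Long, value: String)

  // Pair up rows whose positions match; skip ahead on whichever side lags behind,
  // since rows can be filtered out independently on either side.
  def merge(skeleton: Iterator[Row], data: Iterator[Row]): Iterator[(Row, Row)] =
    new Iterator[(Row, Row)] {
      private var nextPair: Option[(Row, Row)] = None

      private def advance(): Unit = {
        nextPair = None
        if (!skeleton.hasNext || !data.hasNext) return
        var s = skeleton.next()
        var d = data.next()
        while (s.pos != d.pos) {
          if (s.pos < d.pos) {
            if (!skeleton.hasNext) return
            s = skeleton.next()
          } else {
            if (!data.hasNext) return
            d = data.next()
          }
        }
        nextPair = Some((s, d))
      }

      advance()
      override def hasNext: Boolean = nextPair.isDefined
      override def next(): (Row, Row) = {
        val out = nextPair.get
        advance()
        out
      }
    }

  def main(args: Array[String]): Unit = {
    val skeleton = Iterator(Row(0, "k0"), Row(2, "k2"), Row(3, "k3"))
    val data = Iterator(Row(0, "d0"), Row(1, "d1"), Row(3, "d3"))
    merge(skeleton, data).foreach(println) // prints the pairs at positions 0 and 3
  }
}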

object SparkFileFormatInternalRowReaderContext {
// From "namedExpressions.scala": Used to construct to record position field metadata.
private val FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY = "__file_source_generated_metadata_col"
private val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
private val METADATA_COL_ATTR_KEY = "__metadata_col"

def getRecordKeyRelatedFilters(filters: Seq[Filter], recordKeyColumn: String): Seq[Filter] = {
filters.filter(f => f.references.exists(c => c.equalsIgnoreCase(recordKeyColumn)))
}

def getAppliedRequiredSchema(requiredSchema: StructType): StructType = {
val metadata = new MetadataBuilder()
.putString(METADATA_COL_ATTR_KEY, ROW_INDEX_TEMPORARY_COLUMN_NAME)
.putBoolean(FILE_SOURCE_METADATA_COL_ATTR_KEY, value = true)
.putString(FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY, ROW_INDEX_TEMPORARY_COLUMN_NAME)
.build()
val rowIndexField = StructField(ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType, nullable = false, metadata)
StructType(requiredSchema.fields.filterNot(isIndexTempColumn) :+ rowIndexField)
}

private def isIndexTempColumn(field: StructField): Boolean = {
field.name.equals(ROW_INDEX_TEMPORARY_COLUMN_NAME)
}

}
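
The helpers above show how the read is planned: when log files have to be merged, at most the record-key filters are pushed down, and getAppliedRequiredSchema appends the temporary row-index field used for positional merging. A small usage sketch follows, assuming this PR's org.apache.hudi.SparkFileFormatInternalRowReaderContext object and Spark are on the classpath (the filter values and field names are made up):

import org.apache.hudi.SparkFileFormatInternalRowReaderContext.{getAppliedRequiredSchema, getRecordKeyRelatedFilters}
import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object ReadPlanningSketch {
  def main(args: Array[String]): Unit = {
    val filters: Seq[Filter] = Seq(EqualTo("_hoodie_record_key", "key1"), GreaterThan("ts", 0L))
    // Keep only the filters that reference the record key column.
    val keyFilters = getRecordKeyRelatedFilters(filters, "_hoodie_record_key")

    val requiredSchema = StructType(Seq(
      StructField("_hoodie_record_key", StringType),
      StructField("ts", LongType)))
    // Appends the ROW_INDEX_TEMPORARY_COLUMN_NAME field (tagged as file-source metadata) at the end.
    val withRowIndex = getAppliedRequiredSchema(requiredSchema)

    println(keyFilters.mkString(", "))
    println(withRowIndex.fieldNames.mkString(", "))
  }
}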
@@ -75,13 +75,6 @@ object HoodieInternalRowUtils {
.getOrElseUpdate((from, to), generateUnsafeProjection(from, to))
}

/**
* due to scala2.11 and HoodieCatalystExpressionUtils is both an object and trait,
* we can't directly call generateUnsafeProjection from java code
*/
def generateUnsafeProjectionAlias(from: StructType, to: StructType): UnsafeProjection = {
generateUnsafeProjection(from, to)
}
/**
* Provides cached instance of [[UnsafeRowWriter]] transforming provided [[InternalRow]]s from
* one [[StructType]] and into another [[StructType]]
@@ -20,6 +20,7 @@
package org.apache.spark.sql.execution.datasources.parquet

import org.apache.hadoop.conf.Configuration
import org.apache.hudi.storage.StorageConfiguration
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.sources.Filter
@@ -33,12 +34,12 @@ trait SparkParquetReader extends Serializable {
* @param requiredSchema desired output schema of the data
* @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row
* @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters.
* @param sharedConf the hadoop conf
* @param storageConf the hadoop conf
* @return iterator of rows read from the file. The output type says [[InternalRow]], but it could be [[ColumnarBatch]].
*/
def read(file: PartitionedFile,
requiredSchema: StructType,
partitionSchema: StructType,
filters: Seq[Filter],
sharedConf: Configuration): Iterator[InternalRow]
storageConf: StorageConfiguration[Configuration]): Iterator[InternalRow]
}
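
The read contract above is implemented per Spark version elsewhere in this PR. As a shape reference only, here is a minimal, hypothetical implementation of the trait that compiles against the signature shown and simply returns no rows (NoopSparkParquetReader is not part of the PR; a real reader would build a Parquet record reader from the hadoop conf carried by storageConf and honor requiredSchema and filters):

import org.apache.hadoop.conf.Configuration
import org.apache.hudi.storage.StorageConfiguration
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.SparkParquetReader
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

class NoopSparkParquetReader extends SparkParquetReader {
  override def read(file: PartitionedFile,
                    requiredSchema: StructType,
                    partitionSchema: StructType,
                    filters: Seq[Filter],
                    storageConf: StorageConfiguration[Configuration]): Iterator[InternalRow] = {
    // A real implementation would create a (possibly vectorized) Parquet reader from the
    // configuration wrapped by storageConf, push down the filters, and append partition values.
    Iterator.empty
  }
}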