[HUDI-6787] Integrate the new file group reader with Hive query engine #10422

Merged: 43 commits into apache:master on Jun 9, 2024

Conversation

@jonvex (Contributor) commented Dec 28, 2023:

Change Logs

Replace the existing Hive read logic with the new file group reader.

HoodieFileGroupReader is the generic file group reader implementation intended to be used by all engines. I created HoodieFileGroupReaderRecordReader, which implements RecordReader. HoodieFileGroupReaderRecordReader uses HoodieFileGroupReader with HiveHoodieReaderContext to read file groups (COW, MOR, bootstrap) with the Hive/Hadoop engine.
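
For illustration only, here is a rough sketch (not the actual HoodieFileGroupReaderRecordReader) of the wrapping pattern described above: a Hadoop RecordReader<NullWritable, ArrayWritable> that delegates to an iterator of merged rows produced by an engine-agnostic file group reader. The class name and the records iterator are hypothetical stand-ins.

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.RecordReader;

// Sketch: adapts an iterator of merged rows (assumed to come from the file group reader)
// to the Hadoop mapred RecordReader contract that Hive consumes.
public class FileGroupBackedRecordReaderSketch implements RecordReader<NullWritable, ArrayWritable> {
  private final Iterator<ArrayWritable> records; // hypothetical: rows emitted by the file group reader
  private long position = 0;

  public FileGroupBackedRecordReaderSketch(Iterator<ArrayWritable> records) {
    this.records = records;
  }

  @Override
  public boolean next(NullWritable key, ArrayWritable value) throws IOException {
    if (!records.hasNext()) {
      return false;
    }
    // Copy the next merged row into the reusable value container Hive hands us.
    value.set(records.next().get());
    position++;
    return true;
  }

  @Override
  public NullWritable createKey() {
    return NullWritable.get();
  }

  @Override
  public ArrayWritable createValue() {
    return new ArrayWritable(Writable.class);
  }

  @Override
  public long getPos() {
    return position;
  }

  @Override
  public float getProgress() {
    return 0.0f; // progress reporting omitted in this sketch
  }

  @Override
  public void close() throws IOException {
    // The real reader would also close the underlying file group reader here.
  }
}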

Impact

Hive will be more maintainable.

Risk level (write none, low, medium or high below)

High
Need to do lots of testing.

Documentation Update

N/A

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@jonvex (Contributor Author) commented Jan 4, 2024:

@vinothchandar

@vinothchandar self-assigned this on Jan 5, 2024.
@vinothchandar changed the title from "[HUDI-6787] Implement the HoodieFileGroupReader API for Hive" to "[WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive" on Jan 8, 2024.
@vinothchandar (Member) left a comment:

Looks very promising.

packaging/bundle-validation/validate.sh (outdated; resolved)
@@ -116,6 +117,7 @@ public void setUp() {
hadoopConf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
baseJobConf = new JobConf(hadoopConf);
baseJobConf.set(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), String.valueOf(1024 * 1024));
baseJobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
Member:

Why "false"?

Contributor Author (@jonvex):

This test directly creates the record reader instead of reading through the file format:

HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);

Looking back at this, I think I might be able to update this test to use the new fg reader.

Contributor Author (@jonvex):

Tried again and still couldn't get them working.

Option<ClosableIterator<T>> skeletonFileIterator = requiredFields.getLeft().isEmpty() ? Option.empty() :
Option.of(readerContext.getFileRecordIterator(baseFile.getHadoopPath(), 0, baseFile.getFileLen(),
createSchemaFromFields(allFields.getLeft()), createSchemaFromFields(requiredFields.getLeft()), hadoopConf));
Option<Pair<ClosableIterator<T>,Schema>> dataFileIterator =
Member:

It's cool that we are able to add a new engine without many changes to this class.

if (!(split instanceof FileSplit) || !checkTableIsHudi(split, job)) {
return super.getRecordReader(split, job, reporter);
}
if (supportAvroRead && HoodieColumnProjectionUtils.supportTimestamp(job)) {
Member:

Note to self: dig into these.

}

public static List<String> getPartitionFieldNames(JobConf jobConf) {
String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
Contributor:

@xicm Can you help confirm this part?


@Override
public String getRecordKey(Schema recordSchema, Option<BaseKeyGenerator> keyGeneratorOpt) {
throw new UnsupportedOperationException("Not supported for HoodieHiveRecord");
Contributor:

Why is this not supported?

Contributor Author (@jonvex):

It wasn't needed for reading, so I didn't implement it.

Member:

Shall we implement getRecordKey? It might be useful later. I guess we only need to call getValue() for the record key field, isn't it?


@Override
public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) {
throw new UnsupportedOperationException("Not supported for HoodieHiveRecord");
Contributor:

Why is this not supported?

Contributor Author (@jonvex):

It wasn't needed for reading, so I didn't implement it.

public class HoodieHiveRecordMerger implements HoodieRecordMerger {
@Override
public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
ValidationUtils.checkArgument(older.getRecordType() == HoodieRecord.HoodieRecordType.HIVE);
Contributor:

Not sure why we need to override the merge logic?

Contributor Author (@jonvex):

I just copied what HoodieSparkRecordMerger does.
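
For reference, a rough latest-wins sketch of the merge contract quoted above; this is not the actual HoodieHiveRecordMerger (which, per this thread, mirrors HoodieSparkRecordMerger), and the package imports are assumptions about the usual Hudi layout.

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;

public class LatestWinsMergeSketch {
  public static Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema,
                                                         HoodieRecord newer, Schema newSchema,
                                                         TypedProperties props) throws IOException {
    // Both sides must be Hive-typed records, matching the check in the quoted snippet.
    ValidationUtils.checkArgument(older.getRecordType() == HoodieRecord.HoodieRecordType.HIVE);
    ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecord.HoodieRecordType.HIVE);
    // Commit-time semantics: the record from the newer slice wins. The real merger
    // would also need to handle deletes and ordering values.
    return Option.of(Pair.of(newer, newSchema));
  }
}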

TypeInfoUtils.getTypeInfosFromTypeString(columnTypeList.get(i).getQualifiedName()).get(0)));

StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
ArrayWritableObjectInspector objectInspector = new ArrayWritableObjectInspector(rowTypeInfo);
Contributor:

@xicm @xiarixiaoyao can you help confirm the correctness?

Contributor:

There may be compatibility issues between Hive 2 and Hive 3 for DATE and TIMESTAMP types.

Contributor:

> There may be compatibility issues between Hive 2 and Hive 3 for DATE and TIMESTAMP types.

I think Hive will handle this itself.

Contributor Author (@jonvex):

FYI this is pretty much a copy of

Contributor:

I tested with Hive 3; this works well.

@Override
public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf job,
final Reporter reporter) throws IOException {

if (HoodieFileGroupReaderRecordReader.useFilegroupReader(job)) {
try {
@xicm (Contributor) commented Jan 29, 2024:

We need to confirm that the values of "hive.io.file.readcolumn.names" and "hive.io.file.readcolumn.ids" in the JobConf contain the partition fields; if not, Hive 3 partition queries return null. See #7355.
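
As a minimal sketch of that check (illustrative only; the PR itself relies on the existing projection utilities discussed further down, and the helper name here is made up), one could verify the read-column names include the partition fields before handing the JobConf to the reader:

import java.util.LinkedHashSet;
import java.util.Set;

import org.apache.hadoop.mapred.JobConf;

public class ReadColumnCheckSketch {
  // Appends any missing partition field names to hive.io.file.readcolumn.names.
  public static void ensurePartitionFieldsProjected(JobConf job, String[] partitionFields) {
    Set<String> readColumns = new LinkedHashSet<>();
    for (String col : job.get("hive.io.file.readcolumn.names", "").split(",")) {
      if (!col.isEmpty()) {
        readColumns.add(col);
      }
    }
    for (String field : partitionFields) {
      if (!field.isEmpty()) {
        readColumns.add(field);
      }
    }
    // "hive.io.file.readcolumn.ids" would have to be kept in sync as well.
    job.set("hive.io.file.readcolumn.names", String.join(",", readColumns));
  }
}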

Contributor Author (@jonvex):

In createRequestedSchema(Schema tableSchema, JobConf jobConf), I remove the partition fields from the read columns if the Parquet file doesn't contain them. Does that help?

Contributor:

The problem is that Parquet files in plain Hive tables don't contain the partition fields, while Hudi Parquet files do.

}
}, split, job, reporter);
} else {
return new HoodieFileGroupReaderRecordReader(super::getRecordReader, split, job, reporter);
Contributor:

I tested with Hive 3.1.2; partition queries return an empty result with this reader.

Contributor:

We can move

new SchemaEvolutionContext(split, job).doEvolutionForParquetFormat();
if (LOG.isDebugEnabled()) {
LOG.debug("EMPLOYING DEFAULT RECORD READER - " + split);
}
HoodieRealtimeInputFormatUtils.addProjectionField(job, job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "").split("/"));
to the top of this method.

Contributor Author (@jonvex):

I added your suggestion. Could you please let me know if that fixes the issue? Thanks!

Contributor:

Fixed.

partitionColumns = Arrays.stream(partitionColString.split(",")).collect(Collectors.toSet());
}
//if they are actually written to the file, then it is ok to read them from the file
tableSchema.getFields().forEach(f -> partitionColumns.remove(f.name().toLowerCase(Locale.ROOT)));
Contributor:

I'm confused, will partitionColumns always be empty?

Contributor Author (@jonvex):

partitionColumns won't be empty if any of the partition columns are not written to the file. tableSchema only has the columns that are written to the file. This is the case in the docker demo (https://hudi.apache.org/docs/docker_demo#step-3-sync-with-hive). If you look at the data in https://github.com/apache/hudi/tree/master/docker/demo/data, there is no field named "dt".
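
To make that concrete, here is a small illustrative sketch (class and method names are made up) of the filtering shown in the snippet above: start from Hive's declared partition columns and drop the ones that are physically present in the table schema, leaving only the columns that must be filled in from the partition path.

import java.util.Arrays;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.avro.Schema;

public class PartitionColumnFilterSketch {
  public static Set<String> partitionColumnsNotInFile(String partitionColString, Schema tableSchema) {
    Set<String> partitionColumns = Arrays.stream(partitionColString.split(","))
        .filter(c -> !c.isEmpty())
        .map(c -> c.toLowerCase(Locale.ROOT))
        .collect(Collectors.toSet());
    // If a partition column is actually written to the file, it can be read from the file directly.
    tableSchema.getFields().forEach(f -> partitionColumns.remove(f.name().toLowerCase(Locale.ROOT)));
    // Whatever remains (e.g., "dt" in the docker demo) has to be synthesized from the partition path.
    return partitionColumns;
  }
}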

@danny0405 (Contributor):

@bvaradar Can you help with the review of the Hive-related code?

@bvaradar (Contributor) commented Feb 7, 2024:

> @bvaradar Can you help with the review of the Hive-related code?

Yes @danny0405. Will review this PR.

@jonvex (Contributor Author) commented Feb 7, 2024:

Thanks @bvaradar, we all appreciate it!

@bvaradar (Contributor) left a comment:

Overall looks good to me. @jonvex: What Hive versions are we targeting/testing?

}

@Override
public String getRecordKey(ArrayWritable record, Schema schema) {
Contributor:

Isn't this method already defined in the same way in the base class?

Contributor Author (@jonvex):

In the base class it just uses the meta column, while here we use the actual field if meta columns are disabled. TBH, maybe that should also be the case for the base class?
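
A rough sketch of that behaviour (illustrative only, not the actual method; the keyFieldName parameter stands in for however the real code resolves the configured key field): prefer the populated meta column, and fall back to the actual record key field when meta fields are disabled.

import org.apache.avro.Schema;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Writable;

public class RecordKeyFallbackSketch {
  // Hudi's record key metadata column; hard-coded here to keep the sketch self-contained.
  private static final String RECORD_KEY_META_FIELD = "_hoodie_record_key";

  public static String getRecordKey(ArrayWritable record, Schema schema, String keyFieldName) {
    String fromMeta = readField(record, schema, RECORD_KEY_META_FIELD);
    if (fromMeta != null && !fromMeta.isEmpty()) {
      return fromMeta;
    }
    // Meta columns are disabled (or empty), so read the actual key field instead.
    return readField(record, schema, keyFieldName);
  }

  private static String readField(ArrayWritable record, Schema schema, String fieldName) {
    Schema.Field field = schema.getField(fieldName);
    if (field == null) {
      return null;
    }
    Writable value = record.get()[field.pos()];
    return value == null ? null : value.toString();
  }
}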

@jonvex (Contributor Author) commented Feb 20, 2024:

> Overall looks good to me. @jonvex: What Hive versions are we targeting/testing?

@bvaradar I used the docker demo to test. I think that is using Hive 2. We would like this to replace the existing implementation, so the goal is to support everything that works when the file group reader is disabled.

@github-actions bot added the size:XL (PR with lines of changes > 1000) label on Feb 26, 2024.
@codope changed the title from "[WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive" to "[HUDI-6787] Implement the HoodieFileGroupReader API for Hive" on May 31, 2024.
@hudi-bot commented Jun 9, 2024:

CI report:

@hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@yihua merged commit 0abc00d into apache:master on Jun 9, 2024 (46 checks passed).
@yihua changed the title from "[HUDI-6787] Implement the HoodieFileGroupReader API for Hive" to "[HUDI-6787] Integrate the new file group reader with Hive query engine" on Jun 23, 2024.
@yihua mentioned this pull request on Jun 23, 2024.
@yihua (Contributor) commented Jun 23, 2024:

@jonvex could you check in the code for testing the new file group reader on Hive 3 from #11398?

Labels: release-1.0.0-beta2, release-1.0.0, size:XL (PR with lines of changes > 1000)
9 participants