delete partition column from the jobconf if it is written in the file
Jonathan Vexler committed Jan 4, 2024
1 parent ffcf47d · commit 82f87fa
Showing 1 changed file with 16 additions and 12 deletions.
@@ -82,31 +82,32 @@ org.apache.hadoop.mapred.RecordReader<NullWritable, ArrayWritable> getRecordRead
   private final ArrayWritable arrayWritable;
   private final NullWritable nullWritable = NullWritable.get();
   private final InputSplit inputSplit;
-  private final JobConf jobConf;
+  private final JobConf jobConfCopy;
   private final UnaryOperator<ArrayWritable> reverseProjection;

   public HoodieFileGroupReaderRecordReader(HiveReaderCreator readerCreator,
                                            final InputSplit split,
                                            final JobConf jobConf,
                                            final Reporter reporter) throws IOException {
-    HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConf);
-    Set<String> partitionColumns = new HashSet<>(getPartitionFieldNames(jobConf));
+    this.jobConfCopy = new JobConf(jobConf);
+    HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConfCopy);
+    Set<String> partitionColumns = new HashSet<>(getPartitionFieldNames(jobConfCopy));
     this.inputSplit = split;
-    this.jobConf = jobConf;

     FileSplit fileSplit = (FileSplit) split;
-    String tableBasePath = getTableBasePath(split, jobConf);
+    String tableBasePath = getTableBasePath(split, jobConfCopy);
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
-        .setConf(jobConf)
+        .setConf(jobConfCopy)
         .setBasePath(tableBasePath)
         .build();
     String latestCommitTime = getLatestCommitTime(split, metaClient);
-    Schema tableSchema = getLatestTableSchema(metaClient, jobConf, latestCommitTime);
-    Schema requestedSchema = createRequestedSchema(tableSchema, jobConf);
+    Schema tableSchema = getLatestTableSchema(metaClient, jobConfCopy, latestCommitTime);
+    Schema requestedSchema = createRequestedSchema(tableSchema, jobConfCopy);
     Map<String, String[]> hosts = new HashMap<>();
-    this.readerContext = new HiveHoodieReaderContext(readerCreator, split, jobConf, reporter, tableSchema, hosts, metaClient);
+    this.readerContext = new HiveHoodieReaderContext(readerCreator, split, jobConfCopy, reporter, tableSchema, hosts, metaClient);
     this.arrayWritable = new ArrayWritable(Writable.class, new Writable[requestedSchema.getFields().size()]);
-    this.fileGroupReader = new HoodieFileGroupReader<>(readerContext, jobConf, tableBasePath,
-        latestCommitTime, getFileSliceFromSplit(fileSplit, hosts, readerContext.getFs(tableBasePath, jobConf), tableBasePath),
+    this.fileGroupReader = new HoodieFileGroupReader<>(readerContext, jobConfCopy, tableBasePath,
+        latestCommitTime, getFileSliceFromSplit(fileSplit, hosts, readerContext.getFs(tableBasePath, jobConfCopy), tableBasePath),
         tableSchema, requestedSchema, metaClient.getTableConfig().getProps(), metaClient.getTableConfig(), fileSplit.getStart(),
         fileSplit.getLength(), false);
     this.fileGroupReader.initRecordIterators();
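
The pattern in this hunk is copy-on-construct: the reader clones the incoming JobConf and applies every projection and partition-column adjustment to the clone, so other readers sharing the original conf are unaffected. A minimal sketch of that pattern, assuming a hypothetical property key in place of Hudi's actual mutations:

import org.apache.hadoop.mapred.JobConf;

class CopyOnConstructReader {
  private final JobConf jobConfCopy;

  CopyOnConstructReader(JobConf jobConf) {
    // Clone first; every subsequent mutation targets the copy only.
    this.jobConfCopy = new JobConf(jobConf);
    // Hypothetical adjustment standing in for the projection/partition edits.
    this.jobConfCopy.set("example.reader.adjusted", "true");
  }

  JobConf getJobConf() {
    // Callers see the adjusted copy; the conf passed in is never mutated.
    return jobConfCopy;
  }
}
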
@@ -156,7 +157,7 @@ public RealtimeSplit getSplit() {
   }

   public JobConf getJobConf() {
-    return jobConf;
+    return jobConfCopy;
   }

   private static List<String> getPartitionFieldNames(JobConf jobConf) {
@@ -262,6 +263,9 @@ private static Schema createRequestedSchema(Schema tableSchema, JobConf jobConf)
     }
     //if they are actually written to the file, then it is ok to read them from the file
     tableSchema.getFields().forEach(f -> partitionColumns.remove(f.name().toLowerCase(Locale.ROOT)));
+    //need to filter those partitions because they will be added back later on. And we don't want that
+    jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS,
+        Arrays.stream(jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS).split(",")).filter(partitionColumns::contains).collect(Collectors.joining(",")));
     return HoodieAvroUtils.generateProjectionSchema(tableSchema,
         Arrays.stream(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR).split(",")).filter(c -> !partitionColumns.contains(c)).collect(Collectors.toList()));
   }
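
The three added lines are the commit's core: once partition columns that physically exist in the file schema have been dropped from partitionColumns, the jobConf's comma-separated META_TABLE_PARTITION_COLUMNS value is rewritten to keep only the remaining "virtual" columns, so Hive does not synthesize file-backed partition values a second time. A standalone sketch of that filter-and-rejoin step, with made-up column names:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class PartitionColumnFilter {
  public static void main(String[] args) {
    // Partition columns the metastore declares for the table (made-up names).
    String declaredPartitionColumns = "region,event_date";
    // Fields actually written into the data file.
    Set<String> fileFields = new HashSet<>(Arrays.asList("id", "ts", "region"));

    // Start from the declared set, then drop anything the file already stores.
    Set<String> virtualPartitionColumns =
        new HashSet<>(Arrays.asList(declaredPartitionColumns.split(",")));
    virtualPartitionColumns.removeAll(fileFields);

    // Rebuild the comma-separated property keeping only the survivors,
    // mirroring the jobConf.set(...) call in the diff above.
    String filtered = Arrays.stream(declaredPartitionColumns.split(","))
        .filter(virtualPartitionColumns::contains)
        .collect(Collectors.joining(","));

    // "region" is now read from the file; only "event_date" stays virtual.
    System.out.println(filtered); // prints: event_date
  }
}
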