testing update
Jonathan Vexler committed Dec 28, 2023
1 parent b2c65be commit 99517e2
Showing 13 changed files with 108 additions and 65 deletions.
@@ -129,10 +129,11 @@ services:
- ./hadoop.env
environment:
SERVICE_PRECONDITION: "hivemetastore:9083"
JAVA_TOOL_OPTIONS: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
ports:
- "10000:10000"
# JVM debugging port (will be mapped to a random port on host)
- "5005"
- "5005:64757"
depends_on:
- "hivemetastore"
links:
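
For context on the debugging setup above: the JAVA_TOOL_OPTIONS entry starts a JDWP agent listening on port 5005 inside the container, and the ports entry exposes it to the host. A minimal sketch (hypothetical host and port values, not part of this commit) of checking that the mapped port is reachable before attaching jdb or an IDE remote-debug configuration:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class JdwpPortCheck {
  public static void main(String[] args) {
    // Assumed mapping: host port 5005 forwards to the container's JDWP agent.
    String host = "localhost";
    int port = 5005;
    try (Socket socket = new Socket()) {
      socket.connect(new InetSocketAddress(host, port), 2000); // 2-second timeout
      System.out.println("JDWP port reachable; attach a remote debugger to " + host + ":" + port);
    } catch (IOException e) {
      System.out.println("JDWP port not reachable yet: " + e.getMessage());
    }
  }
}
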
@@ -117,14 +117,6 @@ public abstract ClosableIterator<T> getFileRecordIterator(
*/
public String getRecordKey(T record, Schema schema) {
Object val = getValue(record, schema, RECORD_KEY_METADATA_FIELD);
if (val == null) {
System.out.println("******START RECORD*****");
for (Schema.Field f : schema.getFields()) {
Object o = getValue(record, schema, f.name());
System.out.println("Field: " + f.name() + ", Value: " + o);
}
System.out.println("******END RECORD*****");
}
return val.toString();
}

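
The hunk above strips temporary record-dumping printlns from getRecordKey; as written, a missing key field would now surface as a NullPointerException at val.toString(). Purely as an illustrative sketch (not the committed code), a defensive variant could fail fast with a descriptive message instead:

// Hypothetical variant of the method shown above; getValue, RECORD_KEY_METADATA_FIELD
// and the type parameter T come from the enclosing reader-context class.
public String getRecordKey(T record, Schema schema) {
  Object val = getValue(record, schema, RECORD_KEY_METADATA_FIELD);
  if (val == null) {
    throw new IllegalStateException(
        "Record key field '" + RECORD_KEY_METADATA_FIELD + "' is missing from record: " + record);
  }
  return val.toString();
}
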
@@ -135,9 +135,7 @@ public void close() {
protected Option<Pair<T, Map<String, Object>>> doProcessNextDataRecord(T record,
Map<String, Object> metadata,
Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair) throws IOException {
System.out.println("Processing next data record in record buffer");
if (existingRecordMetadataPair != null) {
System.out.println("existingRecordMetadataPair is not null");
// Merge and store the combined record
// Note that the incoming `record` is from an older commit, so it should be put as
// the `older` in the merge API
@@ -695,8 +695,8 @@ protected void transitionPendingState(HoodieInstant fromInstant, HoodieInstant t
}
} else {
// Ensures old state exists in timeline
LOG.info("Checking for file exists ?" + getInstantFileNamePath(fromInstantFileName));
ValidationUtils.checkArgument(metaClient.getFs().exists(getInstantFileNamePath(fromInstantFileName)));
ValidationUtils.checkArgument(metaClient.getFs().exists(getInstantFileNamePath(fromInstantFileName)),
"File " + getInstantFileNamePath(fromInstantFileName) + " does not exist!");
// Use Write Once to create Target File
if (allowRedundantTransitions) {
FileIOUtils.createFileInPath(metaClient.getFs(), getInstantFileNamePath(toInstantFileName), data);
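
The change above replaces an ad-hoc LOG.info plus bare precondition with the two-argument checkArgument overload, so a missing instant file now fails with the offending path in the exception message. A minimal standalone sketch of that behavior (hypothetical path value; the boolean stands in for metaClient.getFs().exists(...)):

import org.apache.hudi.common.util.ValidationUtils;

public class PreconditionMessageSketch {
  public static void main(String[] args) {
    String instantPath = "/tmp/warehouse/.hoodie/20231228000000.commit.requested"; // hypothetical
    boolean exists = false; // stand-in for metaClient.getFs().exists(getInstantFileNamePath(...))
    // Fails with the message below instead of an unexplained precondition error.
    ValidationUtils.checkArgument(exists, "File " + instantPath + " does not exist!");
  }
}
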
@@ -115,7 +115,7 @@ public void syncHoodieTable() {

private boolean tableExists(HoodieBigQuerySyncClient bqSyncClient, String tableName) {
if (bqSyncClient.tableExists(tableName)) {
LOG.info(tableName + " already exists");
LOG.info(tableName + " already exists. Skip table creation.");
return true;
}
return false;
@@ -47,6 +47,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -182,16 +183,19 @@ public void updateTableSchema(String tableName, Schema schema, List<String> part
Table existingTable = bigquery.getTable(TableId.of(projectId, datasetName, tableName));
ExternalTableDefinition definition = existingTable.getDefinition();
Schema remoteTableSchema = definition.getSchema();
// Add the partition fields into the schema to avoid conflicts while updating
List<Field> updatedTableFields = remoteTableSchema.getFields().stream()
List<Field> finalTableFields = new ArrayList<>(schema.getFields());
// Add the partition fields into the schema to avoid conflicts while updating. And ensure the partition fields are at the end to
// avoid unnecessary updates.
List<Field> bqPartitionFields = remoteTableSchema.getFields().stream()
.filter(field -> partitionFields.contains(field.getName()))
.collect(Collectors.toList());
updatedTableFields.addAll(schema.getFields());
Schema finalSchema = Schema.of(updatedTableFields);
finalTableFields.addAll(bqPartitionFields);
Schema finalSchema = Schema.of(finalTableFields);
boolean sameSchema = definition.getSchema() != null && definition.getSchema().equals(finalSchema);
boolean samePartitionFilter = partitionFields.isEmpty()
|| (requirePartitionFilter == (definition.getHivePartitioningOptions().getRequirePartitionFilter() != null && definition.getHivePartitioningOptions().getRequirePartitionFilter()));
if (sameSchema && samePartitionFilter) {
LOG.info("No table update is needed.");
return; // No need to update schema.
}
ExternalTableDefinition.Builder builder = definition.toBuilder();
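
The updateTableSchema change above builds the target schema from the Hudi-provided fields first and then appends the partition fields taken from the existing BigQuery table, so an unchanged table compares equal to the rebuilt schema and the update is skipped. A standalone sketch of just that ordering logic (hypothetical field names, mirroring the test added later in this commit):

import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SchemaOrderingSketch {
  public static void main(String[] args) {
    // Schema derived from the Hudi table (no partition columns).
    Schema hudiSchema = Schema.of(Field.of("field", StandardSQLTypeName.STRING));
    // Schema currently registered in BigQuery; the partition column is already last.
    Schema remoteSchema = Schema.of(
        Field.of("field", StandardSQLTypeName.STRING),
        Field.of("partition_field", StandardSQLTypeName.STRING));
    List<String> partitionFields = Arrays.asList("partition_field");

    // Data fields first, then the partition fields kept from the remote definition.
    List<Field> finalFields = new ArrayList<>(hudiSchema.getFields());
    finalFields.addAll(remoteSchema.getFields().stream()
        .filter(f -> partitionFields.contains(f.getName()))
        .collect(Collectors.toList()));
    Schema finalSchema = Schema.of(finalFields);

    // Unchanged table => schemas compare equal => no update call is issued.
    System.out.println("Schema unchanged: " + remoteSchema.equals(finalSchema));
  }
}
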
@@ -25,26 +25,34 @@
import org.apache.hudi.sync.common.HoodieSyncConfig;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.ExternalTableDefinition;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.HivePartitioningOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatus;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.Table;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentCaptor;

import java.util.ArrayList;
import java.nio.file.Path;
import java.util.List;
import java.util.Properties;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;

public class TestHoodieBigQuerySyncClient {
private static final String PROJECT_ID = "test_project";
@@ -125,4 +133,31 @@ void createTableWithManifestFile_nonPartitioned() throws Exception {
String.format("CREATE OR REPLACE EXTERNAL TABLE `%s.%s.%s` ( `field` STRING ) OPTIONS (enable_list_inference=true, uris=[\"%s\"], format=\"PARQUET\", "
+ "file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")", PROJECT_ID, TEST_DATASET, TEST_TABLE, MANIFEST_FILE_URI));
}

@Test
void skipUpdatingSchema_partitioned() throws Exception {
BigQuerySyncConfig config = new BigQuerySyncConfig(properties);
client = new HoodieBigQuerySyncClient(config, mockBigQuery);
Table mockTable = mock(Table.class);
ExternalTableDefinition mockTableDefinition = mock(ExternalTableDefinition.class);
// The table schema has no change: it contains a "field" and a "partition_field".
Schema schema = Schema.of(Field.of("field", StandardSQLTypeName.STRING));
List<String> partitionFields = new ArrayList<String>();
partitionFields.add("partition_field");
List<Field> bqFields = new ArrayList<Field>();
// The "partition_field" always follows "field".
bqFields.add(Field.of("field", StandardSQLTypeName.STRING));
bqFields.add(Field.of("partition_field", StandardSQLTypeName.STRING));
Schema bqSchema = Schema.of(bqFields);
HivePartitioningOptions hivePartitioningOptions = HivePartitioningOptions.newBuilder().setRequirePartitionFilter(true).build();

when(mockBigQuery.getTable(any())).thenReturn(mockTable);
when(mockTable.getDefinition()).thenReturn(mockTableDefinition);
when(mockTableDefinition.getSchema()).thenReturn(bqSchema);
when(mockTableDefinition.getHivePartitioningOptions()).thenReturn(hivePartitioningOptions);

client.updateTableSchema(TEST_TABLE, schema, partitionFields);
// Expect no update.
verify(mockBigQuery, never()).update(mockTable);
}
}
@@ -28,6 +28,7 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
@@ -57,7 +58,6 @@
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import static org.apache.hudi.common.model.HoodieRecord.COMMIT_TIME_METADATA_FIELD;
import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;

public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable> {
@@ -107,37 +107,28 @@ public ClosableIterator<ArrayWritable> getFileRecordIterator(Path filePath, long
if (firstRecordReader == null) {
firstRecordReader = recordReader;
}
return new RecordReaderValueIterator<>(recordReader);
ClosableIterator<ArrayWritable> recordIterator = new RecordReaderValueIterator<>(recordReader);
if (dataSchema.equals(requiredSchema)) {
return recordIterator;
}
return new CloseableMappingIterator<>(recordIterator, projectRecord(dataSchema, requiredSchema));
}

private void setSchemas(JobConf jobConf, Schema dataSchema, Schema requiredSchema) {
System.out.println("Setting confs for dataSchema: " + dataSchema + ", requiredSchema: " + requiredSchema);
System.out.println("Before: ");
System.out.println("serdeConstants.LIST_COLUMNS: " + jobConf.get(serdeConstants.LIST_COLUMNS));
System.out.println("serdeConstants.LIST_COLUMN_TYPES: " + jobConf.get(serdeConstants.LIST_COLUMN_TYPES));
System.out.println("ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR: " + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
System.out.println("ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR: " + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
List<String> dataColumnNameList = dataSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
List<TypeInfo> dataColumnTypeList = dataColumnNameList.stream().map(columnTypeMap::get).collect(Collectors.toList());
jobConf.set(serdeConstants.LIST_COLUMNS, String.join(",", dataColumnNameList));
jobConf.set(serdeConstants.LIST_COLUMN_TYPES, dataColumnTypeList.stream().map(TypeInfo::getQualifiedName).collect(Collectors.joining(",")));
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, requiredSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.joining(",")));
String readColNames = requiredSchema.getFields().stream().map(f -> f.name()).collect(Collectors.joining(","));
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColNames);
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, requiredSchema.getFields()
.stream().map(f -> String.valueOf(dataSchema.getField(f.name()).pos())).collect(Collectors.joining(",")));
System.out.println("After: ");
System.out.println("serdeConstants.LIST_COLUMNS: " + jobConf.get(serdeConstants.LIST_COLUMNS));
System.out.println("serdeConstants.LIST_COLUMN_TYPES: " + jobConf.get(serdeConstants.LIST_COLUMN_TYPES));
System.out.println("ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR: " + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
System.out.println("ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR: " + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
}

@Override
public ArrayWritable convertAvroRecord(IndexedRecord avroRecord) {
// TODO: should timestamps be supported here?
ArrayWritable convertedRecord = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, avroRecord.getSchema(), false);
System.out.println("converted record schema is " + avroRecord.getSchema().toString());
System.out.println("converted record key is" + getRecordKey(convertedRecord, avroRecord.getSchema()));
System.out.println("converted record commit time is" + getValue(convertedRecord, avroRecord.getSchema(), COMMIT_TIME_METADATA_FIELD));
return convertedRecord;
}

@@ -209,6 +200,10 @@ public UnaryOperator<ArrayWritable> projectRecord(Schema from, Schema to) {
return HoodieArrayWritableAvroUtils.projectRecord(from, to);
}

public UnaryOperator<ArrayWritable> reverseProjectRecord(Schema from, Schema to) {
return HoodieArrayWritableAvroUtils.reverseProject(from, to);
}

public long getPos() throws IOException {
if (firstRecordReader != null) {
return firstRecordReader.getPos();
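
The getFileRecordIterator change above avoids an extra mapping pass when the data schema already matches the required schema, and otherwise wraps the reader's iterator in a CloseableMappingIterator that projects each record. A small generic sketch of that wrap-only-when-needed pattern, using plain java.util types rather than Hudi's ClosableIterator and CloseableMappingIterator:

import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Function;

public class MappingIteratorSketch {

  // Decorate an iterator with a per-element mapping step.
  static <I, O> Iterator<O> mapping(Iterator<I> source, Function<I, O> mapper) {
    return new Iterator<O>() {
      @Override public boolean hasNext() { return source.hasNext(); }
      @Override public O next() { return mapper.apply(source.next()); }
    };
  }

  public static void main(String[] args) {
    Iterator<String> records = Arrays.asList("a", "b", "c").iterator();
    boolean schemasMatch = false; // stand-in for dataSchema.equals(requiredSchema)
    Iterator<String> result = schemasMatch
        ? records                                   // no projection needed
        : mapping(records, String::toUpperCase);    // stand-in for projectRecord(dataSchema, requiredSchema)
    result.forEachRemaining(System.out::println);
  }
}
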
@@ -33,6 +33,8 @@
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeBootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
@@ -49,11 +51,14 @@
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

public class HoodieFileGroupReaderRecordReader implements RecordReader<NullWritable, ArrayWritable> {
Expand All @@ -75,10 +80,13 @@ org.apache.hadoop.mapred.RecordReader<NullWritable, ArrayWritable> getRecordRead
private final InputSplit inputSplit;
private final JobConf jobConf;

private final UnaryOperator<ArrayWritable> reverseProjection;

public HoodieFileGroupReaderRecordReader(HiveReaderCreator readerCreator,
final InputSplit split,
final JobConf jobConf,
final Reporter reporter) throws IOException {
HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConf);
this.inputSplit = split;
this.jobConf = jobConf;
FileSplit fileSplit = (FileSplit) split;
@@ -87,7 +95,7 @@ public HoodieFileGroupReaderRecordReader(HiveReaderCreator readerCreator,
.setConf(jobConf)
.setBasePath(tableBasePath)
.build();
Schema tableSchema = getLatestTableSchema(metaClient);
Schema tableSchema = getLatestTableSchema(metaClient, jobConf);
Schema requestedSchema = createRequestedSchema(tableSchema, jobConf);
Map<String, String[]> hosts = new HashMap<>();
this.readerContext = new HiveHoodieReaderContext(readerCreator, split, jobConf, reporter, tableSchema, hosts, metaClient);
@@ -97,6 +105,7 @@ public HoodieFileGroupReaderRecordReader(HiveReaderCreator readerCreator,
tableSchema, requestedSchema, metaClient.getTableConfig().getProps(), metaClient.getTableConfig(), fileSplit.getStart(),
fileSplit.getLength(), false);
this.fileGroupReader.initRecordIterators();
this.reverseProjection = readerContext.reverseProjectRecord(requestedSchema, tableSchema);
}

@Override
Expand All @@ -105,6 +114,7 @@ public boolean next(NullWritable key, ArrayWritable value) throws IOException {
return false;
}
value.set(fileGroupReader.next().get());
reverseProjection.apply(value);
return true;
}

@@ -141,12 +151,17 @@ public JobConf getJobConf() {
return jobConf;
}

private static Schema getLatestTableSchema(HoodieTableMetaClient metaClient) {
private static Schema getLatestTableSchema(HoodieTableMetaClient metaClient, JobConf jobConf) {
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
try {
Option<Schema> schemaOpt = tableSchemaResolver.getTableAvroSchemaFromLatestCommit(true);
if (schemaOpt.isPresent()) {
return schemaOpt.get();
// Add partitioning fields to writer schema for resulting row to contain null values for these fields
String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
List<String> partitioningFields =
partitionFields.length() > 0 ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
: new ArrayList<>();
return HoodieRealtimeRecordReaderUtils.addPartitionFields(schemaOpt.get(), partitioningFields);
}
throw new RuntimeException("Unable to get table schema");
} catch (Exception e) {
@@ -168,28 +183,18 @@ private static String getTableBasePath(InputSplit split, JobConf jobConf) throws

private static String getLatestCommitTime(InputSplit split, HoodieTableMetaClient metaClient) {
if (split instanceof RealtimeSplit) {
System.out.println("172 max commit time: " + ((RealtimeSplit) split).getMaxCommitTime());
Option<HoodieInstant> lastInstant = metaClient.getCommitsTimeline().lastInstant();
if (lastInstant.isPresent()) {
System.out.println("last instant present: " + lastInstant.get().getTimestamp());
System.out.println("completion time: " + lastInstant.get().getCompletionTime());
}
return ((RealtimeSplit) split).getMaxCommitTime();
}
Option<HoodieInstant> lastInstant = metaClient.getCommitsTimeline().lastInstant();
if (lastInstant.isPresent()) {
System.out.println("last instant present: " + lastInstant.get().getTimestamp());
System.out.println("completion time: " + lastInstant.get().getCompletionTime());
return lastInstant.get().getTimestamp();
} else {
return "";
}
}

private static FileSlice getFileSliceFromSplit(FileSplit split, Map<String, String[]> hosts, FileSystem fs, String tableBasePath) throws IOException {
System.out.println("inside getFileSliceFromSplit");
if (split instanceof RealtimeSplit) {
System.out.println("Is realtime split");
RealtimeSplit realtimeSplit = (RealtimeSplit) split;
HoodieFileGroupId fileGroupId = new HoodieFileGroupId(FSUtils.getFileId(realtimeSplit.getPath().getName()),
FSUtils.getPartitionPath(realtimeSplit.getBasePath(), realtimeSplit.getPath().getParent().toString()).toString());
@@ -203,7 +208,6 @@ private static FileSlice getFileSliceFromSplit(FileSplit split, Map<String, Stri
}
hosts.put(realtimeSplit.getPath().toString(), realtimeSplit.getLocations());
HoodieBaseFile hoodieBaseFile = new HoodieBaseFile(fs.getFileStatus(realtimeSplit.getPath()), bootstrapBaseFile);
System.out.println("number of log files: " + realtimeSplit.getDeltaLogFiles().size());
return new FileSlice(fileGroupId, commitTime, hoodieBaseFile, realtimeSplit.getDeltaLogFiles());
}
//just regular cow
@@ -215,7 +219,6 @@ private static FileSlice getFileSliceFromSplit(FileSplit split, Map<String, Stri

private static Schema createRequestedSchema(Schema tableSchema, JobConf jobConf) {
String partitionColString = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
System.out.println("Read columns are " + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
Set<String> partitionColumns;
if (partitionColString == null) {
partitionColumns = Collections.EMPTY_SET;
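
Related to the getLatestTableSchema change earlier in this file: the partition columns arrive as a single "/"-separated property, and the writer schema is widened with those fields so rows carry (possibly null) values for them. A standalone sketch of just the parsing step (hypothetical property value standing in for hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PartitionColumnParsingSketch {
  public static void main(String[] args) {
    String partitionFields = "datestr/region"; // hypothetical "/"-separated partition columns
    List<String> partitioningFields = partitionFields.length() > 0
        ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
        : new ArrayList<>();
    System.out.println(partitioningFields); // prints [datestr, region]
  }
}
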